Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates Runs Page to work with Child Workflows #3264

Merged
merged 3 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions backend/services/mgmt/v1alpha1/job-service/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/nucleuscloud/neosync/internal/ee/rbac"
nucleuserrors "github.com/nucleuscloud/neosync/internal/errors"
"github.com/nucleuscloud/neosync/internal/neosyncdb"
tablesync_workflow "github.com/nucleuscloud/neosync/worker/pkg/workflows/tablesync/workflow"
"go.temporal.io/api/enums/v1"
temporalclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -234,6 +235,81 @@ func (s *Service) GetJobRunEvents(
isRunComplete = true
case enums.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
isRunComplete = true

case enums.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
isRunComplete = true
case enums.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
activityOrder = append(activityOrder, event.GetEventId())
attributes := event.GetStartChildWorkflowExecutionInitiatedEventAttributes()
jobRunEvent := &mgmtv1alpha1.JobRunEvent{
Id: event.EventId,
Type: attributes.GetWorkflowType().GetName(),
StartTime: event.EventTime,
Tasks: []*mgmtv1alpha1.JobRunEventTask{
dtomaps.ToJobRunEventTaskDto(event, nil),
},
}
if len(attributes.Input.Payloads) > 0 {
if attributes.GetWorkflowType().GetName() == "TableSync" {
var tableSyncRequest tablesync_workflow.TableSyncRequest
err := converter.GetDefaultDataConverter().FromPayload(attributes.Input.Payloads[0], &tableSyncRequest)
if err != nil {
logger.Error(fmt.Errorf("unable to convert to event input payload: %w", err).Error())
}

metadata := &mgmtv1alpha1.JobRunEventMetadata{}
metadata.Metadata = &mgmtv1alpha1.JobRunEventMetadata_SyncMetadata{
SyncMetadata: &mgmtv1alpha1.JobRunSyncMetadata{
Schema: tableSyncRequest.TableSchema,
Table: tableSyncRequest.TableName,
},
}

jobRunEvent.Metadata = metadata
}
}
activityMap[event.EventId] = jobRunEvent

case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
attributes := event.GetChildWorkflowExecutionStartedEventAttributes()
activity := activityMap[attributes.InitiatedEventId]
activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil))

case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
attributes := event.GetChildWorkflowExecutionCompletedEventAttributes()
activity := activityMap[attributes.InitiatedEventId]
activity.CloseTime = event.EventTime
activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil))

case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
attributes := event.GetChildWorkflowExecutionFailedEventAttributes()
activity := activityMap[attributes.InitiatedEventId]
activity.CloseTime = event.EventTime
errorDto := dtomaps.ToJobRunEventTaskErrorDto(attributes.Failure, attributes.RetryState)
activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, errorDto))

isRunComplete = true

case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
attributes := event.GetChildWorkflowExecutionTimedOutEventAttributes()
activity := activityMap[attributes.InitiatedEventId]
activity.CloseTime = event.EventTime
activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil))
isRunComplete = true

case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
attributes := event.GetChildWorkflowExecutionCanceledEventAttributes()
activity := activityMap[attributes.InitiatedEventId]
activity.CloseTime = event.EventTime
activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil))
isRunComplete = true

case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
attributes := event.GetChildWorkflowExecutionTerminatedEventAttributes()
activity := activityMap[attributes.InitiatedEventId]
activity.CloseTime = event.EventTime
activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil))
isRunComplete = true
default:
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export function getColumns(props: GetColumnsProps): ColumnDef<JobRunEvent>[] {
),
cell: ({ row }) => {
const scheduledTime = row.original.tasks.find(
(item) => item.type == 'ActivityTaskScheduled'
(item) =>
item.type == 'ActivityTaskScheduled' ||
item.type == 'StartChildWorkflowExecutionInitiated'
)?.eventTime;
return (
<div className="flex space-x-2">
Expand Down Expand Up @@ -71,7 +73,9 @@ export function getColumns(props: GetColumnsProps): ColumnDef<JobRunEvent>[] {
),
cell: ({ row }) => {
const closeTime = row.original.tasks.find(
(item) => item.type == 'ActivityTaskCompleted'
(item) =>
item.type == 'ActivityTaskCompleted' ||
item.type == 'ChildWorkflowExecutionCompleted'
)?.eventTime;

return (
Expand Down
60 changes: 48 additions & 12 deletions frontend/apps/web/components/RunTImeline/RunTimeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ interface Props {
const expandedRowHeight = 165;
const defaultRowHeight = 40;

type RunStatus = 'running' | 'completed' | 'failed' | 'canceled';
type RunStatus = 'running' | 'completed' | 'failed' | 'canceled' | 'terminated';

export default function RunTimeline(props: Props): ReactElement {
const { tasks, jobStatus } = props;
Expand All @@ -58,6 +58,7 @@ export default function RunTimeline(props: Props): ReactElement {
'completed',
'failed',
'canceled',
'terminated',
]);

const { timelineStart, totalDuration, timeLabels } = useMemo(() => {
Expand All @@ -67,7 +68,9 @@ export default function RunTimeline(props: Props): ReactElement {

tasks.forEach((t) => {
const scheduled = t.tasks.find(
(st) => st.type == 'ActivityTaskScheduled'
(st) =>
st.type === 'ActivityTaskScheduled' ||
st.type === 'StartChildWorkflowExecutionInitiated'
)?.eventTime;
startTime = Math.min(
startTime,
Expand Down Expand Up @@ -226,7 +229,7 @@ function LeftActivityBar(props: LeftActivityBarProps): ReactElement {
>
<ActivityLabel
task={task}
getStatus={() => getTaskStatus(task, jobStatus)}
status={getTaskStatus(task, jobStatus)}
/>
</div>
);
Expand Down Expand Up @@ -262,7 +265,9 @@ function TimelineBar(props: TimelineBarProps) {
} = props;

const scheduled = task.tasks.find(
(st) => st.type == 'ActivityTaskScheduled'
(st) =>
st.type == 'ActivityTaskScheduled' ||
st.type == 'StartChildWorkflowExecutionInitiated'
)?.eventTime;

const failedTask = task.tasks.find((item) => item.error);
Expand All @@ -285,7 +290,7 @@ function TimelineBar(props: TimelineBarProps) {
className={cn(
status === 'failed'
? 'bg-red-400 dark:bg-red-700'
: status === 'canceled'
: status === 'canceled' || status === 'terminated'
? 'bg-yellow-400 dark:bg-yellow-700'
: 'bg-blue-500',
'absolute h-8 rounded hover:bg-opacity-80 cursor-pointer mx-6 flex items-center '
Expand Down Expand Up @@ -336,7 +341,9 @@ function TimelineBar(props: TimelineBarProps) {
<div className="flex flex-row gap-2 items-center justify-between w-full">
<strong>Finish:</strong>{' '}
<Badge variant="default" className="w-[180px]">
{status == 'failed' || status == 'canceled'
{status == 'failed' ||
status == 'canceled' ||
status == 'terminated'
? 'N/A'
: formatFullDate(endTime)}
</Badge>
Expand Down Expand Up @@ -407,7 +414,12 @@ function getCloseOrErrorOrCancelDate(task: JobRunEvent): Date {
const errorTask = task.tasks.find((item) => item.error);
const errorTime = errorTask ? errorTask.eventTime : undefined;
const cancelTime = task.tasks.find(
(t) => t.type === 'ActivityTaskCancelRequested'
(t) =>
t.type === 'ActivityTaskCancelRequested' ||
t.type === 'ActivityTaskCanceled' ||
t.type === 'ChildWorkflowExecutionCanceled' ||
t.type === 'ChildWorkflowExecutionTerminated' ||
t.type === 'ChildWorkflowExecutionTimedOut'
)?.eventTime;
return errorTime
? convertTimestampToDate(errorTime)
Expand All @@ -434,13 +446,11 @@ function isSyncActivity(task: JobRunEvent): boolean {

interface ActivityLabelProps {
task: JobRunEvent;
getStatus: () => RunStatus;
status: RunStatus;
}

// generates the activity label that we see on the left hand activity column
function ActivityLabel({ task, getStatus }: ActivityLabelProps) {
const status = getStatus();

function ActivityLabel({ task, status }: ActivityLabelProps) {
return (
<div className="flex flex-row items-center gap-2 overflow-hidden">
{task.id.toString()}.
Expand All @@ -461,6 +471,8 @@ function ActivityStatus({ status }: { status: RunStatus }) {
return <MinusCircledIcon className="text-yellow-500" />;
case 'running':
return <Spinner />;
case 'terminated':
return <CrossCircledIcon className="text-gray-500" />;
default:
return null;
}
Expand Down Expand Up @@ -509,6 +521,12 @@ function StatusFilter({ selectedStatuses, onStatusChange }: StatusFilterProps) {
>
Canceled
</DropdownMenuCheckboxItem>
<DropdownMenuCheckboxItem
checked={uniqueSelectedStatuses.has('terminated')}
onCheckedChange={(checked) => onStatusChange('terminated', checked)}
>
Terminated
</DropdownMenuCheckboxItem>
</DropdownMenuContent>
</DropdownMenu>
);
Expand Down Expand Up @@ -567,17 +585,25 @@ function getTaskStatus(
for (const t of task.tasks) {
switch (t.type) {
case 'ActivityTaskCompleted':
case 'ChildWorkflowExecutionCompleted':
isCompleted = true;
break;
case 'ActivityTaskFailed':
case 'ActivityTaskTimedOut':
case 'ActivityTaskCanceled':
case 'ActivityTaskTerminated':
case 'ChildWorkflowExecutionFailed':
case 'ChildWorkflowExecutionTimedOut':
case 'ChildWorkflowExecutionTerminated':
case 'StartChildWorkflowExecutionFailed':
isFailed = true;
break;
case 'ActivityTaskCancelRequested':
isCanceled = true;
break;
case 'ActivityTaskStarted':
case 'ActivityTaskScheduled':
case 'StartChildWorkflowExecutionInitiated':
break;
}

Expand All @@ -592,8 +618,9 @@ function getTaskStatus(
if (isFailed) return 'failed';

const isJobTerminated = jobStatus === JobRunStatus.TERMINATED;
if (isJobTerminated) return 'terminated';

if (isCanceled || (isJobTerminated && !isCompleted)) return 'canceled';
if (isCanceled || !isCompleted) return 'canceled';
return 'running';
}

Expand Down Expand Up @@ -654,17 +681,26 @@ function ExpandedRowBody(props: ExpandedRowBodyProps): ReactElement {
const getLabel = (type: string) => {
switch (type) {
case 'ActivityTaskScheduled':
case 'StartChildWorkflowExecutionInitiated':
return 'Scheduled';
case 'ActivityTaskStarted':
case 'ChildWorkflowExecutionStarted':
return 'Started';
case 'ActivityTaskCompleted':
case 'ChildWorkflowExecutionCompleted':
return 'Completed';
case 'ActivityTaskFailed':
case 'ChildWorkflowExecutionFailed':
case 'StartChildWorkflowExecutionFailed':
return 'Failed';
case 'ActivityTaskTimedOut':
case 'ChildWorkflowExecutionTimedOut':
return 'Timed Out';
case 'ActivityTaskCancelRequested':
return 'Cancel Requested';
case 'ActivityTaskTerminated':
case 'ChildWorkflowExecutionTerminated':
return 'Terminated';
default:
return type;
}
Expand Down
Loading