diff --git a/backend/services/mgmt/v1alpha1/job-service/runs.go b/backend/services/mgmt/v1alpha1/job-service/runs.go index 529cbb0b99..eb35d685a1 100644 --- a/backend/services/mgmt/v1alpha1/job-service/runs.go +++ b/backend/services/mgmt/v1alpha1/job-service/runs.go @@ -22,6 +22,7 @@ import ( "github.com/nucleuscloud/neosync/internal/ee/rbac" nucleuserrors "github.com/nucleuscloud/neosync/internal/errors" "github.com/nucleuscloud/neosync/internal/neosyncdb" + tablesync_workflow "github.com/nucleuscloud/neosync/worker/pkg/workflows/tablesync/workflow" "go.temporal.io/api/enums/v1" temporalclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" @@ -234,6 +235,81 @@ func (s *Service) GetJobRunEvents( isRunComplete = true case enums.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT: isRunComplete = true + + case enums.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED: + isRunComplete = true + case enums.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED: + activityOrder = append(activityOrder, event.GetEventId()) + attributes := event.GetStartChildWorkflowExecutionInitiatedEventAttributes() + jobRunEvent := &mgmtv1alpha1.JobRunEvent{ + Id: event.EventId, + Type: attributes.GetWorkflowType().GetName(), + StartTime: event.EventTime, + Tasks: []*mgmtv1alpha1.JobRunEventTask{ + dtomaps.ToJobRunEventTaskDto(event, nil), + }, + } + if len(attributes.Input.Payloads) > 0 { + if attributes.GetWorkflowType().GetName() == "TableSync" { + var tableSyncRequest tablesync_workflow.TableSyncRequest + err := converter.GetDefaultDataConverter().FromPayload(attributes.Input.Payloads[0], &tableSyncRequest) + if err != nil { + logger.Error(fmt.Errorf("unable to convert to event input payload: %w", err).Error()) + } + + metadata := &mgmtv1alpha1.JobRunEventMetadata{} + metadata.Metadata = &mgmtv1alpha1.JobRunEventMetadata_SyncMetadata{ + SyncMetadata: &mgmtv1alpha1.JobRunSyncMetadata{ + Schema: tableSyncRequest.TableSchema, + Table: tableSyncRequest.TableName, + }, + } + + jobRunEvent.Metadata = metadata + } + } + activityMap[event.EventId] = jobRunEvent + + case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED: + attributes := event.GetChildWorkflowExecutionStartedEventAttributes() + activity := activityMap[attributes.InitiatedEventId] + activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil)) + + case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED: + attributes := event.GetChildWorkflowExecutionCompletedEventAttributes() + activity := activityMap[attributes.InitiatedEventId] + activity.CloseTime = event.EventTime + activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil)) + + case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED: + attributes := event.GetChildWorkflowExecutionFailedEventAttributes() + activity := activityMap[attributes.InitiatedEventId] + activity.CloseTime = event.EventTime + errorDto := dtomaps.ToJobRunEventTaskErrorDto(attributes.Failure, attributes.RetryState) + activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, errorDto)) + + isRunComplete = true + + case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT: + attributes := event.GetChildWorkflowExecutionTimedOutEventAttributes() + activity := activityMap[attributes.InitiatedEventId] + activity.CloseTime = event.EventTime + activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil)) + isRunComplete = true + + case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED: + attributes := event.GetChildWorkflowExecutionCanceledEventAttributes() + activity := activityMap[attributes.InitiatedEventId] + activity.CloseTime = event.EventTime + activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil)) + isRunComplete = true + + case enums.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED: + attributes := event.GetChildWorkflowExecutionTerminatedEventAttributes() + activity := activityMap[attributes.InitiatedEventId] + activity.CloseTime = event.EventTime + activity.Tasks = append(activity.Tasks, dtomaps.ToJobRunEventTaskDto(event, nil)) + isRunComplete = true default: } } diff --git a/frontend/apps/web/app/(mgmt)/[account]/runs/[id]/components/JobRunActivityTable/columns.tsx b/frontend/apps/web/app/(mgmt)/[account]/runs/[id]/components/JobRunActivityTable/columns.tsx index 3a538f1a13..57de81c3d1 100644 --- a/frontend/apps/web/app/(mgmt)/[account]/runs/[id]/components/JobRunActivityTable/columns.tsx +++ b/frontend/apps/web/app/(mgmt)/[account]/runs/[id]/components/JobRunActivityTable/columns.tsx @@ -35,7 +35,9 @@ export function getColumns(props: GetColumnsProps): ColumnDef[] { ), cell: ({ row }) => { const scheduledTime = row.original.tasks.find( - (item) => item.type == 'ActivityTaskScheduled' + (item) => + item.type == 'ActivityTaskScheduled' || + item.type == 'StartChildWorkflowExecutionInitiated' )?.eventTime; return (
@@ -71,7 +73,9 @@ export function getColumns(props: GetColumnsProps): ColumnDef[] { ), cell: ({ row }) => { const closeTime = row.original.tasks.find( - (item) => item.type == 'ActivityTaskCompleted' + (item) => + item.type == 'ActivityTaskCompleted' || + item.type == 'ChildWorkflowExecutionCompleted' )?.eventTime; return ( diff --git a/frontend/apps/web/components/RunTImeline/RunTimeline.tsx b/frontend/apps/web/components/RunTImeline/RunTimeline.tsx index 4d89cb816a..590b0939b1 100644 --- a/frontend/apps/web/components/RunTImeline/RunTimeline.tsx +++ b/frontend/apps/web/components/RunTImeline/RunTimeline.tsx @@ -48,7 +48,7 @@ interface Props { const expandedRowHeight = 165; const defaultRowHeight = 40; -type RunStatus = 'running' | 'completed' | 'failed' | 'canceled'; +type RunStatus = 'running' | 'completed' | 'failed' | 'canceled' | 'terminated'; export default function RunTimeline(props: Props): ReactElement { const { tasks, jobStatus } = props; @@ -58,6 +58,7 @@ export default function RunTimeline(props: Props): ReactElement { 'completed', 'failed', 'canceled', + 'terminated', ]); const { timelineStart, totalDuration, timeLabels } = useMemo(() => { @@ -67,7 +68,9 @@ export default function RunTimeline(props: Props): ReactElement { tasks.forEach((t) => { const scheduled = t.tasks.find( - (st) => st.type == 'ActivityTaskScheduled' + (st) => + st.type === 'ActivityTaskScheduled' || + st.type === 'StartChildWorkflowExecutionInitiated' )?.eventTime; startTime = Math.min( startTime, @@ -226,7 +229,7 @@ function LeftActivityBar(props: LeftActivityBarProps): ReactElement { > getTaskStatus(task, jobStatus)} + status={getTaskStatus(task, jobStatus)} />
); @@ -262,7 +265,9 @@ function TimelineBar(props: TimelineBarProps) { } = props; const scheduled = task.tasks.find( - (st) => st.type == 'ActivityTaskScheduled' + (st) => + st.type == 'ActivityTaskScheduled' || + st.type == 'StartChildWorkflowExecutionInitiated' )?.eventTime; const failedTask = task.tasks.find((item) => item.error); @@ -285,7 +290,7 @@ function TimelineBar(props: TimelineBarProps) { className={cn( status === 'failed' ? 'bg-red-400 dark:bg-red-700' - : status === 'canceled' + : status === 'canceled' || status === 'terminated' ? 'bg-yellow-400 dark:bg-yellow-700' : 'bg-blue-500', 'absolute h-8 rounded hover:bg-opacity-80 cursor-pointer mx-6 flex items-center ' @@ -336,7 +341,9 @@ function TimelineBar(props: TimelineBarProps) {
Finish:{' '} - {status == 'failed' || status == 'canceled' + {status == 'failed' || + status == 'canceled' || + status == 'terminated' ? 'N/A' : formatFullDate(endTime)} @@ -407,7 +414,12 @@ function getCloseOrErrorOrCancelDate(task: JobRunEvent): Date { const errorTask = task.tasks.find((item) => item.error); const errorTime = errorTask ? errorTask.eventTime : undefined; const cancelTime = task.tasks.find( - (t) => t.type === 'ActivityTaskCancelRequested' + (t) => + t.type === 'ActivityTaskCancelRequested' || + t.type === 'ActivityTaskCanceled' || + t.type === 'ChildWorkflowExecutionCanceled' || + t.type === 'ChildWorkflowExecutionTerminated' || + t.type === 'ChildWorkflowExecutionTimedOut' )?.eventTime; return errorTime ? convertTimestampToDate(errorTime) @@ -434,13 +446,11 @@ function isSyncActivity(task: JobRunEvent): boolean { interface ActivityLabelProps { task: JobRunEvent; - getStatus: () => RunStatus; + status: RunStatus; } // generates the activity label that we see on the left hand activity column -function ActivityLabel({ task, getStatus }: ActivityLabelProps) { - const status = getStatus(); - +function ActivityLabel({ task, status }: ActivityLabelProps) { return (
{task.id.toString()}. @@ -461,6 +471,8 @@ function ActivityStatus({ status }: { status: RunStatus }) { return ; case 'running': return ; + case 'terminated': + return ; default: return null; } @@ -509,6 +521,12 @@ function StatusFilter({ selectedStatuses, onStatusChange }: StatusFilterProps) { > Canceled + onStatusChange('terminated', checked)} + > + Terminated + ); @@ -567,10 +585,17 @@ function getTaskStatus( for (const t of task.tasks) { switch (t.type) { case 'ActivityTaskCompleted': + case 'ChildWorkflowExecutionCompleted': isCompleted = true; break; case 'ActivityTaskFailed': case 'ActivityTaskTimedOut': + case 'ActivityTaskCanceled': + case 'ActivityTaskTerminated': + case 'ChildWorkflowExecutionFailed': + case 'ChildWorkflowExecutionTimedOut': + case 'ChildWorkflowExecutionTerminated': + case 'StartChildWorkflowExecutionFailed': isFailed = true; break; case 'ActivityTaskCancelRequested': @@ -578,6 +603,7 @@ function getTaskStatus( break; case 'ActivityTaskStarted': case 'ActivityTaskScheduled': + case 'StartChildWorkflowExecutionInitiated': break; } @@ -592,8 +618,9 @@ function getTaskStatus( if (isFailed) return 'failed'; const isJobTerminated = jobStatus === JobRunStatus.TERMINATED; + if (isJobTerminated) return 'terminated'; - if (isCanceled || (isJobTerminated && !isCompleted)) return 'canceled'; + if (isCanceled || !isCompleted) return 'canceled'; return 'running'; } @@ -654,17 +681,26 @@ function ExpandedRowBody(props: ExpandedRowBodyProps): ReactElement { const getLabel = (type: string) => { switch (type) { case 'ActivityTaskScheduled': + case 'StartChildWorkflowExecutionInitiated': return 'Scheduled'; case 'ActivityTaskStarted': + case 'ChildWorkflowExecutionStarted': return 'Started'; case 'ActivityTaskCompleted': + case 'ChildWorkflowExecutionCompleted': return 'Completed'; case 'ActivityTaskFailed': + case 'ChildWorkflowExecutionFailed': + case 'StartChildWorkflowExecutionFailed': return 'Failed'; case 'ActivityTaskTimedOut': + case 'ChildWorkflowExecutionTimedOut': return 'Timed Out'; case 'ActivityTaskCancelRequested': return 'Cancel Requested'; + case 'ActivityTaskTerminated': + case 'ChildWorkflowExecutionTerminated': + return 'Terminated'; default: return type; }