Skip to content

Commit

Permalink
Default to unpause dag on trigger,materialize,backfill (apache#47627)
Browse files Browse the repository at this point in the history
* Default to unpause dag on trigger,materialize,backfill

* Explicitly set isPaused to false on modals
  • Loading branch information
bbovenzi authored Mar 11, 2025
1 parent 5dae3b5 commit eebdf4e
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 50 deletions.
6 changes: 1 addition & 5 deletions airflow/ui/src/components/Graph/DagNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ export const DagNode = ({
>
<HStack alignItems="center" justifyContent="space-between">
<DagIcon />
<TogglePause
dagId={dag?.dag_id ?? label}
disabled={!Boolean(dag)}
isPaused={dag?.is_paused ?? false}
/>
<TogglePause dagId={dag?.dag_id ?? label} disabled={!Boolean(dag)} isPaused={dag?.is_paused} />
</HStack>
<Link asChild color="fg.info" mb={2}>
<RouterLink to={`/dags/${dag?.dag_id ?? label}`}>{dag?.dag_display_name ?? label}</RouterLink>
Expand Down
17 changes: 17 additions & 0 deletions airflow/ui/src/components/Menu/RunBackfillForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { Alert, Button } from "src/components/ui";
import { reprocessBehaviors } from "src/constants/reprocessBehaviourParams";
import { useCreateBackfill } from "src/queries/useCreateBackfill";
import { useCreateBackfillDryRun } from "src/queries/useCreateBackfillDryRun";
import { useTogglePause } from "src/queries/useTogglePause";

import { ErrorAlert } from "../ErrorAlert";
import { Checkbox } from "../ui/Checkbox";
Expand All @@ -38,6 +39,7 @@ const today = new Date().toISOString().slice(0, 16);

const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
const [errors, setErrors] = useState<{ conf?: string; date?: unknown }>({});
const [unpause, setUnpause] = useState(true);

const { control, handleSubmit, reset, watch } = useForm<BackfillPostBody>({
defaultValues: {
Expand Down Expand Up @@ -69,6 +71,8 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
},
});

const { mutate: togglePause } = useTogglePause({ dagId: dag.dag_id });

const { createBackfill, dateValidationError, error, isPending } = useCreateBackfill({
onSuccessConfirm: onClose,
});
Expand All @@ -83,6 +87,14 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
const dataIntervalEnd = watch("to_date");

const onSubmit = (fdata: BackfillPostBody) => {
if (unpause && dag.is_paused) {
togglePause({
dagId: dag.dag_id,
requestBody: {
is_paused: false,
},
});
}
createBackfill({
requestBody: fdata,
});
Expand Down Expand Up @@ -195,6 +207,11 @@ const RunBackfillForm = ({ dag, onClose }: RunBackfillFormProps) => {
<Alert>{affectedTasks.total_entries} runs will be triggered</Alert>
) : undefined}
</VStack>
{dag.is_paused ? (
<Checkbox checked={unpause} colorPalette="blue" onChange={() => setUnpause(!unpause)}>
Unpause {dag.dag_display_name} on trigger
</Checkbox>
) : undefined}
<ErrorAlert error={errors.date ?? error} />
<Box as="footer" display="flex" justifyContent="flex-end" mt={4}>
<HStack w="full">
Expand Down
44 changes: 12 additions & 32 deletions airflow/ui/src/components/TogglePause.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,35 @@
* under the License.
*/
import { useDisclosure } from "@chakra-ui/react";
import { useQueryClient } from "@tanstack/react-query";
import { useCallback } from "react";

import {
UseDagRunServiceGetDagRunsKeyFn,
UseDagServiceGetDagDetailsKeyFn,
UseDagServiceGetDagKeyFn,
useDagServiceGetDagsKey,
useDagServicePatchDag,
useDagsServiceRecentDagRunsKey,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
} from "openapi/queries";
import { useConfig } from "src/queries/useConfig";
import { useTogglePause } from "src/queries/useTogglePause";

import { ConfirmationModal } from "./ConfirmationModal";
import { Switch, type SwitchProps } from "./ui";

type Props = {
readonly dagDisplayName?: string;
readonly dagId: string;
readonly isPaused: boolean;
readonly isPaused?: boolean;
readonly skipConfirm?: boolean;
} & SwitchProps;

export const TogglePause = ({ dagDisplayName, dagId, isPaused, skipConfirm, ...rest }: Props) => {
const queryClient = useQueryClient();
const { onClose, onOpen, open } = useDisclosure();

const onSuccess = async () => {
const queryKeys = [
[useDagServiceGetDagsKey],
[useDagsServiceRecentDagRunsKey],
UseDagServiceGetDagKeyFn({ dagId }, [{ dagId }]),
UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]),
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]),
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
};

const { mutate } = useDagServicePatchDag({
onSuccess,
});

const { mutate: togglePause } = useTogglePause({ dagId });
const showConfirmation = Boolean(useConfig("require_confirmation_dag_change"));

const onToggle = useCallback(() => {
mutate({
togglePause({
dagId,
requestBody: {
is_paused: !isPaused,
},
});
}, [dagId, isPaused, mutate]);
}, [dagId, isPaused, togglePause]);

const onChange = () => {
if (showConfirmation && skipConfirm !== true) {
Expand All @@ -83,7 +57,13 @@ export const TogglePause = ({ dagDisplayName, dagId, isPaused, skipConfirm, ...r

return (
<>
<Switch checked={!isPaused} colorPalette="blue" onCheckedChange={onChange} size="sm" {...rest} />
<Switch
checked={isPaused === undefined ? undefined : !isPaused}
colorPalette="blue"
onCheckedChange={onChange}
size="sm"
{...rest}
/>
<ConfirmationModal
header={`${isPaused ? "Unpause" : "Pause"} ${dagDisplayName ?? dagId}?`}
onConfirm={onToggle}
Expand Down
21 changes: 20 additions & 1 deletion airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ import { useForm, Controller } from "react-hook-form";
import { FiPlay } from "react-icons/fi";

import { useDagParams } from "src/queries/useDagParams";
import { useTogglePause } from "src/queries/useTogglePause";
import { useTrigger } from "src/queries/useTrigger";

import { ErrorAlert } from "../ErrorAlert";
import { FlexibleForm, flexibleFormDefaultSection } from "../FlexibleForm";
import { JsonEditor } from "../JsonEditor";
import { Accordion } from "../ui";
import { Checkbox } from "../ui/Checkbox";
import EditableMarkdown from "./EditableMarkdown";
import { useParamStore } from "./useParamStore";

type TriggerDAGFormProps = {
readonly dagId: string;
readonly isPaused: boolean;
readonly onClose: () => void;
readonly open: boolean;
};
Expand All @@ -44,11 +47,14 @@ export type DagRunTriggerParams = {
note: string;
};

const TriggerDAGForm = ({ dagId, onClose, open }: TriggerDAGFormProps) => {
const TriggerDAGForm = ({ dagId, isPaused, onClose, open }: TriggerDAGFormProps) => {
const [errors, setErrors] = useState<{ conf?: string; date?: unknown }>({});
const initialParamsDict = useDagParams(dagId, open);
const { error: errorTrigger, isPending, triggerDagRun } = useTrigger({ dagId, onSuccessConfirm: onClose });
const { conf, setConf } = useParamStore();
const [unpause, setUnpause] = useState(true);

const { mutate: togglePause } = useTogglePause({ dagId });

const { control, handleSubmit, reset } = useForm<DagRunTriggerParams>({
defaultValues: {
Expand All @@ -67,6 +73,14 @@ const TriggerDAGForm = ({ dagId, onClose, open }: TriggerDAGFormProps) => {
}, [conf, reset]);

const onSubmit = (data: DagRunTriggerParams) => {
if (unpause && isPaused) {
togglePause({
dagId,
requestBody: {
is_paused: false,
},
});
}
triggerDagRun(data);
};

Expand Down Expand Up @@ -186,6 +200,11 @@ const TriggerDAGForm = ({ dagId, onClose, open }: TriggerDAGFormProps) => {
</Accordion.ItemContent>
</Accordion.Item>
</Accordion.Root>
{isPaused ? (
<Checkbox checked={unpause} colorPalette="blue" onChange={() => setUnpause(!unpause)}>
Unpause {dagId} on trigger
</Checkbox>
) : undefined}
<ErrorAlert error={errors.date ?? errorTrigger} />
<Box as="footer" display="flex" justifyContent="flex-end" mt={4}>
<HStack w="full">
Expand Down
14 changes: 3 additions & 11 deletions airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
import { Heading, VStack } from "@chakra-ui/react";
import React from "react";

import { Alert, Dialog } from "src/components/ui";
import { Dialog } from "src/components/ui";

import { TogglePause } from "../TogglePause";
import TriggerDAGForm from "./TriggerDAGForm";

type TriggerDAGModalProps = {
Expand All @@ -43,21 +42,14 @@ const TriggerDAGModal: React.FC<TriggerDAGModalProps> = ({
<Dialog.Content backdrop>
<Dialog.Header paddingBottom={0}>
<VStack align="start" gap={4}>
<Heading size="xl">
Trigger Dag - {dagDisplayName} <TogglePause dagId={dagId} isPaused={isPaused} skipConfirm />
</Heading>
{isPaused ? (
<Alert status="warning" title="Paused DAG">
Triggering will create a DAG run, but it will not start until the Dag is unpaused.
</Alert>
) : undefined}
<Heading size="xl">Trigger Dag - {dagDisplayName}</Heading>
</VStack>
</Dialog.Header>

<Dialog.CloseTrigger />

<Dialog.Body>
<TriggerDAGForm dagId={dagId} onClose={onClose} open={open} />
<TriggerDAGForm dagId={dagId} isPaused={isPaused} onClose={onClose} open={open} />
</Dialog.Body>
</Dialog.Content>
</Dialog.Root>
Expand Down
25 changes: 24 additions & 1 deletion airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
UseAssetServiceGetAssetEventsKeyFn,
useAssetServiceMaterializeAsset,
UseDagRunServiceGetDagRunsKeyFn,
useDagServiceGetDagDetails,
useDagsServiceRecentDagRunsKey,
useDependenciesServiceGetDependencies,
UseGridServiceGridDataKeyFn,
Expand All @@ -40,7 +41,9 @@ import type {
import { ErrorAlert } from "src/components/ErrorAlert";
import { JsonEditor } from "src/components/JsonEditor";
import { Dialog, toaster } from "src/components/ui";
import { Checkbox } from "src/components/ui/Checkbox";
import { RadioCardItem, RadioCardRoot } from "src/components/ui/RadioCard";
import { useTogglePause } from "src/queries/useTogglePause";

type Props = {
readonly asset: AssetResponse;
Expand All @@ -51,6 +54,7 @@ type Props = {
export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => {
const [eventType, setEventType] = useState("manual");
const [extraError, setExtraError] = useState<string | undefined>();
const [unpause, setUnpause] = useState(true);
const [extra, setExtra] = useState("{}");
const queryClient = useQueryClient();

Expand Down Expand Up @@ -118,6 +122,12 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => {
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
};

const { data: dag } = useDagServiceGetDagDetails({ dagId: upstreamDagId ?? "" }, undefined, {
enabled: Boolean(upstreamDagId),
});

const { mutate: togglePause } = useTogglePause({ dagId: dag?.dag_id ?? upstreamDagId ?? "" });

const {
error: manualError,
isPending,
Expand All @@ -133,6 +143,14 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => {

const handleSubmit = () => {
if (eventType === "materialize") {
if (unpause && dag?.is_paused) {
togglePause({
dagId: dag.dag_id,
requestBody: {
is_paused: false,
},
});
}
materializeAsset({ assetId: asset.id });
} else {
createAssetEvent({
Expand Down Expand Up @@ -162,7 +180,7 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => {
>
<HStack align="stretch">
<RadioCardItem
description={`Trigger the Dag upstream of this asset${upstreamDagId === undefined ? "" : `: ${upstreamDagId}`}`}
description={`Trigger the Dag upstream of this asset${upstreamDagId === undefined ? "" : `: ${dag?.dag_display_name ?? upstreamDagId}`}`}
disabled={!hasUpstreamDag}
label="Materialize"
value="materialize"
Expand All @@ -177,6 +195,11 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => {
<Text color="fg.error">{extraError}</Text>
</Field.Root>
) : undefined}
{eventType === "materialize" && dag?.is_paused ? (
<Checkbox checked={unpause} colorPalette="blue" onChange={() => setUnpause(!unpause)}>
Unpause {dag.dag_display_name} on trigger
</Checkbox>
) : undefined}
<ErrorAlert error={eventType === "manual" ? manualError : materializeError} />
</Dialog.Body>
<Dialog.Footer>
Expand Down
50 changes: 50 additions & 0 deletions airflow/ui/src/queries/useTogglePause.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { useQueryClient } from "@tanstack/react-query";

import {
UseDagRunServiceGetDagRunsKeyFn,
UseDagServiceGetDagDetailsKeyFn,
UseDagServiceGetDagKeyFn,
useDagServiceGetDagsKey,
useDagServicePatchDag,
useDagsServiceRecentDagRunsKey,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
} from "openapi/queries";

export const useTogglePause = ({ dagId }: { dagId: string }) => {
const queryClient = useQueryClient();

const onSuccess = async () => {
const queryKeys = [
[useDagServiceGetDagsKey],
[useDagsServiceRecentDagRunsKey],
UseDagServiceGetDagKeyFn({ dagId }, [{ dagId }]),
UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]),
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]),
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
};

return useDagServicePatchDag({
onSuccess,
});
};

0 comments on commit eebdf4e

Please sign in to comment.