Skip to content

Commit 43d7c1c

Browse files
authored
AIP-84 Migrate /object/grid_data from views to FastAPI (apache#44332)
* Include grid endpoint to FastAPI * Finalise the endpoint other than dynamically created tasks and address comments * Finalise the endpoint with dynamically mapped tasks and include unit test * Use SessionDep for session param * Create service/grid.py and move methods, adjust usage of SortParam, use SortParam rather than order_expressions, remove unnecessary version check, include additional checks for MappedTaskGroups, Move include upstream and downstream to parameters.py, remove unrelated comment and include task and dagrun notes * Fix unit test according to adjustments * Include missing MappedTaskGroup parent check to MappedOperator * Remove remaining version check * Move service/ to services/ui/, make sort_param unique for dag_run_sort_param, remove unreachable statement from parameters.py::_transform_ti_states, make include upstream/downstream Annotated optional and include new test for upstream/downstream * Fix SortParam creation * Revert changes in QueryLastDagRunStateFilter * include missing task_count to parent_node for recursive taskgroups, fix loop order for calculating overall_state, fix sorting and include user to pass it as parameter and include new test for order_by * Rename num_runs to limit for consistency, make base_date filter range query * Fix task_count and states for nested task_groups, add alias to run_id, change types of Pydantic models * Rebase and rerun pre-commit * Change GridTaskInstanceSummary state to TaskInstanceState object * Fix setting state in GridTaskInstanceSummary, change name states to child_states to prevent confusion * Select all model columns, move priority to paramteres as state_priority, decide the order_by with first element of timetable.run_ordering, add __or__ method to SortParam, calculate overall_ti_state after proper additions in child_states, add more test cases around order_by, limit, filtering such as logical_date_gte/lte, make test 3 depth nested task group * Fix run_type and state param not working due to naming and include remaining filter in unit test, make tests use params and parametrize test methods * Move SortParam not provided comparison logic to view * Remove forgotten code piece * Remove None from param definition, adjust the log of wrong DagRun type in parameters and test
1 parent 7002966 commit 43d7c1c

File tree

18 files changed

+4781
-16
lines changed

18 files changed

+4781
-16
lines changed

airflow/api_fastapi/common/parameters.py

+52
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from airflow.typing_compat import Self
5454
from airflow.utils import timezone
5555
from airflow.utils.state import DagRunState, TaskInstanceState
56+
from airflow.utils.types import DagRunType
5657

5758
if TYPE_CHECKING:
5859
from sqlalchemy.sql import ColumnElement, Select
@@ -528,6 +529,32 @@ def _transform_dag_run_states(states: Iterable[str] | None) -> list[DagRunState
528529
),
529530
]
530531

532+
533+
def _transform_dag_run_types(types: list[str] | None) -> list[DagRunType | None] | None:
534+
try:
535+
if not types:
536+
return None
537+
return [None if run_type in ("none", None) else DagRunType(run_type) for run_type in types]
538+
except ValueError:
539+
raise HTTPException(
540+
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
541+
detail=f"Invalid value for run type. Valid values are {', '.join(DagRunType)}",
542+
)
543+
544+
545+
QueryDagRunRunTypesFilter = Annotated[
546+
FilterParam[list[str]],
547+
Depends(
548+
filter_param_factory(
549+
attribute=DagRun.run_type,
550+
_type=list[str],
551+
filter_option=FilterOptionEnum.ANY_EQUAL,
552+
default_factory=list,
553+
transform_callable=_transform_dag_run_types,
554+
)
555+
),
556+
]
557+
531558
# DAGTags
532559
QueryDagTagPatternSearch = Annotated[
533560
_SearchParam, Depends(search_param_factory(DagTag.name, "tag_name_pattern"))
@@ -601,3 +628,28 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
601628
QueryVariableKeyPatternSearch = Annotated[
602629
_SearchParam, Depends(search_param_factory(Variable.key, "variable_key_pattern"))
603630
]
631+
632+
633+
# UI Shared
634+
def _optional_boolean(value: bool | None) -> bool | None:
635+
return value if value is not None else False
636+
637+
638+
QueryIncludeUpstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]
639+
QueryIncludeDownstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]
640+
641+
state_priority: list[None | TaskInstanceState] = [
642+
TaskInstanceState.FAILED,
643+
TaskInstanceState.UPSTREAM_FAILED,
644+
TaskInstanceState.UP_FOR_RETRY,
645+
TaskInstanceState.UP_FOR_RESCHEDULE,
646+
TaskInstanceState.QUEUED,
647+
TaskInstanceState.SCHEDULED,
648+
TaskInstanceState.DEFERRED,
649+
TaskInstanceState.RUNNING,
650+
TaskInstanceState.RESTARTING,
651+
None,
652+
TaskInstanceState.SUCCESS,
653+
TaskInstanceState.SKIPPED,
654+
TaskInstanceState.REMOVED,
655+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from datetime import datetime
21+
from uuid import UUID
22+
23+
from pydantic import BaseModel, Field
24+
25+
from airflow.utils.state import DagRunState, TaskInstanceState
26+
from airflow.utils.types import DagRunType
27+
28+
29+
class GridTaskInstanceSummary(BaseModel):
30+
"""Task Instance Summary model for the Grid UI."""
31+
32+
task_id: str
33+
try_number: int
34+
start_date: datetime | None
35+
end_date: datetime | None
36+
queued_dttm: datetime | None
37+
child_states: dict[str, int] | None
38+
task_count: int
39+
state: TaskInstanceState | None
40+
note: str | None
41+
42+
43+
class GridDAGRunwithTIs(BaseModel):
44+
"""DAG Run model for the Grid UI."""
45+
46+
run_id: str = Field(serialization_alias="dag_run_id", validation_alias="run_id")
47+
queued_at: datetime | None
48+
start_date: datetime | None
49+
end_date: datetime | None
50+
state: DagRunState
51+
run_type: DagRunType
52+
data_interval_start: datetime | None
53+
data_interval_end: datetime | None
54+
version_number: UUID | None
55+
note: str | None
56+
task_instances: list[GridTaskInstanceSummary]
57+
58+
59+
class GridResponse(BaseModel):
60+
"""Response model for the Grid UI."""
61+
62+
dag_runs: list[GridDAGRunwithTIs]

0 commit comments

Comments
 (0)