Skip to content

Commit 31ba41e

Browse files
authored
AIP-84 Refactor Filter Query Parameters (apache#43947)
* Add FilterParam and type_filter_param_factory * Refactor Get Event Logs with filter_param_factory * Refactor add type option for filter_param_factory * Fix Get Event Logs with latest paginated_select * Refactor Get Assets Event * Refactor List Dag Warnings * Refactor DagRun related - QueryLastDagRunStateFilter - dag_ids of get_dag_stats * Remove unused parameters * Refactor on Dag parameters * Add any_equal to FilterParam * Refactor Task Instance * Fix Get Event Logs type * Fix after rebase * Refactor with search_param_factory * Refactor QueryLastDagRunStateFilter * Fix get_list_dag_runs_batch
1 parent d059d4a commit 31ba41e

File tree

13 files changed

+339
-524
lines changed

13 files changed

+339
-524
lines changed

airflow/api_fastapi/common/parameters.py

+191-406
Large diffs are not rendered by default.

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

+23-23
Original file line numberDiff line numberDiff line change
@@ -2969,6 +2969,29 @@ paths:
29692969
description: Get all Event Logs.
29702970
operationId: get_event_logs
29712971
parameters:
2972+
- name: limit
2973+
in: query
2974+
required: false
2975+
schema:
2976+
type: integer
2977+
minimum: 0
2978+
default: 100
2979+
title: Limit
2980+
- name: offset
2981+
in: query
2982+
required: false
2983+
schema:
2984+
type: integer
2985+
minimum: 0
2986+
default: 0
2987+
title: Offset
2988+
- name: order_by
2989+
in: query
2990+
required: false
2991+
schema:
2992+
type: string
2993+
default: id
2994+
title: Order By
29722995
- name: dag_id
29732996
in: query
29742997
required: false
@@ -3063,29 +3086,6 @@ paths:
30633086
format: date-time
30643087
- type: 'null'
30653088
title: After
3066-
- name: limit
3067-
in: query
3068-
required: false
3069-
schema:
3070-
type: integer
3071-
minimum: 0
3072-
default: 100
3073-
title: Limit
3074-
- name: offset
3075-
in: query
3076-
required: false
3077-
schema:
3078-
type: integer
3079-
minimum: 0
3080-
default: 0
3081-
title: Offset
3082-
- name: order_by
3083-
in: query
3084-
required: false
3085-
schema:
3086-
type: string
3087-
default: id
3088-
title: Order By
30893089
responses:
30903090
'200':
30913091
description: Successful Response

airflow/api_fastapi/core_api/routes/public/assets.py

+17-10
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,14 @@
2626

2727
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
2828
from airflow.api_fastapi.common.parameters import (
29+
FilterParam,
2930
OptionalDateTimeQuery,
3031
QueryAssetDagIdPatternSearch,
31-
QueryAssetIdFilter,
3232
QueryLimit,
3333
QueryOffset,
34-
QuerySourceDagIdFilter,
35-
QuerySourceMapIndexFilter,
36-
QuerySourceRunIdFilter,
37-
QuerySourceTaskIdFilter,
3834
QueryUriPatternSearch,
3935
SortParam,
36+
filter_param_factory,
4037
)
4138
from airflow.api_fastapi.common.router import AirflowRouter
4239
from airflow.api_fastapi.core_api.datamodels.assets import (
@@ -135,11 +132,21 @@ def get_asset_events(
135132
).dynamic_depends("timestamp")
136133
),
137134
],
138-
asset_id: QueryAssetIdFilter,
139-
source_dag_id: QuerySourceDagIdFilter,
140-
source_task_id: QuerySourceTaskIdFilter,
141-
source_run_id: QuerySourceRunIdFilter,
142-
source_map_index: QuerySourceMapIndexFilter,
135+
asset_id: Annotated[
136+
FilterParam[int | None], Depends(filter_param_factory(AssetEvent.asset_id, int | None))
137+
],
138+
source_dag_id: Annotated[
139+
FilterParam[str | None], Depends(filter_param_factory(AssetEvent.source_dag_id, str | None))
140+
],
141+
source_task_id: Annotated[
142+
FilterParam[str | None], Depends(filter_param_factory(AssetEvent.source_task_id, str | None))
143+
],
144+
source_run_id: Annotated[
145+
FilterParam[str | None], Depends(filter_param_factory(AssetEvent.source_run_id, str | None))
146+
],
147+
source_map_index: Annotated[
148+
FilterParam[int | None], Depends(filter_param_factory(AssetEvent.source_map_index, int | None))
149+
],
143150
session: SessionDep,
144151
) -> AssetEventCollectionResponse:
145152
"""Get asset events."""

airflow/api_fastapi/core_api/routes/public/dag_run.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
)
3131
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
3232
from airflow.api_fastapi.common.parameters import (
33-
DagIdsFilter,
33+
FilterOptionEnum,
34+
FilterParam,
3435
LimitFilter,
3536
OffsetFilter,
3637
QueryDagRunStateFilter,
@@ -374,7 +375,7 @@ def get_list_dag_runs_batch(
374375
dag_id: Literal["~"], body: DAGRunsBatchBody, session: SessionDep
375376
) -> DAGRunCollectionResponse:
376377
"""Get a list of DAG Runs."""
377-
dag_ids = DagIdsFilter(DagRun, body.dag_ids)
378+
dag_ids = FilterParam(DagRun.dag_id, body.dag_ids, FilterOptionEnum.IN)
378379
logical_date = RangeFilter(
379380
Range(lower_bound=body.logical_date_gte, upper_bound=body.logical_date_lte),
380381
attribute=DagRun.logical_date,
@@ -387,8 +388,7 @@ def get_list_dag_runs_batch(
387388
Range(lower_bound=body.end_date_gte, upper_bound=body.end_date_lte),
388389
attribute=DagRun.end_date,
389390
)
390-
391-
state = QueryDagRunStateFilter(body.states)
391+
state = FilterParam(DagRun.state, body.states, FilterOptionEnum.ANY_EQUAL)
392392

393393
offset = OffsetFilter(body.page_offset)
394394
limit = LimitFilter(body.page_limit)

airflow/api_fastapi/core_api/routes/public/dag_stats.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,28 @@
1717

1818
from __future__ import annotations
1919

20-
from fastapi import status
20+
from typing import Annotated
21+
22+
from fastapi import Depends, status
2123

2224
from airflow.api_fastapi.common.db.common import (
2325
SessionDep,
2426
paginated_select,
2527
)
2628
from airflow.api_fastapi.common.db.dag_runs import dagruns_select_with_state_count
27-
from airflow.api_fastapi.common.parameters import QueryDagIdsFilter
29+
from airflow.api_fastapi.common.parameters import (
30+
FilterOptionEnum,
31+
FilterParam,
32+
filter_param_factory,
33+
)
2834
from airflow.api_fastapi.common.router import AirflowRouter
2935
from airflow.api_fastapi.core_api.datamodels.dag_stats import (
3036
DagStatsCollectionResponse,
3137
DagStatsResponse,
3238
DagStatsStateResponse,
3339
)
3440
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
41+
from airflow.models.dagrun import DagRun
3542
from airflow.utils.state import DagRunState
3643

3744
dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats")
@@ -48,7 +55,10 @@
4855
)
4956
def get_dag_stats(
5057
session: SessionDep,
51-
dag_ids: QueryDagIdsFilter,
58+
dag_ids: Annotated[
59+
FilterParam[list[str]],
60+
Depends(filter_param_factory(DagRun.dag_id, list[str], FilterOptionEnum.IN, "dag_ids")),
61+
],
5262
) -> DagStatsCollectionResponse:
5363
"""Get Dag statistics."""
5464
dagruns_select, _ = paginated_select(

airflow/api_fastapi/core_api/routes/public/dag_warning.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@
2727
paginated_select,
2828
)
2929
from airflow.api_fastapi.common.parameters import (
30-
QueryDagIdInDagWarningFilter,
30+
FilterParam,
3131
QueryLimit,
3232
QueryOffset,
33-
QueryWarningTypeFilter,
3433
SortParam,
34+
filter_param_factory,
3535
)
3636
from airflow.api_fastapi.common.router import AirflowRouter
3737
from airflow.api_fastapi.core_api.datamodels.dag_warning import (
3838
DAGWarningCollectionResponse,
3939
)
40-
from airflow.models import DagWarning
40+
from airflow.models.dagwarning import DagWarning, DagWarningType
4141

4242
dag_warning_router = AirflowRouter(tags=["DagWarning"])
4343

@@ -46,8 +46,11 @@
4646
"/dagWarnings",
4747
)
4848
def list_dag_warnings(
49-
dag_id: QueryDagIdInDagWarningFilter,
50-
warning_type: QueryWarningTypeFilter,
49+
dag_id: Annotated[FilterParam[str | None], Depends(filter_param_factory(DagWarning.dag_id, str | None))],
50+
warning_type: Annotated[
51+
FilterParam[DagWarningType | None],
52+
Depends(filter_param_factory(DagWarning.warning_type, DagWarningType | None)),
53+
],
5154
limit: QueryLimit,
5255
offset: QueryOffset,
5356
order_by: Annotated[

airflow/api_fastapi/core_api/routes/public/event_logs.py

+42-35
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,20 @@
1919
from datetime import datetime
2020
from typing import Annotated
2121

22-
from fastapi import Depends, HTTPException, Query, status
22+
from fastapi import Depends, HTTPException, status
2323
from sqlalchemy import select
2424

2525
from airflow.api_fastapi.common.db.common import (
2626
SessionDep,
2727
paginated_select,
2828
)
2929
from airflow.api_fastapi.common.parameters import (
30+
FilterOptionEnum,
31+
FilterParam,
3032
QueryLimit,
3133
QueryOffset,
3234
SortParam,
35+
filter_param_factory,
3336
)
3437
from airflow.api_fastapi.common.router import AirflowRouter
3538
from airflow.api_fastapi.core_api.datamodels.event_logs import (
@@ -83,46 +86,50 @@ def get_event_logs(
8386
).dynamic_depends()
8487
),
8588
],
86-
dag_id: str | None = None,
87-
task_id: str | None = None,
88-
run_id: str | None = None,
89-
map_index: int | None = None,
90-
try_number: int | None = None,
91-
owner: str | None = None,
92-
event: str | None = None,
93-
excluded_events: list[str] | None = Query(None),
94-
included_events: list[str] | None = Query(None),
95-
before: datetime | None = None,
96-
after: datetime | None = None,
89+
dag_id: Annotated[FilterParam[str | None], Depends(filter_param_factory(Log.dag_id, str | None))],
90+
task_id: Annotated[FilterParam[str | None], Depends(filter_param_factory(Log.task_id, str | None))],
91+
run_id: Annotated[FilterParam[str | None], Depends(filter_param_factory(Log.run_id, str | None))],
92+
map_index: Annotated[FilterParam[int | None], Depends(filter_param_factory(Log.map_index, int | None))],
93+
try_number: Annotated[FilterParam[int | None], Depends(filter_param_factory(Log.try_number, int | None))],
94+
owner: Annotated[FilterParam[str | None], Depends(filter_param_factory(Log.owner, str | None))],
95+
event: Annotated[FilterParam[str | None], Depends(filter_param_factory(Log.event, str | None))],
96+
excluded_events: Annotated[
97+
FilterParam[list[str] | None],
98+
Depends(
99+
filter_param_factory(Log.event, list[str] | None, FilterOptionEnum.NOT_IN, "excluded_events")
100+
),
101+
],
102+
included_events: Annotated[
103+
FilterParam[list[str] | None],
104+
Depends(filter_param_factory(Log.event, list[str] | None, FilterOptionEnum.IN, "included_events")),
105+
],
106+
before: Annotated[
107+
FilterParam[datetime | None],
108+
Depends(filter_param_factory(Log.dttm, datetime | None, FilterOptionEnum.LESS_THAN, "before")),
109+
],
110+
after: Annotated[
111+
FilterParam[datetime | None],
112+
Depends(filter_param_factory(Log.dttm, datetime | None, FilterOptionEnum.GREATER_THAN, "after")),
113+
],
97114
) -> EventLogCollectionResponse:
98115
"""Get all Event Logs."""
99116
query = select(Log).group_by(Log.id)
100-
# TODO: Refactor using the `FilterParam` class in commit `574b72e41cc5ed175a2bbf4356522589b836bb11`
101-
if dag_id is not None:
102-
query = query.where(Log.dag_id == dag_id)
103-
if task_id is not None:
104-
query = query.where(Log.task_id == task_id)
105-
if run_id is not None:
106-
query = query.where(Log.run_id == run_id)
107-
if map_index is not None:
108-
query = query.where(Log.map_index == map_index)
109-
if try_number is not None:
110-
query = query.where(Log.try_number == try_number)
111-
if owner is not None:
112-
query = query.where(Log.owner == owner)
113-
if event is not None:
114-
query = query.where(Log.event == event)
115-
if excluded_events is not None:
116-
query = query.where(Log.event.notin_(excluded_events))
117-
if included_events is not None:
118-
query = query.where(Log.event.in_(included_events))
119-
if before is not None:
120-
query = query.where(Log.dttm < before)
121-
if after is not None:
122-
query = query.where(Log.dttm > after)
123117
event_logs_select, total_entries = paginated_select(
124118
statement=query,
125119
order_by=order_by,
120+
filters=[
121+
dag_id,
122+
task_id,
123+
run_id,
124+
map_index,
125+
try_number,
126+
owner,
127+
event,
128+
excluded_events,
129+
included_events,
130+
before,
131+
after,
132+
],
126133
offset=offset,
127134
limit=limit,
128135
session=session,

airflow/api_fastapi/core_api/routes/public/job.py

+17-9
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@
2626
paginated_select,
2727
)
2828
from airflow.api_fastapi.common.parameters import (
29-
QueryJobExecutorClassFilter,
30-
QueryJobHostnameFilter,
31-
QueryJobStateFilter,
32-
QueryJobTypeFilter,
29+
FilterParam,
3330
QueryLimit,
3431
QueryOffset,
3532
RangeFilter,
3633
SortParam,
3734
datetime_range_filter_factory,
35+
filter_param_factory,
3836
)
3937
from airflow.api_fastapi.common.router import AirflowRouter
4038
from airflow.api_fastapi.core_api.datamodels.job import (
@@ -84,15 +82,25 @@ def get_jobs(
8482
),
8583
],
8684
session: SessionDep,
87-
state: QueryJobStateFilter,
88-
job_type: QueryJobTypeFilter,
89-
hostname: QueryJobHostnameFilter,
90-
executor_class: QueryJobExecutorClassFilter,
85+
state: Annotated[
86+
FilterParam[str | None], Depends(filter_param_factory(Job.state, str | None, filter_name="job_state"))
87+
],
88+
job_type: Annotated[
89+
FilterParam[str | None],
90+
Depends(filter_param_factory(Job.job_type, str | None, filter_name="job_type")),
91+
],
92+
hostname: Annotated[
93+
FilterParam[str | None],
94+
Depends(filter_param_factory(Job.hostname, str | None, filter_name="hostname")),
95+
],
96+
executor_class: Annotated[
97+
FilterParam[str | None],
98+
Depends(filter_param_factory(Job.executor_class, str | None, filter_name="executor_class")),
99+
],
91100
is_alive: bool | None = None,
92101
) -> JobCollectionResponse:
93102
"""Get all jobs."""
94103
base_select = select(Job).where(Job.state == JobState.RUNNING).order_by(Job.latest_heartbeat.desc())
95-
# TODO: Refactor using the `FilterParam` class in commit `574b72e41cc5ed175a2bbf4356522589b836bb11`
96104

97105
jobs_select, total_entries = paginated_select(
98106
statement=base_select,

0 commit comments

Comments
 (0)