-
Notifications
You must be signed in to change notification settings - Fork 14.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add CLI check for scheduler (#14519)
Co-authored-by: Kamil Breguła <[email protected]> (cherry picked from commit f25ec33)
- Loading branch information
Showing
7 changed files
with
299 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
from typing import List | ||
|
||
from airflow.jobs.base_job import BaseJob | ||
from airflow.utils.session import provide_session | ||
from airflow.utils.state import State | ||
|
||
|
||
@provide_session | ||
def check(args, session=None): | ||
"""Checks if job(s) are still alive""" | ||
if args.allow_multiple and not args.limit > 1: | ||
raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.") | ||
query = ( | ||
session.query(BaseJob) | ||
.filter(BaseJob.state == State.RUNNING) | ||
.order_by(BaseJob.latest_heartbeat.desc()) | ||
) | ||
if args.job_type: | ||
query = query.filter(BaseJob.job_type == args.job_type) | ||
if args.hostname: | ||
query = query.filter(BaseJob.hostname == args.hostname) | ||
if args.limit > 0: | ||
query = query.limit(args.limit) | ||
|
||
jobs: List[BaseJob] = query.all() | ||
alive_jobs = [job for job in jobs if job.is_alive()] | ||
|
||
count_alive_jobs = len(alive_jobs) | ||
if count_alive_jobs == 0: | ||
raise SystemExit("No alive jobs found.") | ||
if count_alive_jobs > 1 and not args.allow_multiple: | ||
raise SystemExit(f"Found {count_alive_jobs} alive jobs. Expected only one.") | ||
if count_alive_jobs == 1: | ||
print("Found one alive job.") | ||
else: | ||
print(f"Found {count_alive_jobs} alive jobs.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
import contextlib | ||
import io | ||
import unittest | ||
|
||
import pytest | ||
|
||
from airflow.cli import cli_parser | ||
from airflow.cli.commands import jobs_command | ||
from airflow.jobs.scheduler_job import SchedulerJob | ||
from airflow.utils.session import create_session | ||
from airflow.utils.state import State | ||
from tests.test_utils.db import clear_db_jobs | ||
|
||
|
||
class TestCliConfigList(unittest.TestCase): | ||
@classmethod | ||
def setUpClass(cls): | ||
cls.parser = cli_parser.get_parser() | ||
|
||
def setUp(self) -> None: | ||
clear_db_jobs() | ||
|
||
def tearDown(self) -> None: | ||
clear_db_jobs() | ||
|
||
def test_should_report_success_for_one_working_scheduler(self): | ||
with create_session() as session: | ||
job = SchedulerJob() | ||
job.state = State.RUNNING | ||
session.add(job) | ||
session.commit() | ||
job.heartbeat() | ||
|
||
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: | ||
jobs_command.check(self.parser.parse_args(['jobs', 'check', '--job-type', 'SchedulerJob'])) | ||
self.assertIn("Found one alive job.", temp_stdout.getvalue()) | ||
|
||
def test_should_report_success_for_one_working_scheduler_with_hostname(self): | ||
with create_session() as session: | ||
job = SchedulerJob() | ||
job.state = State.RUNNING | ||
job.hostname = 'HOSTNAME' | ||
session.add(job) | ||
session.commit() | ||
job.heartbeat() | ||
|
||
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: | ||
jobs_command.check( | ||
self.parser.parse_args( | ||
['jobs', 'check', '--job-type', 'SchedulerJob', '--hostname', 'HOSTNAME'] | ||
) | ||
) | ||
self.assertIn("Found one alive job.", temp_stdout.getvalue()) | ||
|
||
def test_should_report_success_for_ha_schedulers(self): | ||
with create_session() as session: | ||
for _ in range(3): | ||
job = SchedulerJob() | ||
job.state = State.RUNNING | ||
session.add(job) | ||
session.commit() | ||
job.heartbeat() | ||
|
||
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: | ||
jobs_command.check( | ||
self.parser.parse_args( | ||
['jobs', 'check', '--job-type', 'SchedulerJob', '--limit', '100', '--allow-multiple'] | ||
) | ||
) | ||
self.assertIn("Found 3 alive jobs.", temp_stdout.getvalue()) | ||
|
||
def test_should_ignore_not_running_jobs(self): | ||
with create_session() as session: | ||
for _ in range(3): | ||
job = SchedulerJob() | ||
job.state = State.SHUTDOWN | ||
session.add(job) | ||
session.commit() | ||
# No alive jobs found. | ||
with pytest.raises(SystemExit, match=r"No alive jobs found."): | ||
jobs_command.check(self.parser.parse_args(['jobs', 'check'])) | ||
|
||
def test_should_raise_exception_for_multiple_scheduler_on_one_host(self): | ||
with create_session() as session: | ||
for _ in range(3): | ||
job = SchedulerJob() | ||
job.state = State.RUNNING | ||
job.hostname = 'HOSTNAME' | ||
session.add(job) | ||
session.commit() | ||
job.heartbeat() | ||
|
||
with pytest.raises(SystemExit, match=r"Found 3 alive jobs. Expected only one."): | ||
jobs_command.check( | ||
self.parser.parse_args( | ||
[ | ||
'jobs', | ||
'check', | ||
'--job-type', | ||
'SchedulerJob', | ||
'--limit', | ||
'100', | ||
] | ||
) | ||
) | ||
|
||
def test_should_raise_exception_for_allow_multiple_and_limit_1(self): | ||
with pytest.raises( | ||
SystemExit, | ||
match=r"To use option --allow-multiple, you must set the limit to a value greater than 1.", | ||
): | ||
jobs_command.check(self.parser.parse_args(['jobs', 'check', '--allow-multiple'])) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters