Add CLI check for scheduler
mik-laj committed Feb 28, 2021
1 parent fff3444 commit 2af6f57
Showing 7 changed files with 299 additions and 13 deletions.
76 changes: 67 additions & 9 deletions airflow/cli/cli_parser.py
@@ -120,15 +120,21 @@ def add_to_parser(self, parser: argparse.ArgumentParser):
parser.add_argument(*self.flags, **self.kwargs)


def positive_int(value):
def positive_int(*, allow_zero):
"""Define a positive int type for an argument."""
try:
value = int(value)
if value > 0:
return value
except ValueError:
pass
raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")

def _check(value):
try:
value = int(value)
if allow_zero and value == 0:
return value
if value > 0:
return value
except ValueError:
pass
raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")

return _check
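
# Illustrative sketch (not part of this commit): the factory form lets each
# argument decide whether zero is acceptable, via the standard argparse flow:
#
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--limit", type=positive_int(allow_zero=True))
#     parser.add_argument("--num", type=positive_int(allow_zero=False))
#     parser.parse_args(["--limit", "0"])  # accepted, yields 0
#     parser.parse_args(["--num", "0"])    # error: invalid positive int value: '0'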


# Shared
@@ -204,7 +210,7 @@ def positive_int(value):
ARG_NUM_EXECUTIONS = Arg(
("-n", "--num-executions"),
default=1,
type=positive_int,
type=positive_int(allow_zero=False),
help="The number of next execution datetimes to show",
)

@@ -682,6 +688,34 @@ def positive_int(value):
help="Kubernetes Namespace",
)

# jobs check
ARG_JOB_TYPE_FILTER = Arg(
('--job-type',),
choices=('BackfillJob', 'LocalTaskJob', 'SchedulerJob'),
action='store',
help='The type of job(s) that will be checked.',
)

ARG_JOB_HOSTNAME_FILTER = Arg(
("--hostname",),
default=None,
type=str,
help="The hostname of job(s) that will be checked.",
)

ARG_JOB_LIMIT = Arg(
("--limit",),
default=1,
type=positive_int(allow_zero=True),
help="The number of recent jobs that will be checked. To disable limit, set 0. ",
)

ARG_ALLOW_MULTIPLE = Arg(
("--allow-multiple",),
action='store_true',
help="If passed, this command will be successful even if multiple matching alive jobs are found.",
)

ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
ARG_CONN_DESCRIPTION,
@@ -1354,6 +1388,25 @@ class GroupCommand(NamedTuple):
),
)

JOBS_COMMANDS = (
ActionCommand(
name='check',
help="Checks if job(s) are still alive",
func=lazy_load_command('airflow.cli.commands.jobs_command.check'),
args=(ARG_JOB_TYPE_FILTER, ARG_JOB_HOSTNAME_FILTER, ARG_JOB_LIMIT, ARG_ALLOW_MULTIPLE),
epilog=(
'examples:\n'
'To check if the local scheduler is still working properly, run:\n'
'\n'
' $ airflow jobs check --job-type SchedulerJob --hostname "$(hostname)"\n'
'\n'
'To check if any scheduler is running when you are using high availability, run:\n'
'\n'
' $ airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100'
),
),
)
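
# Illustrative sketch (not part of this commit): once the "jobs" group below is
# registered, the parser dispatches the new subcommand roughly like this:
#
#     parser = cli_parser.get_parser()
#     args = parser.parse_args(["jobs", "check", "--job-type", "SchedulerJob"])
#     args.func(args)  # lazily imports and calls jobs_command.check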

airflow_commands: List[CLICommand] = [
GroupCommand(
name='dags',
@@ -1378,6 +1431,11 @@ class GroupCommand(NamedTuple):
help="Manage variables",
subcommands=VARIABLES_COMMANDS,
),
GroupCommand(
name='jobs',
help="Manage jobs",
subcommands=JOBS_COMMANDS,
),
GroupCommand(
name='db',
help="Database operations",
53 changes: 53 additions & 0 deletions airflow/cli/commands/jobs_command.py
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import List

from airflow.jobs.base_job import BaseJob
from airflow.utils.session import provide_session
from airflow.utils.state import State


@provide_session
def check(args, session=None):
"""Checks if job(s) are still alive"""
if args.allow_multiple and not args.limit > 1:
raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.")
query = (
session.query(BaseJob)
.filter(BaseJob.state == State.RUNNING)
.order_by(BaseJob.latest_heartbeat.desc())
)
if args.job_type:
query = query.filter(BaseJob.job_type == args.job_type)
if args.hostname:
query = query.filter(BaseJob.hostname == args.hostname)
if args.limit > 0:
query = query.limit(args.limit)

jobs: List[BaseJob] = query.all()
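# is_alive() (defined on BaseJob, not in this diff) requires the job state to be
# RUNNING and the latest heartbeat to be recent enough; SchedulerJob overrides it
# to honor the configured scheduler_health_check_threshold.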
alive_jobs = [job for job in jobs if job.is_alive()]

count_alive_jobs = len(alive_jobs)
if count_alive_jobs == 0:
raise SystemExit("No alive jobs found.")
if count_alive_jobs > 1 and not args.allow_multiple:
raise SystemExit(f"Found {count_alive_jobs} alive jobs. Expected only one.")
if count_alive_jobs == 1:
print("Found one alive job.")
else:
print(f"Found {count_alive_jobs} alive jobs.")
41 changes: 41 additions & 0 deletions docs/apache-airflow/logging-monitoring/check-health.rst
@@ -20,6 +20,11 @@
Checking Airflow Health Status
==============================

Airflow has two methods to check the health of components - HTTP checks and CLI checks. The choice between them depends on the role of the component, as well as the tools used to monitor the deployment.

Health Check Endpoint
---------------------

To check the health status of your Airflow instance, you can simply access the endpoint
``/health``. It will return a JSON object that provides a high-level overview of the instance's health.
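
For example, assuming the webserver listens on ``localhost:8080``, you can query it with:

.. code-block:: bash

    curl http://localhost:8080/health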

@@ -45,6 +50,42 @@ To check the health status of your Airflow instance, you can simply access the e
considered unhealthy
* This threshold value can be specified using the option ``scheduler_health_check_threshold`` within the
``[scheduler]`` section in ``airflow.cfg``
* If you run more than one scheduler, only the state of one scheduler will be reported, i.e. only one working scheduler is enough
for the scheduler state to be considered healthy

Please keep in mind that the HTTP response code of the ``/health`` endpoint **should not** be used to determine the health
status of the application. The return code is only indicative of the state of the REST call (200 for success).

.. note::

For this check to work, at least one working web server is required. If you use this check for scheduler
monitoring, a failure of the web server means you lose the ability to monitor the scheduler, which in turn
means it can be restarted even if it is in good condition. For greater confidence, consider using :ref:`CLI Check for Scheduler <check-health/cli-checks-for-scheduler>`.

.. _check-health/cli-checks-for-scheduler:

CLI Check for Scheduler
-----------------------

The scheduler creates an entry in the table :class:`airflow.jobs.base_job.BaseJob` with information about the host and
the timestamp (heartbeat) at startup, and then updates it regularly. You can use this to check if the scheduler is
working correctly. To do this, you can use the ``airflow jobs check`` command. On failure, the command will exit
with a non-zero error code.

To check if the local scheduler is still working properly, run:

.. code-block:: bash

    airflow jobs check --job-type SchedulerJob --hostname "$(hostname)"

To check if any scheduler is running when you are using high availability, run:

.. code-block:: bash

    airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100

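Since the command reports its result through the exit code, you can also use it in a shell script, for example:

.. code-block:: bash

    if airflow jobs check --job-type SchedulerJob --hostname "$(hostname)"; then
        echo "Scheduler is healthy"
    fi
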
CLI Check for Database
----------------------

To verify that the database is working correctly, you can use the ``airflow db check`` command. On failure, the command will exit
with a non-zero error code.
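
For example:

.. code-block:: bash

    airflow db check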
5 changes: 5 additions & 0 deletions docs/apache-airflow/start/docker-compose.yaml
@@ -102,6 +102,11 @@ services:
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
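# Note: "$${HOSTNAME}" is escaped with "$$" so that Docker Compose passes a
# literal "${HOSTNAME}" through to the container shell instead of interpolating
# it at compose time.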
restart: always

airflow-worker:
2 changes: 1 addition & 1 deletion docs/apache-airflow/start/docker.rst
@@ -105,7 +105,7 @@ In the second terminal you can check the condition of the containers and make su
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
247ebe6cf87a apache/airflow:master-python3.8 "/usr/bin/dumb-init …" 3 minutes ago Up 3 minutes (healthy) 8080/tcp compose_airflow-worker_1
ed9b09fc84b1 apache/airflow:master-python3.8 "/usr/bin/dumb-init …" 3 minutes ago Up 3 minutes 8080/tcp compose_airflow-scheduler_1
ed9b09fc84b1 apache/airflow:master-python3.8 "/usr/bin/dumb-init …" 3 minutes ago Up 3 minutes (healthy) 8080/tcp compose_airflow-scheduler_1
65ac1da2c219 apache/airflow:master-python3.8 "/usr/bin/dumb-init …" 3 minutes ago Up 3 minutes (healthy) 0.0.0.0:5555->5555/tcp, 8080/tcp compose_flower_1
7cb1fb603a98 apache/airflow:master-python3.8 "/usr/bin/dumb-init …" 3 minutes ago Up 3 minutes (healthy) 0.0.0.0:8080->8080/tcp compose_airflow-webserver_1
74f3bbe506eb postgres:13 "docker-entrypoint.s…" 18 minutes ago Up 17 minutes (healthy) 5432/tcp compose_postgres_1
128 changes: 128 additions & 0 deletions tests/cli/commands/test_jobs_command.py
@@ -0,0 +1,128 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import io
import unittest

import pytest

from airflow.cli import cli_parser
from airflow.cli.commands import jobs_command
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.session import create_session
from airflow.utils.state import State
from tests.test_utils.db import clear_db_jobs


class TestJobsCheckCommand(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.parser = cli_parser.get_parser()

def setUp(self) -> None:
clear_db_jobs()

def tearDown(self) -> None:
clear_db_jobs()

def test_should_report_success_for_one_working_scheduler(self):
with create_session() as session:
job = SchedulerJob()
job.state = State.RUNNING
session.add(job)
session.commit()
job.heartbeat()
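# heartbeat() refreshes latest_heartbeat, so is_alive() will treat this job as
# healthy in the check below.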

with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
jobs_command.check(self.parser.parse_args(['jobs', 'check', '--job-type', 'SchedulerJob']))
self.assertIn("Found one alive job.", temp_stdout.getvalue())

def test_should_report_success_for_one_working_scheduler_with_hostname(self):
with create_session() as session:
job = SchedulerJob()
job.state = State.RUNNING
job.hostname = 'HOSTNAME'
session.add(job)
session.commit()
job.heartbeat()

with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
jobs_command.check(
self.parser.parse_args(
['jobs', 'check', '--job-type', 'SchedulerJob', '--hostname', 'HOSTNAME']
)
)
self.assertIn("Found one alive job.", temp_stdout.getvalue())

def test_should_report_success_for_ha_schedulers(self):
with create_session() as session:
for _ in range(3):
job = SchedulerJob()
job.state = State.RUNNING
session.add(job)
session.commit()
job.heartbeat()

with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
jobs_command.check(
self.parser.parse_args(
['jobs', 'check', '--job-type', 'SchedulerJob', '--limit', '100', '--allow-multiple']
)
)
self.assertIn("Found 3 alive jobs.", temp_stdout.getvalue())

def test_should_ignore_not_running_jobs(self):
with create_session() as session:
for _ in range(3):
job = SchedulerJob()
job.state = State.SHUTDOWN
session.add(job)
session.commit()
# No alive jobs found.
with pytest.raises(SystemExit, match=r"No alive jobs found."):
jobs_command.check(self.parser.parse_args(['jobs', 'check']))

def test_should_raise_exception_for_multiple_scheduler_on_one_host(self):
with create_session() as session:
for _ in range(3):
job = SchedulerJob()
job.state = State.RUNNING
job.hostname = 'HOSTNAME'
session.add(job)
session.commit()
job.heartbeat()

with pytest.raises(SystemExit, match=r"Found 3 alive jobs. Expected only one."):
jobs_command.check(
self.parser.parse_args(
[
'jobs',
'check',
'--job-type',
'SchedulerJob',
'--limit',
'100',
]
)
)

def test_should_raise_exception_for_allow_multiple_and_limit_1(self):
with pytest.raises(
SystemExit,
match=r"To use option --allow-multiple, you must set the limit to a value greater than 1.",
):
jobs_command.check(self.parser.parse_args(['jobs', 'check', '--allow-multiple']))
7 changes: 4 additions & 3 deletions tests/cli/test_cli_parser.py
@@ -161,8 +161,9 @@ def test_should_display_help(self):
parser.parse_args([*cmd_args, '--help'])

def test_positive_int(self):
assert 1 == cli_parser.positive_int('1')
assert 1 == cli_parser.positive_int(allow_zero=True)('1')
assert 0 == cli_parser.positive_int(allow_zero=True)('0')

with pytest.raises(argparse.ArgumentTypeError):
cli_parser.positive_int('0')
cli_parser.positive_int('-1')
cli_parser.positive_int(allow_zero=False)('0')
cli_parser.positive_int(allow_zero=True)('-1')
