From a6eaa908bf302286c4454ced957f7b09c4ae15b6 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 27 Feb 2025 16:28:20 +1300 Subject: [PATCH 1/2] User subproc pool timeout on foregrounded handlers. --- cylc/flow/subprocpool.py | 10 +++++++-- cylc/flow/workflow_events.py | 42 ++++++++++++++++++++---------------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/cylc/flow/subprocpool.py b/cylc/flow/subprocpool.py index 5af8b897ac1..80c19b7fb8d 100644 --- a/cylc/flow/subprocpool.py +++ b/cylc/flow/subprocpool.py @@ -365,16 +365,22 @@ def put_command( ) @classmethod - def run_command(cls, ctx): + def run_command(cls, ctx, timeout: Optional[float] = None): """Execute command in ctx and capture its output and exit status. + Raises subprocess.TimeoutExpired (via subprocess.communicate) if + the command gets killed for exceeding a given timeout. + Arguments: ctx (cylc.flow.subprocctx.SubProcContext): A context object containing the command to run and its status. + timeout: + Timeout in seconds, after which to kill the command. """ proc = cls._run_command_init(ctx) if proc: - ctx.out, ctx.err = (f.decode() for f in proc.communicate()) + ctx.out, ctx.err = ( + f.decode() for f in proc.communicate(timeout=timeout)) ctx.ret_code = proc.wait() cls._run_command_exit(ctx) diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index 2b99ef3d2cf..d097599a513 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -18,6 +18,7 @@ from enum import Enum import os from shlex import quote +from subprocess import TimeoutExpired from typing import Any, Dict, List, Union, TYPE_CHECKING from cylc.flow import LOG @@ -222,6 +223,8 @@ class WorkflowEventHandler(): def __init__(self, proc_pool): self.proc_pool = proc_pool + self.proc_timeout = ( + glbl_cfg().get(['scheduler', 'process pool timeout'])) @staticmethod def get_events_conf( @@ -301,14 +304,26 @@ def _send_mail( env=env, stdin_str=message ) - if self.proc_pool.closed: - # Run command in foreground if process pool is closed - self.proc_pool.run_command(proc_ctx) - self._run_event_handlers_callback(proc_ctx) + self._run_cmd(proc_ctx, callback=self._run_event_mail_callback) + + def _run_cmd(self, ctx, callback): + """Queue or directly run a command and its callback. + + Queue the command to the subprocess pool if possible, or otherwise + run it in the foreground but subject to the subprocess pool timeout. + + """ + if not self.proc_pool.closed: + # Queue it to the subprocess pool. + self.proc_pool.put_command(ctx, callback=callback) else: - # Run command using process pool otherwise - self.proc_pool.put_command( - proc_ctx, callback=self._run_event_mail_callback) + # Run it in the foreground, but use the subprocess pool timeout. + try: + self.proc_pool.run_command(ctx, float(self.proc_timeout)) + except TimeoutExpired: + ctx.ret_code = -9 + ctx.err = f"killed on timeout ({self.proc_timeout})" + callback(ctx) def _run_event_custom_handlers(self, schd, template_variables, event): """Helper for "run_event_handlers", custom event handlers.""" @@ -348,23 +363,14 @@ def _run_event_custom_handlers(self, schd, template_variables, event): env=dict(os.environ), shell=True # nosec (designed to run user defined code) ) - if self.proc_pool.closed: - # Run command in foreground if abort on failure is set or if - # process pool is closed - self.proc_pool.run_command(proc_ctx) - self._run_event_handlers_callback(proc_ctx) - else: - # Run command using process pool otherwise - self.proc_pool.put_command( - proc_ctx, callback=self._run_event_handlers_callback) + self._run_cmd(proc_ctx, self._run_event_handlers_callback) @staticmethod def _run_event_handlers_callback(proc_ctx): """Callback on completion of a workflow event handler.""" if proc_ctx.ret_code: - msg = '%s EVENT HANDLER FAILED' % proc_ctx.cmd_key[1] LOG.error(str(proc_ctx)) - LOG.error(msg) + LOG.error(f'{proc_ctx.cmd_key[1]} EVENT HANDLER FAILED') else: LOG.info(str(proc_ctx)) From 4beafcdea468a5bde5f594252c7f06e62b8ac55c Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 27 Feb 2025 16:41:22 +1300 Subject: [PATCH 2/2] Add a change log entry. --- changes.d/6639.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/6639.fix.md diff --git a/changes.d/6639.fix.md b/changes.d/6639.fix.md new file mode 100644 index 00000000000..b59f390d9db --- /dev/null +++ b/changes.d/6639.fix.md @@ -0,0 +1 @@ +Ensure that shutdown event handlers are killed if they exceed the process pool timeout.