Skip to content

Commit 84ad390

Browse files
committed
Clean up task expire handling.
1 parent d1e6e1c commit 84ad390

File tree

6 files changed

+74
-67
lines changed

6 files changed

+74
-67
lines changed

cylc/flow/scheduler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1759,7 +1759,7 @@ async def main_loop(self) -> None:
17591759
# (Could do this periodically?)
17601760
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
17611761

1762-
self.pool.set_expired_tasks()
1762+
self.pool.clock_expire_tasks()
17631763
self.release_queued_tasks()
17641764

17651765
if self.pool.sim_time_check(self.message_queue):

cylc/flow/task_events_mgr.py

+33-7
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,13 @@
7171
TASK_STATUS_WAITING
7272
)
7373
from cylc.flow.task_outputs import (
74-
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
75-
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED)
74+
TASK_OUTPUT_EXPIRED,
75+
TASK_OUTPUT_SUBMITTED,
76+
TASK_OUTPUT_STARTED,
77+
TASK_OUTPUT_SUCCEEDED,
78+
TASK_OUTPUT_FAILED,
79+
TASK_OUTPUT_SUBMIT_FAILED
80+
)
7681
from cylc.flow.wallclock import (
7782
get_current_time_string,
7883
get_seconds_as_interval_string as intvl_as_str
@@ -116,11 +121,15 @@ def log_task_job_activity(ctx, workflow, point, name, submit_num=None):
116121
try:
117122
with open(os.path.expandvars(job_activity_log), "ab") as handle:
118123
handle.write((ctx_str + '\n').encode())
119-
except IOError as exc:
120-
# This happens when there is no job directory, e.g. if job host
121-
# selection command causes an submission failure, there will be no job
122-
# directory. In this case, just send the information to the log.
123-
LOG.exception(exc)
124+
except IOError:
125+
# This happens when there is no job directory. E.g., if a job host
126+
# selection command causes a submission failure, or if a waiting task
127+
# expires before a job log directory is otherwise needed.
128+
# (Don't log the exception content, it looks like a bug).
129+
LOG.warning(
130+
f"There is no log directory for {point}/{name} job:{submit_num}"
131+
" so I'll just log the following activity."
132+
)
124133
LOG.info(ctx_str)
125134
if ctx.cmd and ctx.ret_code:
126135
LOG.error(ctx_str)
@@ -337,6 +346,7 @@ class TaskEventsManager():
337346
EVENT_RETRY = "retry"
338347
EVENT_STARTED = TASK_OUTPUT_STARTED
339348
EVENT_SUBMITTED = TASK_OUTPUT_SUBMITTED
349+
EVENT_EXPIRED = TASK_OUTPUT_EXPIRED
340350
EVENT_SUBMIT_FAILED = "submission failed"
341351
EVENT_SUBMIT_RETRY = "submission retry"
342352
EVENT_SUCCEEDED = TASK_OUTPUT_SUCCEEDED
@@ -638,6 +648,11 @@ def process_message(
638648
elif message == self.EVENT_SUCCEEDED:
639649
self._process_message_succeeded(itask, event_time)
640650
self.spawn_children(itask, TASK_OUTPUT_SUCCEEDED)
651+
652+
elif message == self.EVENT_EXPIRED:
653+
self._process_message_expired(itask, event_time)
654+
self.spawn_children(itask, TASK_OUTPUT_EXPIRED)
655+
641656
elif message == self.EVENT_FAILED:
642657
if (
643658
flag == self.FLAG_RECEIVED
@@ -647,6 +662,7 @@ def process_message(
647662
if self._process_message_failed(
648663
itask, event_time, self.JOB_FAILED):
649664
self.spawn_children(itask, TASK_OUTPUT_FAILED)
665+
650666
elif message == self.EVENT_SUBMIT_FAILED:
651667
if (
652668
flag == self.FLAG_RECEIVED
@@ -659,6 +675,7 @@ def process_message(
659675
submit_num
660676
):
661677
self.spawn_children(itask, TASK_OUTPUT_SUBMIT_FAILED)
678+
662679
elif message == self.EVENT_SUBMITTED:
663680
if (
664681
flag == self.FLAG_RECEIVED
@@ -1159,6 +1176,15 @@ def _process_message_started(self, itask, event_time):
11591176
if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
11601177
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
11611178

1179+
def _process_message_expired(self, itask, event_time):
1180+
"""Helper for process_message, handle task expiry."""
1181+
# state reset already done for expired
1182+
msg = 'Task expired: will not submit job.'
1183+
self.setup_event_handlers(itask, self.EVENT_EXPIRED, msg)
1184+
self.data_store_mgr.delta_task_state(itask)
1185+
# self.data_store_mgr.delta_task_held(itask) # ??
1186+
self._reset_job_timers(itask)
1187+
11621188
def _process_message_succeeded(self, itask, event_time):
11631189
"""Helper for process_message, handle a succeeded message."""
11641190

cylc/flow/task_pool.py

+7-56
Original file line numberDiff line numberDiff line change
@@ -1640,27 +1640,13 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
16401640
# in the previous-submit log directory.
16411641

16421642
# convert labels to messages, to send to task events manager.
1643-
good = set()
16441643
for out in outputs:
16451644
msg = itask.state.outputs.get_msg(out)
16461645
if msg is None:
16471646
LOG.warning(f"{point}/{taskdef.name} has no output {out}")
16481647
else:
1649-
good.add(msg)
1650-
if not good:
1651-
# No valid outputs requested.
1652-
return
1653-
1654-
# Try to spawn children of the outputs.
1655-
for msg in good:
1656-
if msg == TASK_OUTPUT_EXPIRED:
1657-
# not caused by task messages
1658-
self._expire_task(itask)
1659-
self.spawn_on_output(itask, expired) ## TODO CONTINUE FROM pr 5412
1660-
else:
1648+
# Try to spawn children of this output.
16611649
self.task_events_mgr.process_message(itask, logging.INFO, msg)
1662-
# TODO remove this - just log the actual spawning events
1663-
LOG.info(f"[{itask}] Forced spawning on {msg}")
16641650

16651651
def _set_prereqs(self, point, taskdef, prereqs, flow_nums, flow_wait):
16661652
"""Set given prerequisites of a target task.
@@ -1877,48 +1863,13 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
18771863
sim_task_state_changed = True
18781864
return sim_task_state_changed
18791865

1880-
def set_expired_tasks(self):
1881-
res = False
1866+
def clock_expire_tasks(self):
1867+
"""Expire any tasks past their clock-expiry time."""
18821868
for itask in self.get_tasks():
1883-
if self._set_expired_task(itask):
1884-
res = True
1885-
return res
1886-
1887-
def _set_expired_task(self, itask):
1888-
"""Check if task has expired. Set state and event handler if so.
1889-
1890-
Return True if task has expired.
1891-
"""
1892-
if (
1893-
not itask.state(
1894-
TASK_STATUS_WAITING,
1895-
is_held=False
1896-
)
1897-
or itask.tdef.expiration_offset is None
1898-
):
1899-
return False
1900-
1901-
if itask.expire_time is None:
1902-
itask.expire_time = (
1903-
itask.get_point_as_seconds() +
1904-
itask.get_offset_as_seconds(itask.tdef.expiration_offset))
1905-
1906-
if (
1907-
time() > itask.expire_time
1908-
and itask.state_reset(TASK_STATUS_EXPIRED)
1909-
):
1910-
self._expire_task(itask)
1911-
return True
1912-
1913-
return False
1914-
1915-
def _expire_task(self, itask):
1916-
msg = 'Task expired: will not submit job.'
1917-
LOG.warning(f"[{itask}] {msg}")
1918-
self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
1919-
self.data_store_mgr.delta_task_state(itask)
1920-
self.data_store_mgr.delta_task_held(itask)
1921-
self.spawn_on_output(itask, 'expired')
1869+
if not itask.clock_expire():
1870+
continue
1871+
self.task_events_mgr.process_message(
1872+
itask, logging.WARNING, TASK_OUTPUT_EXPIRED)
19221873

19231874
def task_succeeded(self, id_):
19241875
"""Return True if task with id_ is in the succeeded state."""

cylc/flow/task_proxy.py

+28-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from collections import Counter
2020
from copy import copy
2121
from fnmatch import fnmatchcase
22+
from time import time
2223
from typing import (
2324
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
2425
)
@@ -29,7 +30,11 @@
2930
from cylc.flow.id import Tokens
3031
from cylc.flow.platforms import get_platform
3132
from cylc.flow.task_action_timer import TimerFlags
32-
from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING
33+
from cylc.flow.task_state import (
34+
TaskState,
35+
TASK_STATUS_WAITING,
36+
TASK_STATUS_EXPIRED
37+
)
3338
from cylc.flow.taskdef import generate_graph_children
3439
from cylc.flow.wallclock import get_unix_time_from_time_string as str2time
3540
from cylc.flow.cycling.iso8601 import (
@@ -248,16 +253,23 @@ def __init__(
248253
self.non_unique_events = Counter() # type: ignore # TODO: figure out
249254

250255
self.clock_trigger_time: Optional[float] = None
251-
self.expire_time: Optional[float] = None
252256
self.late_time: Optional[float] = None
253257
self.is_late = is_late
254258
self.waiting_on_job_prep = False
255259

256260
self.state = TaskState(tdef, self.point, status, is_held)
257261

258-
# Determine graph children of this task (for spawning).
259262
self.graph_children = generate_graph_children(tdef, self.point)
260263

264+
self.expire_time: Optional[float] = None
265+
if self.tdef.expiration_offset is not None:
266+
self.expire_time = (
267+
self.get_point_as_seconds() +
268+
self.get_offset_as_seconds(
269+
self.tdef.expiration_offset
270+
)
271+
)
272+
261273
def __repr__(self) -> str:
262274
return f"<{self.__class__.__name__} '{self.tokens}'>"
263275

@@ -483,3 +495,16 @@ def satisfy_me(self, prereqs) -> bool:
483495
for err in bad:
484496
LOG.warning(f"{self.identity} has no prerequisites {err}")
485497
return len(bad) == 0
498+
499+
def clock_expire(self) -> bool:
500+
"""Check for, and do, clock expiry. Return True if expired."""
501+
502+
if (
503+
self.expire_time is None # expiry not configured
504+
or self.state(TASK_STATUS_EXPIRED) # already expired
505+
or time() < self.expire_time # not time yet
506+
):
507+
return False
508+
509+
self.state.reset(TASK_STATUS_EXPIRED)
510+
return True

cylc/flow/taskdef.py

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from cylc.flow.exceptions import TaskDefError
2424
from cylc.flow.task_id import TaskID
2525
from cylc.flow.task_state import (
26+
TASK_OUTPUT_EXPIRED,
2627
TASK_OUTPUT_SUBMITTED,
2728
TASK_OUTPUT_SUBMIT_FAILED,
2829
TASK_OUTPUT_SUCCEEDED,
@@ -212,6 +213,9 @@ def tweak_outputs(self):
212213
):
213214
self.set_required_output(TASK_OUTPUT_SUCCEEDED, True)
214215

216+
# Expired must be optional
217+
self.set_required_output(TASK_OUTPUT_EXPIRED, False)
218+
215219
# In Cylc 7 back compat mode, make all success outputs required.
216220
if cylc.flow.flags.cylc7_back_compat:
217221
for output in [

tests/unit/test_id_match.py

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def _task_proxy(id_, hier):
4343
hier.append('root')
4444
tdef = create_autospec(TaskDef, namespace_hierarchy=hier)
4545
tdef.name = tokens['task']
46+
tdef.expiration_offset = None
4647
return TaskProxy(
4748
Tokens('~user/workflow'),
4849
tdef,

0 commit comments

Comments
 (0)