Skip to content

Commit a9becc2

Browse files
authored
Merge pull request #5721 from wxtim/feature.sim_mode_at_runtime
Feature: Sim mode at runtime
2 parents ae5709d + 9186ad1 commit a9becc2

19 files changed

+732
-137
lines changed

changes.d/5721.feat.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Allow task simulation mode settings to be changed dynamically using `cylc broadcast`.

cylc/flow/broadcast_mgr.py

+14-1
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@
3030
from cylc.flow.cfgspec.workflow import SPEC
3131
from cylc.flow.cycling.loader import get_point, standardise_point_string
3232
from cylc.flow.exceptions import PointParsingError
33-
from cylc.flow.parsec.util import listjoin
33+
from cylc.flow.parsec.util import listjoin, pdeepcopy, poverride
3434
from cylc.flow.parsec.validate import BroadcastConfigValidator
3535

3636
if TYPE_CHECKING:
3737
from cylc.flow.id import Tokens
38+
from cylc.flow.task_proxy import TaskProxy
3839

3940

4041
ALL_CYCLE_POINTS_STRS = ["*", "all-cycle-points", "all-cycles"]
@@ -179,6 +180,18 @@ def get_broadcast(self, tokens: 'Optional[Tokens]' = None) -> dict:
179180
addict(ret, self.broadcasts[cycle][namespace])
180181
return ret
181182

183+
def get_updated_rtconfig(self, itask: 'TaskProxy') -> dict:
184+
"""Retrieve updated rtconfig for a single task proxy"""
185+
overrides = self.get_broadcast(
186+
itask.tokens
187+
)
188+
if overrides:
189+
rtconfig = pdeepcopy(itask.tdef.rtconfig)
190+
poverride(rtconfig, overrides, prepend=True)
191+
else:
192+
rtconfig = itask.tdef.rtconfig
193+
return rtconfig
194+
182195
def load_db_broadcast_states(self, row_idx, row):
183196
"""Load broadcast variables from runtime DB broadcast states row."""
184197
if row_idx == 0:

cylc/flow/cfgspec/workflow.py

+6
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,12 @@ def get_script_common_text(this: str, example: Optional[str] = None):
12881288
12891289
Task instances must be set to fail by
12901290
:cylc:conf:`[..]fail cycle points`.
1291+
1292+
.. note::
1293+
1294+
This setting is designed for use with automatic
1295+
retries. Subsequent manual submissions will not
1296+
change the outcome of the task.
12911297
''')
12921298
Conf('disable task event handlers', VDR.V_BOOLEAN, True,
12931299
desc='''

cylc/flow/network/resolvers.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,9 @@ def broadcast(
779779
cycle_points, namespaces, settings)
780780
if mode == 'clear_broadcast':
781781
return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast(
782-
cycle_points, namespaces, settings)
782+
point_strings=cycle_points,
783+
namespaces=namespaces,
784+
cancel_settings=settings)
783785
if mode == 'expire_broadcast':
784786
return self.schd.task_events_mgr.broadcast_mgr.expire_broadcast(
785787
cutoff)

cylc/flow/rundb.py

+1
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ class CylcWorkflowDAO:
259259
["flow_nums"],
260260
["is_manual_submit", {"datatype": "INTEGER"}],
261261
["try_num", {"datatype": "INTEGER"}],
262+
# This is used to store simulation task start time across restarts.
262263
["time_submit"],
263264
["time_submit_exit"],
264265
["submit_status", {"datatype": "INTEGER"}],

cylc/flow/scheduler.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -1757,7 +1757,10 @@ async def main_loop(self) -> None:
17571757
if (
17581758
self.pool.config.run_mode('simulation')
17591759
and sim_time_check(
1760-
self.message_queue, self.pool.get_tasks())
1760+
self.task_events_mgr,
1761+
self.pool.get_tasks(),
1762+
self.workflow_db_mgr,
1763+
)
17611764
):
17621765
# A simulated task state change occurred.
17631766
self.reset_inactivity_timer()

cylc/flow/simulation.py

+147-48
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,118 @@
1616
"""Utilities supporting simulation and skip modes
1717
"""
1818

19+
from dataclasses import dataclass
1920
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
2021
from time import time
2122

23+
from cylc.flow import LOG
2224
from cylc.flow.cycling.loader import get_point
23-
from cylc.flow.network.resolvers import TaskMsg
25+
from cylc.flow.exceptions import PointParsingError
2426
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
2527
from cylc.flow.task_state import (
2628
TASK_STATUS_RUNNING,
2729
TASK_STATUS_FAILED,
2830
TASK_STATUS_SUCCEEDED,
2931
)
30-
from cylc.flow.wallclock import get_current_time_string
32+
from cylc.flow.wallclock import get_unix_time_from_time_string
3133

3234
from metomi.isodatetime.parsers import DurationParser
3335

3436
if TYPE_CHECKING:
35-
from queue import Queue
36-
from cylc.flow.cycling import PointBase
37+
from cylc.flow.task_events_mgr import TaskEventsManager
3738
from cylc.flow.task_proxy import TaskProxy
39+
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
40+
from cylc.flow.cycling import PointBase
41+
42+
43+
@dataclass
44+
class ModeSettings:
45+
"""A store of state for simulation modes.
46+
47+
Used instead of modifying the runtime config.
48+
49+
Args:
50+
itask:
51+
The task proxy this submission relates to.
52+
rtconfig:
53+
The task's runtime config, with any broadcast overrides already
54+
applied by the caller.
55+
db_mgr:
56+
The database manager must be provided for simulated jobs
57+
that are being resumed after workflow restart. It is used to
58+
recover the job's original submit time and try number.
59+
60+
Attrs:
61+
simulated_run_length:
62+
The length of time this simulated job will take to run in seconds.
63+
timeout:
64+
The wall-clock time at which this simulated job will finish as
65+
a Unix epoch time.
66+
sim_task_fails:
67+
True, if this job is intended to fail when it finishes, else False.
68+
69+
"""
70+
simulated_run_length: float = 0.0
71+
sim_task_fails: bool = False
72+
timeout: float = 0.0
73+
74+
def __init__(
75+
self,
76+
itask: 'TaskProxy',
77+
db_mgr: 'WorkflowDatabaseManager',
78+
rtconfig: Dict[str, Any]
79+
):
80+
81+
# itask.summary['started_time'] and mode_settings.timeout need
82+
# repopulating from the DB on workflow restart:
83+
started_time = itask.summary['started_time']
84+
try_num = None
85+
if started_time is None:
86+
# Get DB info
87+
db_info = db_mgr.pri_dao.select_task_job(
88+
itask.tokens['cycle'],
89+
itask.tokens['task'],
90+
itask.tokens['job'],
91+
)
92+
93+
# Get the started time:
94+
if db_info['time_submit']:
95+
started_time = get_unix_time_from_time_string(
96+
db_info["time_submit"])
97+
itask.summary['started_time'] = started_time
98+
else:
99+
started_time = time()
100+
101+
# Get the try number:
102+
try_num = db_info["try_num"]
103+
104+
# Parse fail cycle points:
105+
if rtconfig != itask.tdef.rtconfig:
106+
try:
107+
rtconfig["simulation"][
108+
"fail cycle points"
109+
] = parse_fail_cycle_points(
110+
rtconfig["simulation"]["fail cycle points"]
111+
)
112+
except PointParsingError as exc:
113+
# Broadcast Fail CP didn't parse
114+
LOG.warning(
115+
'Broadcast fail cycle point was invalid:\n'
116+
f' {exc.args[0]}'
117+
)
118+
rtconfig['simulation'][
119+
'fail cycle points'
120+
] = itask.tdef.rtconfig['simulation']['fail cycle points']
121+
122+
# Calculate simulation info:
123+
self.simulated_run_length = (
124+
get_simulated_run_len(rtconfig))
125+
self.sim_task_fails = sim_task_failed(
126+
rtconfig['simulation'],
127+
itask.point,
128+
try_num or itask.get_try_num()
129+
)
130+
self.timeout = started_time + self.simulated_run_length
38131

39132

40133
def configure_sim_modes(taskdefs, sim_mode):
@@ -46,23 +139,17 @@ def configure_sim_modes(taskdefs, sim_mode):
46139
for tdef in taskdefs:
47140
# Compute simulated run time by scaling the execution limit.
48141
rtc = tdef.rtconfig
49-
sleep_sec = get_simulated_run_len(rtc)
50142

51-
rtc['execution time limit'] = (
52-
sleep_sec + DurationParser().parse(str(
53-
rtc['simulation']['time limit buffer'])).get_seconds()
54-
)
55-
56-
rtc['simulation']['simulated run length'] = sleep_sec
57143
rtc['submission retry delays'] = [1]
58144

59-
# Generate dummy scripting.
60-
rtc['init-script'] = ""
61-
rtc['env-script'] = ""
62-
rtc['pre-script'] = ""
63-
rtc['post-script'] = ""
64-
rtc['script'] = build_dummy_script(
65-
rtc, sleep_sec) if dummy_mode else ""
145+
if dummy_mode:
146+
# Generate dummy scripting.
147+
rtc['init-script'] = ""
148+
rtc['env-script'] = ""
149+
rtc['pre-script'] = ""
150+
rtc['post-script'] = ""
151+
rtc['script'] = build_dummy_script(
152+
rtc, get_simulated_run_len(rtc))
66153

67154
disable_platforms(rtc)
68155

@@ -77,12 +164,13 @@ def configure_sim_modes(taskdefs, sim_mode):
77164

78165

79166
def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
80-
"""Get simulated run time.
167+
"""Calculate simulation run time from a task's config.
81168
82169
rtc = run time config
83170
"""
84171
limit = rtc['execution time limit']
85172
speedup = rtc['simulation']['speedup factor']
173+
86174
if limit and speedup:
87175
sleep_sec = (DurationParser().parse(
88176
str(limit)).get_seconds() / speedup)
@@ -145,19 +233,26 @@ def parse_fail_cycle_points(
145233
True
146234
>>> this([])
147235
[]
236+
>>> this(None) is None
237+
True
148238
"""
149-
f_pts: 'Optional[List[PointBase]]'
150-
if 'all' in f_pts_orig:
239+
f_pts: 'Optional[List[PointBase]]' = []
240+
if (
241+
f_pts_orig is None
242+
or f_pts_orig and 'all' in f_pts_orig
243+
):
151244
f_pts = None
152-
else:
245+
elif f_pts_orig:
153246
f_pts = []
154247
for point_str in f_pts_orig:
155248
f_pts.append(get_point(point_str).standardise())
156249
return f_pts
157250

158251

159252
def sim_time_check(
160-
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
253+
task_events_manager: 'TaskEventsManager',
254+
itasks: 'List[TaskProxy]',
255+
db_mgr: 'WorkflowDatabaseManager',
161256
) -> bool:
162257
"""Check if sim tasks have been "running" for as long as required.
163258
@@ -166,38 +261,42 @@ def sim_time_check(
166261
Returns:
167262
True if _any_ simulated task state has changed.
168263
"""
169-
sim_task_state_changed = False
170264
now = time()
265+
sim_task_state_changed: bool = False
171266
for itask in itasks:
172267
if itask.state.status != TASK_STATUS_RUNNING:
173268
continue
174-
# Started time is not set on restart
175-
if itask.summary['started_time'] is None:
176-
itask.summary['started_time'] = now
177-
timeout = (
178-
itask.summary['started_time'] +
179-
itask.tdef.rtconfig['simulation']['simulated run length']
180-
)
181-
if now > timeout:
182-
job_d = itask.tokens.duplicate(job=str(itask.submit_num))
183-
now_str = get_current_time_string()
184-
if sim_task_failed(
185-
itask.tdef.rtconfig['simulation'],
186-
itask.point,
187-
itask.get_try_num()
188-
):
189-
message_queue.put(
190-
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
269+
270+
# This occurs if the workflow has been restarted.
271+
if itask.mode_settings is None:
272+
rtconfig = task_events_manager.broadcast_mgr.get_updated_rtconfig(
273+
itask)
274+
itask.mode_settings = ModeSettings(
275+
itask,
276+
db_mgr,
277+
rtconfig
278+
)
279+
280+
if now > itask.mode_settings.timeout:
281+
if itask.mode_settings.sim_task_fails:
282+
task_events_manager.process_message(
283+
itask, 'CRITICAL', TASK_STATUS_FAILED,
284+
flag=task_events_manager.FLAG_RECEIVED
191285
)
192286
else:
193-
# Simulate message outputs.
194-
for msg in itask.tdef.rtconfig['outputs'].values():
195-
message_queue.put(
196-
TaskMsg(job_d, now_str, 'DEBUG', msg)
197-
)
198-
message_queue.put(
199-
TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED)
287+
task_events_manager.process_message(
288+
itask, 'DEBUG', TASK_STATUS_SUCCEEDED,
289+
flag=task_events_manager.FLAG_RECEIVED
200290
)
291+
# Simulate message outputs.
292+
for msg in itask.tdef.rtconfig['outputs'].values():
293+
task_events_manager.process_message(
294+
itask, 'DEBUG', msg,
295+
flag=task_events_manager.FLAG_RECEIVED
296+
)
297+
298+
# We've finished this pseudo job, so delete all the mode settings.
299+
itask.mode_settings = None
201300
sim_task_state_changed = True
202301
return sim_task_state_changed
203302

0 commit comments

Comments
 (0)