Skip to content

Commit 09d4a36

Browse files
committed
Allow broadcasts to modify sim mode tasks.
1 parent 8606f93 commit 09d4a36

File tree

7 files changed

+179
-27
lines changed

7 files changed

+179
-27
lines changed

changes.d/5721.feat.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Allow users to broadcast run_mode to tasks.

cylc/flow/scheduler.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -1740,11 +1740,13 @@ async def main_loop(self) -> None:
17401740

17411741
self.pool.set_expired_tasks()
17421742
self.release_queued_tasks()
1743-
17441743
if (
17451744
self.pool.config.run_mode('simulation')
17461745
and sim_time_check(
1747-
self.message_queue, self.pool.get_tasks())
1746+
self.message_queue,
1747+
self.pool.get_tasks(),
1748+
self.broadcast_mgr
1749+
)
17481750
):
17491751
# A simulated task state change occurred.
17501752
self.reset_inactivity_timer()

cylc/flow/simulation.py

+110-23
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
from cylc.flow.task_proxy import TaskProxy
3838

3939

40+
# Exotic: Recursive Type hint.
41+
NestedDict = Dict[str, Union['NestedDict', Any]]
42+
43+
4044
def configure_sim_modes(taskdefs, sim_mode):
4145
"""Adjust task defs for simulation and dummy mode.
4246
@@ -45,51 +49,71 @@ def configure_sim_modes(taskdefs, sim_mode):
4549

4650
for tdef in taskdefs:
4751
# Compute simulated run time by scaling the execution limit.
48-
rtc = tdef.rtconfig
49-
sleep_sec = get_simulated_run_len(rtc)
52+
configure_sim_modes_rtc(tdef.rtconfig, dummy_mode)
5053

51-
rtc['execution time limit'] = (
52-
sleep_sec + DurationParser().parse(str(
53-
rtc['simulation']['time limit buffer'])).get_seconds()
54-
)
5554

56-
rtc['simulation']['simulated run length'] = sleep_sec
57-
rtc['submission retry delays'] = [1]
55+
def configure_sim_modes_rtc(rtc, dummy_mode):
56+
sleep_sec = get_simulated_run_len(rtc)
57+
58+
rtc['execution time limit'] = (
59+
sleep_sec + DurationParser().parse(str(
60+
rtc['simulation']['time limit buffer'])).get_seconds()
61+
)
62+
63+
rtc['simulation']['simulated run length'] = sleep_sec
64+
rtc['submission retry delays'] = [1]
5865

66+
if dummy_mode:
5967
# Generate dummy scripting.
6068
rtc['init-script'] = ""
6169
rtc['env-script'] = ""
6270
rtc['pre-script'] = ""
6371
rtc['post-script'] = ""
6472
rtc['script'] = build_dummy_script(
6573
rtc, sleep_sec) if dummy_mode else ""
74+
else:
75+
rtc['script'] = ""
76+
77+
disable_platforms(rtc)
6678

67-
disable_platforms(rtc)
79+
rtc['platform'] = 'localhost'
6880

69-
# Disable environment, in case it depends on env-script.
70-
rtc['environment'] = {}
81+
# Disable environment, in case it depends on env-script.
82+
rtc['environment'] = {}
7183

72-
rtc["simulation"][
73-
"fail cycle points"
74-
] = parse_fail_cycle_points(
75-
rtc["simulation"]["fail cycle points"]
76-
)
84+
rtc["simulation"][
85+
"fail cycle points"
86+
] = parse_fail_cycle_points(
87+
rtc["simulation"]["fail cycle points"]
88+
)
7789

7890

7991
def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
8092
"""Get simulated run time.
8193
82-
rtc = run time config
94+
Args:
95+
rtc: run time config
96+
97+
Returns:
98+
Number of seconds to sleep for in sim mode.
8399
"""
100+
# Simulated run length acts as a flag that this is at runtime:
101+
# If durations have already been parsed, trying to parse them
102+
# again will result in failures.
103+
recalc = bool(rtc['simulation'].get('simulated run length', ''))
84104
limit = rtc['execution time limit']
85105
speedup = rtc['simulation']['speedup factor']
86-
if limit and speedup:
106+
107+
if limit and speedup and recalc:
108+
sleep_sec = limit / speedup
109+
elif limit and speedup:
87110
sleep_sec = (DurationParser().parse(
88111
str(limit)).get_seconds() / speedup)
112+
elif recalc:
113+
sleep_sec = rtc['simulation']['default run length']
89114
else:
90-
sleep_sec = DurationParser().parse(
91-
str(rtc['simulation']['default run length'])
92-
).get_seconds()
115+
default_run_len = str(rtc['simulation']['default run length'])
116+
sleep_sec = DurationParser().parse(default_run_len).get_seconds()
93117

94118
return sleep_sec
95119

@@ -147,7 +171,7 @@ def parse_fail_cycle_points(
147171
[]
148172
"""
149173
f_pts: 'Optional[List[PointBase]]'
150-
if 'all' in f_pts_orig:
174+
if f_pts_orig is None or 'all' in f_pts_orig:
151175
f_pts = None
152176
else:
153177
f_pts = []
@@ -156,8 +180,64 @@ def parse_fail_cycle_points(
156180
return f_pts
157181

158182

183+
def unpack_dict(dict_: NestedDict, parent_key: str = '') -> Dict[str, Any]:
184+
"""Unpack a nested dict into a single layer.
185+
186+
Examples:
187+
>>> unpack_dict({'foo': 1, 'bar': {'baz': 2, 'qux':3}})
188+
{'foo': 1, 'bar.baz': 2, 'bar.qux': 3}
189+
>>> unpack_dict({'foo': {'example': 42}, 'bar': {"1":2, "3":4}})
190+
{'foo.example': 42, 'bar.1': 2, 'bar.3': 4}
191+
192+
"""
193+
output = {}
194+
for key, value in dict_.items():
195+
new_key = parent_key + '.' + key if parent_key else key
196+
if isinstance(value, dict):
197+
output.update(unpack_dict(value, new_key))
198+
else:
199+
output[new_key] = value
200+
201+
return output
202+
203+
204+
def nested_dict_path_update(
205+
dict_: NestedDict, path: List[Any], value: Any
206+
) -> NestedDict:
207+
"""Set a value in a nested dict.
208+
209+
Examples:
210+
>>> nested_dict_path_update({'foo': {'bar': 1}}, ['foo', 'bar'], 42)
211+
{'foo': {'bar': 42}}
212+
"""
213+
this = dict_
214+
for i in range(len(path)):
215+
if isinstance(this[path[i]], dict):
216+
this = this[path[i]]
217+
else:
218+
this[path[i]] = value
219+
return dict_
220+
221+
222+
def update_nested_dict(rtc: NestedDict, dict_: NestedDict) -> None:
223+
"""Update one config nested dictionary with the contents of another.
224+
225+
Examples:
226+
>>> x = {'foo': {'bar': 12}, 'qux': 77}
227+
>>> y = {'foo': {'bar': 42}}
228+
>>> update_nested_dict(x, y)
229+
>>> print(x)
230+
{'foo': {'bar': 42}, 'qux': 77}
231+
"""
232+
for keylist, value in unpack_dict(dict_).items():
233+
keys = keylist.split('.')
234+
rtc = nested_dict_path_update(rtc, keys, value)
235+
236+
159237
def sim_time_check(
160-
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
238+
message_queue: 'Queue[TaskMsg]',
239+
itasks: 'List[TaskProxy]',
240+
broadcast_mgr: Optional[Any] = None
161241
) -> bool:
162242
"""Check if sim tasks have been "running" for as long as required.
163243
@@ -166,9 +246,16 @@ def sim_time_check(
166246
Returns:
167247
True if _any_ simulated task state has changed.
168248
"""
249+
169250
sim_task_state_changed = False
170251
now = time()
171252
for itask in itasks:
253+
if broadcast_mgr:
254+
broadcast = broadcast_mgr.get_broadcast(itask.tokens)
255+
if broadcast:
256+
update_nested_dict(
257+
itask.tdef.rtconfig, broadcast)
258+
configure_sim_modes_rtc(itask.tdef.rtconfig, False)
172259
if itask.state.status != TASK_STATUS_RUNNING:
173260
continue
174261
# Started time is not set on restart
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#!/usr/bin/env bash
2+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
3+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
4+
#
5+
# This program is free software: you can redistribute it and/or modify
6+
# it under the terms of the GNU General Public License as published by
7+
# the Free Software Foundation, either version 3 of the License, or
8+
# (at your option) any later version.
9+
#
10+
# This program is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
# GNU General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU General Public License
16+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
18+
# Test that we can broadcast an alteration to simulation mode.
19+
20+
. "$(dirname "$0")/test_header"
21+
set_test_number 3
22+
23+
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
24+
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
25+
workflow_run_ok "${TEST_NAME_BASE}-run" \
26+
cylc play "${WORKFLOW_NAME}" --mode=simulation
27+
SCHD_LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
28+
29+
# If we speed up the simulated task we
30+
# can make it finish before workflow timeout:
31+
cylc broadcast "${WORKFLOW_NAME}" -s '[simulation]speedup factor = 600'
32+
33+
# Wait for the workflow to finish (it wasn't run in no-detach mode):
34+
poll_grep "INFO - DONE" "${SCHD_LOG}"
35+
36+
# If we hadn't changed the speedup factor using broadcast
37+
# The workflow timeout would have been hit:
38+
grep_fail "WARNING - Orphaned tasks" "${SCHD_LOG}"
39+
40+
purge
41+
exit
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[scheduler]
2+
[[events]]
3+
workflow timeout = PT30S
4+
5+
[scheduling]
6+
initial cycle point = 2359
7+
[[graph]]
8+
R1 = get_observations
9+
10+
[runtime]
11+
[[get_observations]]
12+
execution retry delays = PT10M
13+
[[[simulation]]]
14+
speedup factor = 1
15+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
23590101T0000Z/get_observations -triggered off [] in flow 1
2+
23590101T0000Z/get_observations -triggered off [] in flow 1

tests/unit/test_simulation.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
param(None, 10, 'PT1H', id='speedup-factor-alone'),
3737
param('PT1H', None, 'PT1H', id='execution-time-limit-alone'),
3838
param('P1D', 24, 'PT1M', id='speed-up-and-execution-tl'),
39+
param(60 * 60 * 24, 24, 'PT1M', id='recalculation'),
40+
param(1, None, 3600, id='recalculation'),
3941
)
4042
)
4143
def test_get_simulated_run_len(
@@ -49,9 +51,11 @@ def test_get_simulated_run_len(
4951
'execution time limit': execution_time_limit,
5052
'simulation': {
5153
'speedup factor': speedup_factor,
52-
'default run length': default_run_length
53-
}
54+
'default run length': default_run_length,
55+
},
5456
}
57+
if isinstance(execution_time_limit, int):
58+
rtc['simulation']['simulated run length'] = 30
5559
assert get_simulated_run_len(rtc) == 3600
5660

5761

0 commit comments

Comments
 (0)