Skip to content

Commit ee70398

Browse files
committed
Response to review
co-authored-by: Oliver Sanders <[email protected]>
1 parent e8ca0c2 commit ee70398

File tree

3 files changed

+22
-67
lines changed

3 files changed

+22
-67
lines changed

cylc/flow/simulation.py

+15-64
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
from cylc.flow.task_proxy import TaskProxy
3838

3939

40-
# Exotic: Recursive Type hint.
41-
NestedDict = Dict[str, Union['NestedDict', Any]]
40+
SIMULATION_CONFIGS = ['simulation', 'execution time limit']
4241

4342

4443
def configure_sim_modes(taskdefs, sim_mode):
@@ -55,7 +54,6 @@ def configure_rtc_sim_mode(rtc, dummy_mode):
5554
"""Change a task proxy's runtime config to simulation mode settings.
5655
"""
5756
sleep_sec = get_simulated_run_len(rtc)
58-
5957
rtc['execution time limit'] = (
6058
sleep_sec + DurationParser().parse(str(
6159
rtc['simulation']['time limit buffer'])).get_seconds()
@@ -70,10 +68,9 @@ def configure_rtc_sim_mode(rtc, dummy_mode):
7068
rtc['env-script'] = ""
7169
rtc['pre-script'] = ""
7270
rtc['post-script'] = ""
71+
rtc['err-script'] = ""
7372
rtc['script'] = build_dummy_script(
74-
rtc, sleep_sec) if dummy_mode else ""
75-
else:
76-
rtc['script'] = ""
73+
rtc, sleep_sec)
7774

7875
disable_platforms(rtc)
7976

@@ -111,7 +108,9 @@ def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
111108
else:
112109
sleep_sec = rtc['simulation']['default run length']
113110
else:
114-
if limit and speedup:
111+
if limit and speedup and isinstance(limit, float):
112+
sleep_sec = limit / speedup
113+
elif limit and speedup:
115114
sleep_sec = (DurationParser().parse(
116115
str(limit)).get_seconds() / speedup)
117116
else:
@@ -183,60 +182,6 @@ def parse_fail_cycle_points(
183182
return f_pts
184183

185184

186-
def unpack_dict(dict_: NestedDict, parent_key: str = '') -> Dict[str, Any]:
187-
"""Unpack a nested dict into a single layer.
188-
189-
Examples:
190-
>>> unpack_dict({'foo': 1, 'bar': {'baz': 2, 'qux':3}})
191-
{'foo': 1, 'bar.baz': 2, 'bar.qux': 3}
192-
>>> unpack_dict({'foo': {'example': 42}, 'bar': {"1":2, "3":4}})
193-
{'foo.example': 42, 'bar.1': 2, 'bar.3': 4}
194-
195-
"""
196-
output = {}
197-
for key, value in dict_.items():
198-
new_key = parent_key + '.' + key if parent_key else key
199-
if isinstance(value, dict):
200-
output.update(unpack_dict(value, new_key))
201-
else:
202-
output[new_key] = value
203-
204-
return output
205-
206-
207-
def nested_dict_path_update(
208-
dict_: NestedDict, path: List[Any], value: Any
209-
) -> NestedDict:
210-
"""Set a value in a nested dict.
211-
212-
Examples:
213-
>>> nested_dict_path_update({'foo': {'bar': 1}}, ['foo', 'bar'], 42)
214-
{'foo': {'bar': 42}}
215-
"""
216-
this = dict_
217-
for i in range(len(path)):
218-
if isinstance(this[path[i]], dict):
219-
this = this[path[i]]
220-
else:
221-
this[path[i]] = value
222-
return dict_
223-
224-
225-
def update_nested_dict(rtc: NestedDict, dict_: NestedDict) -> None:
226-
"""Update one config nested dictionary with the contents of another.
227-
228-
Examples:
229-
>>> x = {'foo': {'bar': 12}, 'qux': 77}
230-
>>> y = {'foo': {'bar': 42}}
231-
>>> update_nested_dict(x, y)
232-
>>> print(x)
233-
{'foo': {'bar': 42}, 'qux': 77}
234-
"""
235-
for keylist, value in unpack_dict(dict_).items():
236-
keys = keylist.split('.')
237-
rtc = nested_dict_path_update(rtc, keys, value)
238-
239-
240185
def sim_time_check(
241186
message_queue: 'Queue[TaskMsg]',
242187
itasks: 'List[TaskProxy]',
@@ -250,16 +195,22 @@ def sim_time_check(
250195
251196
Returns:
252197
True if _any_ simulated task state has changed.
253-
"""
254198
199+
"""
255200
sim_task_state_changed = False
256201
now = time()
257202
for itask in itasks:
258203
if broadcast_mgr:
259204
broadcast = broadcast_mgr.get_broadcast(itask.tokens)
260205
if broadcast:
261-
update_nested_dict(
262-
itask.tdef.rtconfig, broadcast)
206+
for config in SIMULATION_CONFIGS:
207+
if (
208+
config in broadcast
209+
and isinstance(broadcast[config], dict)
210+
):
211+
itask.tdef.rtconfig[config].update(broadcast[config])
212+
elif config in broadcast:
213+
itask.tdef.rtconfig[config] = broadcast[config]
263214
configure_rtc_sim_mode(itask.tdef.rtconfig, False)
264215
if itask.state.status != TASK_STATUS_RUNNING:
265216
continue

tests/functional/modes/04-simulation-runtime.t

+5-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@ workflow_run_ok "${TEST_NAME_BASE}-run" \
2727
SCHD_LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
2828

2929
# If we speed up the simulated task we
30-
# can make it finish before workflow timeout:
31-
cylc broadcast "${WORKFLOW_NAME}" -s '[simulation]speedup factor = 600'
30+
# can make it finish before workflow timeout
31+
# (neither change will do this on its own):
32+
cylc broadcast "${WORKFLOW_NAME}" \
33+
-s '[simulation]speedup factor = 60' \
34+
-s 'execution time limit = PT60S'
3235

3336
# Wait for the workflow to finish (it wasn't run in no-detach mode):
3437
poll_grep "INFO - DONE" "${SCHD_LOG}"

tests/functional/modes/04-simulation-runtime/flow.cylc

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[scheduler]
22
[[events]]
3-
workflow timeout = PT30S
3+
# workflow timeout = PT30S
44

55
[scheduling]
66
initial cycle point = 2359
@@ -9,6 +9,7 @@
99

1010
[runtime]
1111
[[get_observations]]
12+
execution time limit = P2D
1213
execution retry delays = PT10M
1314
[[[simulation]]]
1415
speedup factor = 1

0 commit comments

Comments
 (0)