Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Sim mode at runtime #5721

Merged
merged 52 commits into from
Mar 15, 2024
Merged
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7c4c1b9
wip on simulation broadcastable
wxtim Nov 16, 2023
c5f3a25
Allow Broadcasting modification of sim mode tasks
wxtim Nov 17, 2023
6f5320f
return to get_simulation. Cache call to time()
wxtim Jan 18, 2024
00c29a5
Cached timeout time for sim tasks on the sim_modes object.
wxtim Jan 19, 2024
f6b13b1
de-flake tests
wxtim Jan 22, 2024
ff6037c
Ensure that mode_settings are deleted from the task proxy when
wxtim Jan 29, 2024
0cd47a9
rationalize tests
wxtim Jan 30, 2024
c356bdf
r2r
wxtim Jan 31, 2024
d6e5aef
Save correct datetime format to the DB.
wxtim Feb 1, 2024
695e5a8
Response to review
wxtim Feb 6, 2024
a6f7606
small test fix
wxtim Feb 6, 2024
69b03c9
Update cylc/flow/task_job_mgr.py
wxtim Feb 8, 2024
2665d42
Apply suggestions from code review
wxtim Feb 9, 2024
89e5041
response to review
wxtim Feb 9, 2024
b097d33
Pass `task_events_manager` wholesale to sim_time_check.
wxtim Feb 12, 2024
7217c00
Rationalize existing tests and add a test for a bug discovered by Ronnie
wxtim Feb 13, 2024
970fa1d
Update tests/functional/modes/05-sim-trigger.t
wxtim Feb 13, 2024
312cf28
add a warning that fail try 1 only does not change on resubmit, only …
wxtim Feb 13, 2024
e2392af
Update cylc/flow/simulation.py
wxtim Feb 13, 2024
08f4345
Update cylc/flow/cfgspec/workflow.py
wxtim Feb 20, 2024
62d5897
fix flake8 issues
wxtim Feb 20, 2024
3832e7b
refactor test broken by sim-mode change
wxtim Feb 20, 2024
8c3bf61
Update tests/flakyfunctional/cylc-get-config/04-dummy-mode-output.t
wxtim Feb 20, 2024
c8d5aea
Apply suggestions from code review
wxtim Feb 26, 2024
42aa4b2
Fallback created for lack of start time in database.
wxtim Feb 26, 2024
42ad8cc
Ensure that broadcasts to fail cycle points triggers a re-parse
wxtim Feb 27, 2024
38db5dc
Use itask summary started time as sole arbiter of simulation
wxtim Feb 27, 2024
95d3dc4
ensure that broadcast checks and rejects unparsable fail cycle points
wxtim Feb 27, 2024
1058023
Fix a broken test, and some linting and typing issues
wxtim Feb 28, 2024
96db826
Update cylc/flow/network/resolvers.py
wxtim Feb 28, 2024
0b41424
Apply suggestions from code review
wxtim Mar 4, 2024
d93de69
Update changes.d/5721.feat.md
wxtim Mar 4, 2024
6268809
Update cylc/flow/simulation.py
wxtim Mar 4, 2024
5447530
Update changes.d/5721.feat.md
wxtim Mar 4, 2024
ab53227
fix broken test
wxtim Mar 4, 2024
d2b372e
make sim mode messages _look_ external
wxtim Mar 5, 2024
ceb3063
removed bc manager changes - one test is failing...
wxtim Mar 5, 2024
89db69c
Prevent totally invalid fail cycle points being accepted for simulation.
wxtim Mar 5, 2024
cd2fe55
Update cylc/flow/simulation.py
wxtim Mar 6, 2024
6773246
ensure than changing `fail try 1 only` doesn't cause failure
wxtim Mar 6, 2024
d147b0f
Prevent repeated use of sim_task_failed giving different answers (bas…
wxtim Mar 6, 2024
269078a
fix
wxtim Mar 6, 2024
b4f4bb1
Broadcast changes to simulated tasks in task_job_manager
wxtim Mar 11, 2024
370d2b6
test that clearing broadcasts works for sim tasks
wxtim Mar 11, 2024
b9e1d15
Update cylc/flow/simulation.py
wxtim Mar 11, 2024
b355ae2
Update cylc/flow/simulation.py
wxtim Mar 11, 2024
44abd5a
Update tests/integration/test_simulation.py
wxtim Mar 11, 2024
cb064a0
fix test
wxtim Mar 11, 2024
ef2fc58
Merge branch 'master' into feature.sim_mode_at_runtime
hjoliver Mar 14, 2024
6be7372
Update changes.d/5721.feat.md
wxtim Mar 14, 2024
389a0be
Update cylc/flow/simulation.py
wxtim Mar 14, 2024
9186ad1
Fix func test, after update on master.
hjoliver Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Response to review
- Re-order the ModeSettings.__init__ method to allow
  db loading before setting the start time.
- Add `try_num` to database.
- Fix/streamline the tests - add explicit broadcast test.
  • Loading branch information
wxtim committed Feb 28, 2024
commit 695e5a84953a31683fd6c4389f6f9dfdc9df3aa8
46 changes: 27 additions & 19 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
@@ -53,36 +53,50 @@ class ModeSettings:
"""
simulated_run_length: float = 0.0
sim_task_fails: bool = False
timeout: float = 0.0

def __init__(
self,
itask: 'TaskProxy',
broadcast_mgr: 'BroadcastMgr',
db_mgr: 'Optional[WorkflowDatabaseManager]' = None
):

# itask.summary['started_time'] and mode_settings.timeout need
# repopulating from the DB on workflow restart:
started_time = itask.summary['started_time']
try_num = None
if started_time is None and db_mgr:
# Get DB info
db_info = db_mgr.pub_dao.select_task_job(
*itask.tokens.relative_id.split("/"))

# Get the started time:
started_time = get_unix_time_from_time_string(
db_info["time_submit"])
itask.summary['started_time'] = started_time

# Get the try number:
try_num = db_info["try_num"]

# Update anything changed by broadcast:
overrides = broadcast_mgr.get_broadcast(itask.tokens)
if overrides:
rtconfig = pdeepcopy(itask.tdef.rtconfig)
poverride(rtconfig, overrides, prepend=True)
else:
rtconfig = itask.tdef.rtconfig

# Calculate simulation info:
self.simulated_run_length = (
get_simulated_run_len(rtconfig))
self.sim_task_fails = sim_task_failed(
rtconfig['simulation'],
itask.point,
itask.get_try_num()
try_num or itask.get_try_num()
)

# itask.summary['started_time'] and mode_settings.timeout need
# repopulating from the DB on workflow restart:
started_time = itask.summary['started_time']
if started_time is None and db_mgr:
started_time_str = db_mgr.pub_dao.select_task_job(
*itask.tokens.relative_id.split("/"))["time_submit"]
started_time = get_unix_time_from_time_string(
started_time_str)
itask.summary['started_time'] = started_time
from cylc.flow import LOG
LOG.critical(try_num or itask.get_try_num())
self.timeout = started_time + self.simulated_run_length


@@ -217,12 +231,7 @@ def sim_time_check(
sim_task_state_changed: bool = False

for itask in itasks:
if (
itask.state.status != TASK_STATUS_RUNNING
or itask.state.is_queued
or itask.state.is_held
or itask.state.is_runahead
):
if itask.state.status != TASK_STATUS_RUNNING:
continue

if itask.mode_settings is None:
@@ -250,7 +259,6 @@ def sim_time_check(
# We've finished this pseudojob, so delete all the mode settings.
itask.mode_settings = None
sim_task_state_changed = True
itask.mode_settings = None
return sim_task_state_changed


@@ -267,5 +275,5 @@ def sim_task_failed(
sim_conf['fail cycle points'] is None # i.e. "all"
or point in sim_conf['fail cycle points']
) and (
try_num == 0 or not sim_conf['fail try 1 only']
try_num == 1 or not sim_conf['fail try 1 only']
)
10 changes: 7 additions & 3 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
@@ -1002,11 +1002,11 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
now_str = get_time_string_from_unix_time(now)
for itask in itasks:
itask.summary['started_time'] = now
self._set_retry_timers(itask, itask.tdef.rtconfig)
itask.mode_settings = ModeSettings(
itask, self.task_events_mgr.broadcast_mgr)
itask.waiting_on_job_prep = False
itask.submit_num += 1
self._set_retry_timers(itask)

itask.platform = {'name': 'SIMULATION'}
itask.summary['job_runner_name'] = 'SIMULATION'
@@ -1020,8 +1020,12 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
itask, INFO, TASK_OUTPUT_SUBMITTED,
)
self.workflow_db_mgr.put_insert_task_jobs(
itask, {'time_submit': now_str})

itask, {
'time_submit': now_str,
'try_num': itask.get_try_num(),
}
)
self.workflow_db_mgr.process_queued_ops()
return itasks

def _submit_task_jobs_callback(self, ctx, workflow, itasks):
118 changes: 67 additions & 51 deletions tests/integration/test_simulation.py
Original file line number Diff line number Diff line change
@@ -184,7 +184,7 @@ def test_fail_once(sim_time_check_setup, itask, point, results, monkeypatch):
ISO8601Point(point), itask)

for i, result in enumerate(results):
itask.try_timers['execution-retry'].num = i - 1
itask.try_timers['execution-retry'].num = i
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is result
@@ -256,7 +256,7 @@ def test_task_sped_up(sim_time_check_setup, monkeytime):
) is True


async def test_simulation_mode_settings_restart(
async def test_settings_restart(
monkeytime, flow, scheduler, start
):
"""Check that simulation mode settings are correctly restored
@@ -286,22 +286,19 @@ async def test_simulation_mode_settings_restart(
}
})
schd = scheduler(id_)
msg_q = Queue()

# Start the workflow:
async with start(schd):
# Pick the task proxy, Mock its start time, set state to running:
itask = schd.pool.get_tasks()[0]
itask.summary['started_time'] = 0
itask.state.status = 'running'

# Submit it, then mock the wallclock and assert that it's not finished.
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
monkeytime(0)

og_timeout = itask.mode_settings.timeout

# Mock wallclock < sim end timeout
monkeytime(itask.mode_settings.timeout - 1)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is False

@@ -310,60 +307,27 @@ async def test_simulation_mode_settings_restart(
async with start(schd):
# Get our tasks and fix wallclock:
itask = schd.pool.get_tasks()[0]
monkeytime(12)
itask.state.status = 'running'

# Check that we haven't got started time & mode settings back:
assert itask.summary['started_time'] is None
assert itask.mode_settings is None

# Set the start time in the database to 0 to make the
# test simpler:
schd.workflow_db_mgr.put_insert_task_jobs(
itask, {'time_submit': '1970-01-01T00:00:00Z'})
schd.workflow_db_mgr.process_queued_ops()

# Set the current time:
monkeytime(12)
monkeytime(og_timeout - 1)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is False

# Check that the itask.mode_settings is now re-created
assert itask.mode_settings.__dict__ == {
'simulated_run_length': 60.0,
'sim_task_fails': False,
'timeout': 60.0
}

# Set the current time > timeout
monkeytime(61)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is True

assert itask.mode_settings is None

schd.task_events_mgr.broadcast_mgr.put_broadcast(
['1066'], ['one'], [{
'execution time limit': 'PT1S'}])

assert itask.mode_settings is None

schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)

assert itask.submit_num == 2
assert itask.mode_settings.__dict__ == {
'simulated_run_length': 1.0,
'sim_task_fails': False,
'timeout': 62.0
'sim_task_fails': True,
'timeout': float(int(og_timeout))
}


async def test_simulation_mode_settings_reload(
async def test_settings_reload(
flow, scheduler, start, run_simjob
):
"""Check that simulation mode settings are changed for future
@@ -374,9 +338,7 @@ async def test_simulation_mode_settings_reload(
'scheduler': {'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '1066',
'graph': {
'R1': 'one'
}
'graph': {'R1': 'one'}
},
'runtime': {
'one': {
@@ -405,3 +367,57 @@ async def test_simulation_mode_settings_reload(

# Submit second pseudo-job and "run" to success:
assert run_simjob(schd) == 'succeeded'


async def test_settings_broadcast(
flow, scheduler, start, complete, monkeytime
):
"""Assert that broadcasting a change in the settings for a task
affects subsequent pseudo-submissions.
"""
id_ = flow({
'scheduler': {'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '1066',
'graph': {'R1': 'one'}
},
'runtime': {
'one': {
'execution time limit': 'PT1S',
'simulation': {
'speedup factor': 1,
'fail cycle points': '1066',
}
},
}
}, defaults=False)
schd = scheduler(id_, paused_start=False, run_mode='simulation')
async with start(schd):
itask = schd.pool.get_tasks()[0]
itask.state.is_queued = False

# Submit the first - the sim task will fail:
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is True

# Let task finish.
monkeytime(itask.mode_settings.timeout + 1)
assert sim_time_check(
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is True

# The mode_settings object has been cleared:
assert itask.mode_settings is None

# Change a setting using broadcast:
schd.task_events_mgr.broadcast_mgr.put_broadcast(
['1066'], ['one'], [{
'simulation': {'fail cycle points': ''}
}])

# Submit again - result is different:
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is False