Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group trigger #6395

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changes.d/6395.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
When triggering a group of tasks, off-group prerequisites will be satisfied
automatically and in-group prerequisites will be left to be satisfied by the
flow. This allows retriggering a sub-graph without having to separately
identify and handle initial tasks and off-flow prerequisites.
17 changes: 15 additions & 2 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,10 @@ async def remove_tasks(
validate.is_tasks(tasks)
validate.flow_opts(flow, flow_wait=False, allow_new_or_none=False)
yield
flow_nums = get_flow_nums_set(flow)
schd.remove_tasks(tasks, flow_nums)
schd.remove_tasks(
tasks,
get_flow_nums_set(flow)
)


@_command('reload_workflow')
Expand Down Expand Up @@ -457,6 +459,17 @@ async def force_trigger_tasks(
"at Cylc 8.5."
)
yield

if not flow:
# default case: erase flow history before trigger
schd.remove_tasks(
tasks,
get_flow_nums_set(flow),
default_remove=True
)
# make sure the removal gets stored before triggering
schd.workflow_db_mgr.process_queued_ops()

yield schd.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr, on_resume
)
2 changes: 1 addition & 1 deletion cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def get_flow_num(

if flow_num in self.flows:
if meta is not None:
LOG.warning(
LOG.debug(
f'Ignoring flow metadata "{meta}":'
f' {flow_num} is not a new flow'
)
Expand Down
26 changes: 18 additions & 8 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,23 +1105,30 @@ def kill_tasks(
return len(unkillable)

def remove_tasks(
self, items: Iterable[str], flow_nums: Optional['FlowNums'] = None
self,
items: Iterable[str],
flow_nums: 'FlowNums',
default_remove: bool = False,
) -> None:
"""Remove tasks (`cylc remove` command).

Args:
items: Relative IDs or globs.
flow_nums: Flows to remove the tasks from. If empty or None, it
means 'all'.
default_remove: if True, called via `cylc trigger` command to
auto-erase flow history of past tasks before triggering
"""
active, inactive, _unmatched = self.pool.filter_task_proxies(
items, warn_no_active=False, inactive=True
)
if default_remove:
# auto-remove via trigger only applies to past tasks
active = []

if not (active or inactive):
return

if flow_nums is None:
flow_nums = set()
# Mapping of *relative* task IDs to removed flow numbers:
removed: Dict[Tokens, FlowNums] = {}
not_removed: Set[Tokens] = set()
Expand Down Expand Up @@ -1546,11 +1553,14 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:

for itask in submitted:
flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE
LOG.log(
log_lvl,
f"{itask.identity} -triggered off "
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
)
if itask.is_manual_submit:
off = f"[] in flow {flow}"
else:
off = (
f"{itask.state.get_resolved_dependencies()}"
f" in flow {flow}"
)
LOG.log(log_lvl, f"{itask.identity} -triggered off {off}")

# one or more tasks were passed through the submission pipeline
return True
Expand Down
33 changes: 25 additions & 8 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,30 @@

"""cylc trigger [OPTIONS] ARGS

Force task(s) to run regardless of prerequisites, even in a paused workflow.
Force task(s) to run, even in a paused workflow.

Triggering a task that is not yet queued will queue it.
Triggering a task that is not yet queued will queue it; triggering a queued
task will run it - so un-queued tasks may need to be triggered twice.

Triggering a queued task runs it immediately.
Tasks can't be triggered if already active (preparing, submitted, running).

Cylc queues restrict the number of jobs that can be active (submitted or
running) at once. They release tasks to run when their active task count
drops below the queue limit.
Triggering a group of tasks at once:
Dependence on tasks beyond the group ("off-group prerequisites") will be
satisfied automatically; in-group prerequisites will be left to the flow.

Attempts to trigger active (preparing, submitted, running)
tasks will be ignored.
Triggering a sub-graph:
* Group approach: trigger all sub-graph tasks as a group. The forced
satisfaction of off-group prerequisites will automatically avoid a stall.
* Fundamental approach: trigger the initial tasks of the sub-graph to start
the flow, and manually set off-flow prerequisites to prevent a stall.

Triggering past tasks:
If flows are not specified (i.e., no use of --flow) the flow-history of
target tasks will be erased (see also "cylc remove") so that the graph can
be re-traversed without starting a new flow.
Rarely, you may want to retain flow history even if not starting a new flow.
If so, use `--flow=all` avoid the erasure. Example: triggering a task twice
with `--wait` to complete different outputs.

Examples:
# trigger task foo in cycle 1234 in test
Expand All @@ -40,6 +52,11 @@
# start a new flow by triggering 1234/foo in test
$ cylc trigger --flow=new test//1234/foo

Cylc queues:
Queues limit how many tasks can be active (preparing, submitted, running) at
once. Tasks that are ready to run will remained queued until the active task
count drops below the queue limit.

Flows:
Waiting tasks in the active window (n=0) already belong to a flow.
* by default, if triggered, they run in the same flow
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def submit_livelike_task_jobs(
'platform_name': itask.platform['name'],
'job_runner_name': itask.summary['job_runner_name'],
})

# reset the is_manual_submit flag in case of retries
itask.is_manual_submit = False

if ri_map[install_target] == REMOTE_FILE_INSTALL_255:
Expand Down
Loading
Loading