diff --git a/changes.d/6395.feat.md b/changes.d/6395.feat.md new file mode 100644 index 00000000000..bac3b984c22 --- /dev/null +++ b/changes.d/6395.feat.md @@ -0,0 +1,4 @@ +When triggering a group of tasks, off-group prerequisites will be satisfied +automatically and in-group prerequisites will be left to be satisfied by the +flow. This allows retriggering a sub-graph without having to separately +identify and handle initial tasks and off-flow prerequisites. diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 0f47029f751..05114e6e9ff 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -322,8 +322,10 @@ async def remove_tasks( validate.is_tasks(tasks) validate.flow_opts(flow, flow_wait=False, allow_new_or_none=False) yield - flow_nums = get_flow_nums_set(flow) - schd.remove_tasks(tasks, flow_nums) + schd.remove_tasks( + tasks, + get_flow_nums_set(flow) + ) @_command('reload_workflow') @@ -457,6 +459,17 @@ async def force_trigger_tasks( "at Cylc 8.5." ) yield + + if not flow: + # default case: erase flow history before trigger + schd.remove_tasks( + tasks, + get_flow_nums_set(flow), + default_remove=True + ) + # make sure the removal gets stored before triggering + schd.workflow_db_mgr.process_queued_ops() + yield schd.pool.force_trigger_tasks( tasks, flow, flow_wait, flow_descr, on_resume ) diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 67f816982ec..64ca069dd22 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -160,7 +160,7 @@ def get_flow_num( if flow_num in self.flows: if meta is not None: - LOG.warning( + LOG.debug( f'Ignoring flow metadata "{meta}":' f' {flow_num} is not a new flow' ) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 888280b2681..30b5e626ae4 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1105,7 +1105,10 @@ def kill_tasks( return len(unkillable) def remove_tasks( - self, items: Iterable[str], flow_nums: Optional['FlowNums'] = None + self, + items: 
Iterable[str], + flow_nums: 'FlowNums', + default_remove: bool = False, ) -> None: """Remove tasks (`cylc remove` command). @@ -1113,15 +1116,19 @@ def remove_tasks( items: Relative IDs or globs. flow_nums: Flows to remove the tasks from. If empty or None, it means 'all'. + default_remove: if True, called via `cylc trigger` command to + auto-erase flow history of past tasks before triggering """ active, inactive, _unmatched = self.pool.filter_task_proxies( items, warn_no_active=False, inactive=True ) + if default_remove: + # auto-remove via trigger only applies to past tasks + active = [] + if not (active or inactive): return - if flow_nums is None: - flow_nums = set() # Mapping of *relative* task IDs to removed flow numbers: removed: Dict[Tokens, FlowNums] = {} not_removed: Set[Tokens] = set() @@ -1546,11 +1553,14 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool: for itask in submitted: flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE - LOG.log( - log_lvl, - f"{itask.identity} -triggered off " - f"{itask.state.get_resolved_dependencies()} in flow {flow}" - ) + if itask.is_manual_submit: + off = f"[] in flow {flow}" + else: + off = ( + f"{itask.state.get_resolved_dependencies()}" + f" in flow {flow}" + ) + LOG.log(log_lvl, f"{itask.identity} -triggered off {off}") # one or more tasks were passed through the submission pipeline return True diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 2d353b4d8ce..9f006e0ee1e 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,18 +17,30 @@ """cylc trigger [OPTIONS] ARGS -Force task(s) to run regardless of prerequisites, even in a paused workflow. +Force task(s) to run, even in a paused workflow. -Triggering a task that is not yet queued will queue it. +Triggering a task that is not yet queued will queue it; triggering a queued +task will run it - so un-queued tasks may need to be triggered twice. 
-Triggering a queued task runs it immediately. +Tasks can't be triggered if already active (preparing, submitted, running). -Cylc queues restrict the number of jobs that can be active (submitted or -running) at once. They release tasks to run when their active task count -drops below the queue limit. +Triggering a group of tasks at once: + Dependence on tasks beyond the group ("off-group prerequisites") will be + satisfied automatically; in-group prerequisites will be left to the flow. -Attempts to trigger active (preparing, submitted, running) -tasks will be ignored. +Triggering a sub-graph: + * Group approach: trigger all sub-graph tasks as a group. The forced + satisfaction of off-group prerequisites will automatically avoid a stall. + * Fundamental approach: trigger the initial tasks of the sub-graph to start + the flow, and manually set off-flow prerequisites to prevent a stall. + +Triggering past tasks: + If flows are not specified (i.e., no use of --flow) the flow-history of + target tasks will be erased (see also "cylc remove") so that the graph can + be re-traversed without starting a new flow. + Rarely, you may want to retain flow history even if not starting a new flow. + If so, use `--flow=all` to avoid the erasure. Example: triggering a task twice + with `--wait` to complete different outputs. Examples: # trigger task foo in cycle 1234 in test $ cylc trigger test//1234/foo @@ -40,6 +52,11 @@ # start a new flow by triggering 1234/foo in test $ cylc trigger --flow=new test//1234/foo +Cylc queues: + Queues limit how many tasks can be active (preparing, submitted, running) at + once. Tasks that are ready to run will remain queued until the active task + count drops below the queue limit. + Flows: Waiting tasks in the active window (n=0) already belong to a flow. 
* by default, if triggered, they run in the same flow diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index e0cecad623c..93ef7d0f7c0 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -425,7 +425,7 @@ def submit_livelike_task_jobs( 'platform_name': itask.platform['name'], 'job_runner_name': itask.summary['job_runner_name'], }) - + # reset the is_manual_submit flag in case of retries itask.is_manual_submit = False if ri_map[install_target] == REMOTE_FILE_INSTALL_255: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index f827a57d64d..fbd37f2433b 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1345,10 +1345,12 @@ def hold_tasks(self, items: Iterable[str]) -> int: ) for itask in itasks: self.hold_active_task(itask) + # Set inactive tasks to be held: for tdef, cycle in inactive_tasks: self.data_store_mgr.delta_task_held(tdef.name, cycle, True) self.tasks_to_hold.add((tdef.name, cycle)) + self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold) LOG.debug(f"Tasks to hold: {self.tasks_to_hold}") return len(unmatched) @@ -1717,6 +1719,7 @@ def spawn_task( point: 'PointBase', flow_nums: Set[int], flow_wait: bool = False, + force: bool = False ) -> Optional[TaskProxy]: """Return a new task proxy for the given flow if possible. @@ -1751,7 +1754,8 @@ def spawn_task( return None if ( - prev_status is not None + not force + and prev_status is not None and not itask.state.outputs.get_completed_outputs() ): # If itask has any history in this flow but no completed outputs @@ -1762,7 +1766,7 @@ def spawn_task( LOG.debug(f"Not respawning {point}/{name} - task was removed") return None - if prev_status in TASK_STATUSES_FINAL: + if not force and prev_status in TASK_STATUSES_FINAL: # Task finished previously. 
msg = f"[{point}/{name}:{prev_status}] already finished" if itask.is_complete(): @@ -1921,7 +1925,8 @@ def set_prereqs_and_outputs( prereqs: List[str], flow: List[str], flow_wait: bool = False, - flow_descr: Optional[str] = None + flow_descr: Optional[str] = None, + flow_nums: Optional[Set[int]] = None, ): """Set prerequisites or outputs of target tasks. @@ -1955,11 +1960,19 @@ def set_prereqs_and_outputs( items: task ID match patterns prereqs: prerequisites to set outputs: outputs to set - flow: flow numbers for spawned or merged tasks + flow: raw input flow numbers for spawned or merged tasks + flow_nums: if actual flow numbers have already been computed flow_wait: wait for flows to catch up before continuing flow_descr: description of new flow """ + if not flow_nums: + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() + else: + flow_nums = self._get_flow_nums(flow, flow_descr) + # Get matching pool tasks and inactive task definitions. itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, @@ -1967,17 +1980,19 @@ def set_prereqs_and_outputs( warn_no_active=False, ) - flow_nums = self._get_flow_nums(flow, flow_descr) - # Set existing task proxies. for itask in itasks: + # TODO can flow be 'none' now? if flow == ['none'] and itask.flow_nums != set(): LOG.error( f"[{itask}] ignoring 'flow=none' set: task already has" f" {repr_flow_nums(itask.flow_nums, full=True)}" ) continue - self.merge_flows(itask, flow_nums) + if flow: + self.merge_flows(itask, flow_nums) + # else keep existing flows + if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: @@ -1987,9 +2002,6 @@ def set_prereqs_and_outputs( self._set_outputs_itask(itask, outputs) # Spawn and set inactive tasks. 
- if not flow: - # default: assign to all active flows - flow_nums = self._get_active_flow_nums() for tdef, point in inactive_tasks: if prereqs: self._set_prereqs_tdef( @@ -2002,8 +2014,9 @@ def set_prereqs_and_outputs( if trans is not None: self._set_outputs_itask(trans, outputs) - if self.compute_runahead(): - self.release_runahead_tasks() + # for "cylc play --start-tasks" compute runahead after spawning + self.compute_runahead() + self.release_runahead_tasks() def _set_outputs_itask( self, @@ -2087,26 +2100,40 @@ def _set_prereqs_itask( # No prereqs matched. return False if ( - self.runahead_limit_point is not None + itask.state.is_runahead + and self.runahead_limit_point is not None and itask.point <= self.runahead_limit_point ): - self.rh_release_and_queue(itask) - self.data_store_mgr.delta_task_prerequisite(itask) + # Release from runahead, and queue it. + # TODO? self.rh_release_and_queue(itask) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + return True def _set_prereqs_tdef( self, point, taskdef, prereqs, flow_nums, flow_wait - ): + ) -> Optional[TaskProxy]: """Spawn an inactive task and set prerequisites on it.""" itask = self.spawn_task( - taskdef.name, point, flow_nums, flow_wait=flow_wait + taskdef.name, point, flow_nums, flow_wait=flow_wait, force=True ) if itask is None: - return + # TODO is this possible? + return None + + self.db_add_new_flow_rows(itask) + + # TODO check flow-wait (see master force_trigger_tasks) if self._set_prereqs_itask(itask, prereqs, flow_nums): self.add_to_pool(itask) + return itask + def _get_active_flow_nums(self) -> 'FlowNums': """Return all active flow numbers. @@ -2136,7 +2163,8 @@ def _get_flow_nums( active tasks. """ - if flow == [FLOW_NONE]: + # TODO check is None possible or should be prevented getting here? 
+ if flow is None or flow == [FLOW_NONE]: return set() if flow == [FLOW_ALL]: return self._get_active_flow_nums() @@ -2170,6 +2198,7 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): itask.is_manual_submit = True itask.reset_try_timers() + LOG.info(f"[{itask}] - force trigger") if itask.state_reset(TASK_STATUS_WAITING): # (could also be unhandled failed) self.data_store_mgr.delta_task_state(itask) @@ -2213,6 +2242,15 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): # if so check/spawn if xtrigger sequential. self.check_spawn_psx_task(itask) + def _force_trigger_if_ready(self, itask, on_resume): + if not itask.prereqs_are_satisfied(): + return + itask.is_manual_submit = True + itask.reset_try_timers() + self.data_store_mgr.delta_task_prerequisite(itask) + # TODO this is only called here now + self._force_trigger(itask, on_resume) + def force_trigger_tasks( self, items: Iterable[str], @@ -2221,10 +2259,16 @@ def force_trigger_tasks( flow_descr: Optional[str] = None, on_resume: bool = False ): - """Manually trigger tasks. + """Manually trigger a selected group of tasks. + + Satisfy any off-group prerequisites, in the group. Tasks with only + off-group prerequisites will run immediately. Other prerequisites + will be respected within the group. + + # TODO: check the following (presumably there are tests): If a task did not run before in the flow: - - trigger it, and spawn on outputs unless flow-wait is set. + - run it, and spawn on outputs unless flow-wait is set. (but load the previous outputs from the DB) If a task ran before in the flow: @@ -2235,11 +2279,18 @@ def force_trigger_tasks( - just spawn (if not already spawned in this flow) unless flow-wait is set. + flow: [] - default + ['none'] - no-flow + """ # Get matching tasks proxies, and matching inactive task IDs. 
existing_tasks, inactive, unmatched = self.filter_task_proxies( items, inactive=True, warn_no_active=False, ) + all_ids = ( + [(tdef.name, point) for (tdef, point) in inactive] + + [(itask.tdef.name, itask.point) for itask in existing_tasks] + ) flow_nums = self._get_flow_nums(flow, flow_descr) @@ -2254,8 +2305,32 @@ def force_trigger_tasks( if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.error(f"[{itask}] ignoring trigger - already active") continue + + for pre in itask.state.prerequisites: + # satisfy off-group prerequisites + for ( + p_point, p_name, p_out + ), p_state in pre._satisfied.items(): + if ( + not p_state and + (p_name, get_point(p_point)) not in all_ids + ): + # off-group + LOG.info( + f"[{itask}] - force satisfying off-group" + f" prerequisite {p_point}/{p_name}:{p_out}" + ) + itask.satisfy_me( + [ + Tokens( + cycle=p_point, + task=p_name, + task_sel=p_out + ) + ] + ) self.merge_flows(itask, flow_nums) - self._force_trigger(itask, on_resume) + self._force_trigger_if_ready(itask, on_resume) # Spawn and trigger inactive tasks. 
if not flow: @@ -2263,34 +2338,32 @@ def force_trigger_tasks( flow_nums = self._get_active_flow_nums() for tdef, point in inactive: - if not self.can_be_spawned(tdef.name, point): - continue - submit_num, _, prev_fwait = ( - self._get_task_history(tdef.name, point, flow_nums) - ) - itask = TaskProxy( - self.tokens, - tdef, - point, - flow_nums, - flow_wait=flow_wait, - submit_num=submit_num, - sequential_xtrigger_labels=( - self.xtrigger_mgr.xtriggers.sequential_xtrigger_labels - ), - ) - if itask is None: - continue - - self.db_add_new_flow_rows(itask) - - if prev_fwait: - # update completed outputs from the DB - self._load_historical_outputs(itask) + jtask: Optional[TaskProxy] = None + if tdef.is_parentless(point): + # parentless: set pre=all to spawn into task pool + jtask = self._set_prereqs_tdef( + point, tdef, ["all"], flow_nums, flow_wait + ) + else: + # set off-group prerequisites + off_flow_prereqs = [] + for pid in tdef.get_triggers(point): + p_point = pid.get_point(point) + p_name = pid.task_name + p_out = pid.output + if (p_name, get_point(p_point)) not in all_ids: + off_flow_prereqs.append(f"{p_point}/{p_name}:{p_out}") + LOG.info( + f"[{point}/{tdef.name}] - force satisfying off-" + f"group prerequisite {p_point}/{p_name}:{p_out}" + ) - # run it (or run it again for incomplete flow-wait) - self.add_to_pool(itask) - self._force_trigger(itask, on_resume) + if off_flow_prereqs: + jtask = self._set_prereqs_tdef( + point, tdef, off_flow_prereqs, flow_nums, flow_wait + ) + if jtask is not None: + self._force_trigger_if_ready(jtask, on_resume) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" @@ -2469,18 +2542,6 @@ def match_inactive_tasks( continue point_str = cast('str', tokens['cycle']) - name_str = cast('str', tokens['task']) - if name_str not in self.config.taskdefs: - if self.config.find_taskdefs(name_str): - # It's a family name; was not matched by active tasks - LOG.warning( - f"No 
active tasks in the family {name_str}" - f' matching: {id_}' - ) - else: - LOG.warning(self.ERR_TMPL_NO_TASKID_MATCH.format(name_str)) - unmatched_tasks.append(id_) - continue try: point_str = standardise_point_string(point_str) except PointParsingError as exc: @@ -2488,18 +2549,28 @@ def match_inactive_tasks( f"{id_} - invalid cycle point: {point_str} ({exc})") unmatched_tasks.append(id_) continue - point = get_point(point_str) - taskdef = self.config.taskdefs[name_str] - if taskdef.is_valid_point(point): - matched_tasks.add((taskdef, point)) - else: - LOG.warning( - self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE.format( - taskdef.name, point - ) - ) + + name_str = cast('str', tokens['task']) + + members = self.config.find_taskdefs(name_str) + if not members: + LOG.warning(self.ERR_TMPL_NO_TASKID_MATCH.format(name_str)) unmatched_tasks.append(id_) continue + + point = get_point(point_str) + for name in [m.name for m in members]: + taskdef = self.config.taskdefs[name] + if taskdef.is_valid_point(point): + matched_tasks.add((taskdef, point)) + else: + LOG.warning( + self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE.format( + taskdef.name, point + ) + ) + unmatched_tasks.append(id_) + continue return matched_tasks, unmatched_tasks def match_taskdefs( diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 620b647f909..69f38f62abc 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -334,6 +334,15 @@ def __str__(self) -> str: f"{id_}{repr_flow_nums(self.flow_nums)}:{self.state}" ) + def __eq__(self, other) -> bool: + """Task proxy equality is based point/name only.""" + # (Needed e.g. 
for adding to task_pool.tasks_to_trigger sets) + return self.identity == other.identity + + def __hash__(self): + """Task proxy equality is based point/name only.""" + return hash(self.identity) + def copy_to_reload_successor( self, reload_successor: 'TaskProxy', diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 69d400304e6..d4fc52e6957 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -325,6 +325,22 @@ def get_parent_points(self, point): parent_points.add(trig.get_parent_point(point)) return parent_points + def get_triggers(self, point): + """Return my triggers, at point.""" + triggers = set() + for seq in self.sequences: + if not seq.is_valid(point): + continue + if seq in self.dependencies: + # task has prereqs in this sequence + for dep in self.dependencies[seq]: + # TODO? + if dep.suicide: + continue + for trig in dep.task_triggers: + triggers.add(trig) + return triggers + def has_only_abs_triggers(self, point): """Return whether I have only absolute triggers at point.""" if not self.has_abs_triggers: diff --git a/tests/functional/cylc-show/06-past-present-future.t b/tests/functional/cylc-show/06-past-present-future.t index a67636bc613..73a749f523b 100644 --- a/tests/functional/cylc-show/06-past-present-future.t +++ b/tests/functional/cylc-show/06-past-present-future.t @@ -42,11 +42,12 @@ state: succeeded prerequisites: (n/a for past tasks) __END__ +# Note trigger command satisfies off-flow prerequisites. 
TEST_NAME="${TEST_NAME_BASE}-show.present" contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__ state: running prerequisites: ('⨯': not satisfied) - ⨯ 1/b succeeded + ✓ 1/b succeeded __END__ TEST_NAME="${TEST_NAME_BASE}-show.future" diff --git a/tests/functional/cylc-trigger/00-compat/reference.log b/tests/functional/cylc-trigger/00-compat/reference.log index 499c919d40d..1a9f846c98d 100644 --- a/tests/functional/cylc-trigger/00-compat/reference.log +++ b/tests/functional/cylc-trigger/00-compat/reference.log @@ -1,4 +1,4 @@ Initial point: 1 Final point: 1 1/foo -triggered off [] -1/bar -triggered off [] +1/bar -triggered off ['1/foo'] diff --git a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc index 7416cf5790d..fd56261a255 100644 --- a/tests/functional/cylc-trigger/02-filter-failed/flow.cylc +++ b/tests/functional/cylc-trigger/02-filter-failed/flow.cylc @@ -18,10 +18,10 @@ [[fixer]] script = """ cylc__job__wait_cylc_message_started - cylc__job__poll_grep_workflow_log -E '1/fixable1/01:running.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable2/01:running.* \(received\)failed' - cylc__job__poll_grep_workflow_log -E '1/fixable3/01:running.* \(received\)failed' - cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*" + cylc__job__poll_grep_workflow_log -E '1/fixable1/01.* => failed' + cylc__job__poll_grep_workflow_log -E '1/fixable2/01.* => failed' + cylc__job__poll_grep_workflow_log -E '1/fixable3/01.* => failed' + cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*:failed" """ [[Z]] script = true diff --git a/tests/functional/cylc-trigger/06-already-active/flow.cylc b/tests/functional/cylc-trigger/06-already-active/flow.cylc index 9f02110b207..27c1392bbee 100644 --- a/tests/functional/cylc-trigger/06-already-active/flow.cylc +++ b/tests/functional/cylc-trigger/06-already-active/flow.cylc @@ -9,7 +9,7 @@ [runtime] [[triggerer]] script = """ - cylc__job__poll_grep_workflow_log 
"1/triggeree/01:running" + cylc__job__poll_grep_workflow_log "1/triggeree.* => running" -E cylc trigger "$CYLC_WORKFLOW_ID//1/triggeree" cylc__job__poll_grep_workflow_log \ "1/triggeree.* ignoring trigger - already active" -E diff --git a/tests/functional/cylc-trigger/08-group-trigger.t b/tests/functional/cylc-trigger/08-group-trigger.t new file mode 100644 index 00000000000..2ec810b8860 --- /dev/null +++ b/tests/functional/cylc-trigger/08-group-trigger.t @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test group trigger. +# See https://github.com/cylc/cylc-flow/issues/6395 + +. 
"$(dirname "$0")/test_header" + +set_test_number 2 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" + +workflow_run_ok "${TEST_NAME_BASE}-run" \ + cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}" + +purge diff --git a/tests/functional/cylc-trigger/08-group-trigger/flow.cylc b/tests/functional/cylc-trigger/08-group-trigger/flow.cylc new file mode 100644 index 00000000000..11429921cb6 --- /dev/null +++ b/tests/functional/cylc-trigger/08-group-trigger/flow.cylc @@ -0,0 +1,22 @@ +[scheduler] + allow implicit tasks = True + [[events]] + expected task failures = 1/fail +[scheduling] + [[graph]] + R1 = """ + x => a # spawn a into active pool + fail => a => b => c + fail => b + """ +[runtime] + [[fail]] + script = """ + # this should automatically set off-flow prerequisites + # 1/x (already satisfied) and 1/fail (not satisfied). + # But internal deps "a => b => c" should be respected. + cylc trigger ${CYLC_WORKFLOW_ID} //1/a //1/b //1/c + false + """ + [[c]] + script = "cylc stop $CYLC_WORKFLOW_ID" diff --git a/tests/functional/cylc-trigger/08-group-trigger/reference.log b/tests/functional/cylc-trigger/08-group-trigger/reference.log new file mode 100644 index 00000000000..a58f6e38251 --- /dev/null +++ b/tests/functional/cylc-trigger/08-group-trigger/reference.log @@ -0,0 +1,5 @@ +1/fail -triggered off [] in flow 1 +1/x -triggered off [] in flow 1 +1/a -triggered off ['1/fail', '1/x'] in flow 1 +1/b -triggered off ['1/a', '1/fail'] in flow 1 +1/c -triggered off ['1/b'] in flow 1 diff --git a/tests/functional/flow-triggers/00-new-future/reference.log b/tests/functional/flow-triggers/00-new-future/reference.log index 653bd6b94d2..3f851d9d39e 100644 --- a/tests/functional/flow-triggers/00-new-future/reference.log +++ b/tests/functional/flow-triggers/00-new-future/reference.log @@ -1,7 +1,7 @@ Initial point: 1 Final point: 1 1/a -triggered off [] in flow 1 -1/d -triggered off [] 
in flow 2 +1/d -triggered off ['1/c'] in flow 2 1/e -triggered off ['1/d'] in flow 2 1/b -triggered off ['1/a'] in flow 1 1/f -triggered off ['1/e'] in flow 2 diff --git a/tests/functional/flow-triggers/01-all-future/reference.log b/tests/functional/flow-triggers/01-all-future/reference.log index fdf4f91cd04..b5cfcf3a950 100644 --- a/tests/functional/flow-triggers/01-all-future/reference.log +++ b/tests/functional/flow-triggers/01-all-future/reference.log @@ -3,6 +3,6 @@ Final point: 1 1/a -triggered off [] in flow 1 1/b -triggered off ['1/a'] in flow 1 1/c -triggered off ['1/b'] in flow 1 -1/d -triggered off [] in flow 1 +1/d -triggered off ['1/c'] in flow 1 1/e -triggered off ['1/d'] in flow 1 1/f -triggered off ['1/e'] in flow 1 diff --git a/tests/functional/flow-triggers/02-none-future/reference.log b/tests/functional/flow-triggers/02-none-future/reference.log index 358e488fe10..ec2dc18e788 100644 --- a/tests/functional/flow-triggers/02-none-future/reference.log +++ b/tests/functional/flow-triggers/02-none-future/reference.log @@ -1,7 +1,7 @@ Initial point: 1 Final point: 1 1/a -triggered off [] in flow 1 -1/d -triggered off [] in flow none +1/d -triggered off ['1/c'] in flow none 1/b -triggered off ['1/a'] in flow 1 1/c -triggered off ['1/b'] in flow 1 1/d -triggered off ['1/c'] in flow 1 diff --git a/tests/functional/flow-triggers/04-all-past.t b/tests/functional/flow-triggers/04-all-past.t index 026d0d6b888..91cb0cc2f53 100644 --- a/tests/functional/flow-triggers/04-all-past.t +++ b/tests/functional/flow-triggers/04-all-past.t @@ -28,14 +28,14 @@ reftest_run # NOTE task_states table only keeps the final submit number of a task for each flow TEST_NAME="${TEST_NAME_BASE}-order-no-wait" -QUERY="SELECT name,submit_num,flow_nums,flow_wait FROM task_states ORDER BY time_updated;" +QUERY="SELECT name,submit_num,flow_wait FROM task_states WHERE flow_nums is '[1]' ORDER BY time_updated;" # (Ordering by time_updated, 'c' comes before 'a' job:02) run_ok "${TEST_NAME}" 
sqlite3 "${DB}" "$QUERY" cmp_ok "${TEST_NAME}.stdout" <<\__END__ -b|1|[1]|0 -c|1|[1]|0 -a|2|[1]|0 -d|1|[1]|0 +b|1|0 +c|1|0 +a|2|0 +d|1|0 __END__ export REFTEST_OPTS=--set="WAIT=1" @@ -43,15 +43,14 @@ install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" true reftest_run TEST_NAME="${TEST_NAME_BASE}-order-wait" -QUERY="SELECT name,submit_num,flow_nums,flow_wait FROM task_states ORDER BY time_updated" +QUERY="SELECT name,submit_num,flow_wait FROM task_states WHERE flow_nums is '[1]' ORDER BY time_updated" # (Ordering by time_updated, 'c' comes before 'a' job:02) run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" cmp_ok "${TEST_NAME}.stdout" <<\__END__ -b|1|[1]|0 -c|1|[1]|0 -a|2|[1]|1 -d|1|[1]|0 +b|1|0 +c|1|0 +a|2|1 +d|1|0 __END__ purge - diff --git a/tests/functional/flow-triggers/07-all-past-switch.t b/tests/functional/flow-triggers/07-all-past-switch.t index cdcc33861b4..8ecffffd134 100644 --- a/tests/functional/flow-triggers/07-all-past-switch.t +++ b/tests/functional/flow-triggers/07-all-past-switch.t @@ -29,16 +29,16 @@ reftest_run # NOTE task_states table only keeps the final submit number of a task for each flow TEST_NAME="${TEST_NAME_BASE}-order-no-wait" -QUERY="SELECT name,submit_num,flow_nums,flow_wait FROM task_states ORDER BY time_updated;" +QUERY="SELECT name,submit_num,flow_wait FROM task_states WHERE flow_nums is '[1]' ORDER BY time_updated;" # Ordering by time_updated a(job:02) comes before c, which triggers it and # waits for it to finish. 
run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" cmp_ok "${TEST_NAME}.stdout" <<\__END__ -b|1|[1]|0 -a|2|[1]|0 -c|1|[1]|0 -d|1|[1]|0 -e|1|[1]|0 +b|1|0 +a|2|0 +c|1|0 +d|1|0 +e|1|0 __END__ export REFTEST_OPTS=--set="WAIT=1" @@ -47,14 +47,14 @@ cp "${WORKFLOW_RUN_DIR}/runN/reflog-wait.log" "${WORKFLOW_RUN_DIR}/runN/referenc reftest_run TEST_NAME="${TEST_NAME_BASE}-order-wait" -QUERY="SELECT name,submit_num,flow_nums,flow_wait FROM task_states ORDER BY time_updated" +QUERY="SELECT name,submit_num,flow_wait FROM task_states WHERE flow_nums is '[1]' ORDER BY time_updated" # Ordering by time_updated a(job:02) comes before c, which triggers it and # waits for it to finish. run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" cmp_ok "${TEST_NAME}.stdout" <<\__END__ -b|1|[1]|0 -a|2|[1]|1 -c|1|[1]|0 +b|1|0 +a|2|1 +c|1|0 __END__ purge diff --git a/tests/functional/flow-triggers/09-retrigger/flow.cylc b/tests/functional/flow-triggers/09-retrigger/flow.cylc index a0e1341c06e..a7c9dfa0fec 100644 --- a/tests/functional/flow-triggers/09-retrigger/flow.cylc +++ b/tests/functional/flow-triggers/09-retrigger/flow.cylc @@ -14,7 +14,8 @@ script = """ cylc trigger --wait ${CYLC_WORKFLOW_ID}//1/baz cylc__job__poll_grep_workflow_log "1/baz/01:running.*succeeded" - cylc trigger --wait ${CYLC_WORKFLOW_ID}//1/baz + # Use --flow=all to avoid erasing flow history of the first trigger. 
+ cylc trigger --flow=all --wait ${CYLC_WORKFLOW_ID}//1/baz cylc__job__poll_grep_workflow_log "1/baz/02:running.*succeeded" """ [[baz]] diff --git a/tests/functional/flow-triggers/09-retrigger/reference.log b/tests/functional/flow-triggers/09-retrigger/reference.log index 00afaf9c6fe..090cabeac22 100644 --- a/tests/functional/flow-triggers/09-retrigger/reference.log +++ b/tests/functional/flow-triggers/09-retrigger/reference.log @@ -1,8 +1,8 @@ Initial point: 1 Final point: 1 1/foo -triggered off [] in flow 1 -1/baz -triggered off [] in flow 1 -1/baz -triggered off [] in flow 1 +1/baz -triggered off ['1/bar'] in flow 1 +1/baz -triggered off ['1/bar'] in flow 1 1/bar -triggered off ['1/foo'] in flow 1 1/x -triggered off ['1/baz'] in flow 1 1/y -triggered off ['1/baz'] in flow 1 diff --git a/tests/functional/flow-triggers/10-specific-flow/reference.log b/tests/functional/flow-triggers/10-specific-flow/reference.log index 64261d2b0ad..cc6f057af81 100644 --- a/tests/functional/flow-triggers/10-specific-flow/reference.log +++ b/tests/functional/flow-triggers/10-specific-flow/reference.log @@ -2,12 +2,12 @@ Initial point: 1 Final point: 1 1/a -triggered off [] in flow 1 1/trigger-happy -triggered off [] in flow 1 -1/f -triggered off [] in flow 2 +1/f -triggered off ['1/e'] in flow 2 1/b -triggered off ['1/a'] in flow 1 1/c -triggered off ['1/b'] in flow 1 1/d -triggered off ['1/c'] in flow 1 1/e -triggered off ['1/d'] in flow 1 -1/b -triggered off [] in flow 2 +1/b -triggered off ['1/a'] in flow 2 1/f -triggered off ['1/e'] in flow 1 1/c -triggered off ['1/b'] in flow 2 1/g -triggered off ['1/f'] in flow 1 diff --git a/tests/functional/flow-triggers/11-wait-merge/reference.log b/tests/functional/flow-triggers/11-wait-merge/reference.log index 6d90cf31ff2..6cf0d81563d 100644 --- a/tests/functional/flow-triggers/11-wait-merge/reference.log +++ b/tests/functional/flow-triggers/11-wait-merge/reference.log @@ -3,7 +3,7 @@ Final point: 1 1/a -triggered off [] in flow 1 1/b 
-triggered off ['1/a'] in flow 1 1/a -triggered off [] in flow 2 -1/c -triggered off [] in flow 2 +1/c -triggered off ['1/b'] in flow 2 1/x -triggered off ['1/c'] in flow 1,2 1/d -triggered off ['1/c'] in flow 1,2 1/b -triggered off ['1/a'] in flow 2 diff --git a/tests/functional/flow-triggers/12-all-future-multi/reference.log b/tests/functional/flow-triggers/12-all-future-multi/reference.log index 3581dd3965e..4e8a947fd27 100644 --- a/tests/functional/flow-triggers/12-all-future-multi/reference.log +++ b/tests/functional/flow-triggers/12-all-future-multi/reference.log @@ -6,9 +6,9 @@ Final point: 1 3/a -triggered off ['2/a'] in flow 1 4/a -triggered off ['3/a'] in flow 1 # the second flow -3/a -triggered off [] in flow 2 +3/a -triggered off ['2/a'] in flow 2 4/a -triggered off ['3/a'] in flow 2 # the joined flow -5/a -triggered off [] in flow 1,2 +5/a -triggered off ['4/a'] in flow 1,2 6/a -triggered off ['5/a'] in flow 1,2 7/a -triggered off ['6/a'] in flow 1,2 diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/58-waiting-manual-triggered.t index fd47201e9f7..3337c6883cd 100644 --- a/tests/functional/restart/58-waiting-manual-triggered.t +++ b/tests/functional/restart/58-waiting-manual-triggered.t @@ -32,7 +32,7 @@ DB_FILE="${WORKFLOW_RUN_DIR}/log/db" # It should have shut down with 2/foo waiting with the is_manual_submit flag on. 
TEST_NAME="${TEST_NAME_BASE}-db-task-states" -QUERY='SELECT status, is_manual_submit FROM task_states WHERE cycle IS 2;' +QUERY='SELECT status, is_manual_submit FROM task_states WHERE cycle IS 2 AND flow_nums is "[1]";' run_ok "$TEST_NAME" sqlite3 "$DB_FILE" "$QUERY" cmp_ok "${TEST_NAME}.stdout" << '__EOF__' waiting|1 diff --git a/tests/functional/spawn-on-demand/00-no-reflow/reference.log b/tests/functional/spawn-on-demand/00-no-reflow/reference.log index 176e79f124a..3726fee4c3c 100644 --- a/tests/functional/spawn-on-demand/00-no-reflow/reference.log +++ b/tests/functional/spawn-on-demand/00-no-reflow/reference.log @@ -6,5 +6,5 @@ Final point: 2 1/baz -triggered off ['1/bar'] 2/bar -triggered off ['2/foo'] 2/triggerer -triggered off ['2/foo'] -1/bar -triggered off [] +1/bar -triggered off ['1/foo'] 2/baz -triggered off ['2/bar'] diff --git a/tests/functional/spawn-on-demand/01-reflow/reference.log b/tests/functional/spawn-on-demand/01-reflow/reference.log index 48550ec827a..e35bca62982 100644 --- a/tests/functional/spawn-on-demand/01-reflow/reference.log +++ b/tests/functional/spawn-on-demand/01-reflow/reference.log @@ -5,5 +5,5 @@ Final point: 2 1/baz -triggered off ['1/bar'] 2/foo -triggered off ['1/foo'] 2/triggerer -triggered off ['2/foo'] -1/bar -triggered off [] +1/bar -triggered off ['1/foo'] 1/baz -triggered off ['1/bar'] diff --git a/tests/functional/spawn-on-demand/16-parents-yes-no/reference.log b/tests/functional/spawn-on-demand/16-parents-yes-no/reference.log index 0f4e225300e..1f491e7d73c 100644 --- a/tests/functional/spawn-on-demand/16-parents-yes-no/reference.log +++ b/tests/functional/spawn-on-demand/16-parents-yes-no/reference.log @@ -5,7 +5,7 @@ Final point: 4 3/foo -triggered off [] in flow 1 1/bar -triggered off ['1/foo'] in flow 1 3/bar -triggered off ['3/foo'] in flow 1 -2/baz -triggered off [] in flow 1 +2/baz -triggered off ['1/failer'] in flow 1 2/foo -triggered off ['2/baz'] in flow 1 4/foo -triggered off [] in flow 1 4/bar -triggered 
off ['4/foo'] in flow 1 diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index d6a764c361d..effbdf5ae97 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -74,7 +74,7 @@ async def test_remove(sequential: Scheduler, start): ] # remove all tasks in the pool - sequential.remove_tasks(['*']) + sequential.remove_tasks(['*'], [1]) # the next cycle should be automatically spawned assert list_cycles(sequential) == ['2004'] diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 880046611a3..54254d96335 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -338,9 +338,9 @@ async def test_match_taskdefs( id="Name globs hold active tasks only" # (active means n=0 here) ), param( - ['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar'], - ["No active tasks in the family FAM matching: 6/FAM"], - id="Family names hold active tasks only" + ['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar', '6/bar'], + [], + id="Family names hold active and future tasks" ), param( ['1/grogu', 'H/foo', '20/foo', '1/pub'], [], @@ -493,7 +493,7 @@ async def test_trigger_states( ): """It should only trigger tasks in compatible states.""" - async with start(one): + async with start(one): task = one.pool.filter_task_proxies(['1/one'])[0][0] # reset task a to the provided state @@ -573,11 +573,11 @@ async def test_runahead_after_remove( assert int(task_pool.runahead_limit_point) == 4 # No change after removing an intermediate cycle. - example_flow.remove_tasks(['3/*']) + example_flow.remove_tasks(['3/*'], [1]) assert int(task_pool.runahead_limit_point) == 4 # Should update after removing the first point. 
- example_flow.remove_tasks(['1/*']) + example_flow.remove_tasks(['1/*'], [1]) assert int(task_pool.runahead_limit_point) == 5 diff --git a/tests/unit/test_flow_mgr.py b/tests/unit/test_flow_mgr.py index e73ec35fb2c..a32c56408c3 100644 --- a/tests/unit/test_flow_mgr.py +++ b/tests/unit/test_flow_mgr.py @@ -48,7 +48,7 @@ def test_all( db_mgr = WorkflowDatabaseManager() flow_mgr = FlowMgr(db_mgr) - caplog.set_level(logging.INFO, CYLC_LOG) + caplog.set_level(logging.DEBUG, CYLC_LOG) meta = "the quick brown fox" assert flow_mgr.get_flow_num(None, meta) == 1