From 0f1894bf070133b7c09de231dffa34cb999b607f Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 15 Feb 2023 09:13:36 +1300 Subject: [PATCH 1/4] Fix expire trigger. Update change log. Add new func test. --- CHANGES.md | 5 ++- cylc/flow/task_pool.py | 31 ++++++++++--------- tests/functional/hold-release/05-release.t | 2 +- tests/functional/triggering/21-expire.t | 22 +++++++++++++ .../functional/triggering/21-expire/flow.cylc | 11 +++++++ .../triggering/21-expire/reference.log | 1 + 6 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 tests/functional/triggering/21-expire.t create mode 100644 tests/functional/triggering/21-expire/flow.cylc create mode 100644 tests/functional/triggering/21-expire/reference.log diff --git a/CHANGES.md b/CHANGES.md index d381efb7d79..f10400a6803 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,8 +17,11 @@ ones in. --> [5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from group selection order bug. +[#5412](https://github.com/cylc/cylc-flow/pull/5412) - +Fix task expire trigger. + [#5384](https://github.com/cylc/cylc-flow/pull/5384) - -Fixes `cylc set-verbosity`. +Fix `cylc set-verbosity`. [#5386](https://github.com/cylc/cylc-flow/pull/5386) Fix bug where absence of `job name length maximum` in PBS platform settings would cause diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 700727ba892..973245b5c85 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -686,6 +686,7 @@ def remove(self, itask, reason=""): if not self.hidden_pool[itask.point]: del self.hidden_pool[itask.point] LOG.debug(f"[{itask}] {msg}") + self.task_queue_mgr.remove_task(itask) return try: @@ -696,9 +697,9 @@ def remove(self, itask, reason=""): self.main_pool_changed = True if not self.main_pool[itask.point]: del self.main_pool[itask.point] - self.task_queue_mgr.remove_task(itask) - if itask.tdef.max_future_prereq_offset is not None: - self.set_max_future_offset() + self.task_queue_mgr.remove_task(itask) + if itask.tdef.max_future_prereq_offset is not None: + self.set_max_future_offset() # Notify the data-store manager of their removal # (the manager uses window boundary tracking for pruning). @@ -815,8 +816,8 @@ def release_queued_tasks(self): for itask in released: itask.state_reset(is_queued=False) - itask.waiting_on_job_prep = True self.data_store_mgr.delta_task_queued(itask) + itask.waiting_on_job_prep = True if cylc.flow.flags.cylc7_back_compat: # Cylc 7 Back Compat: spawn downstream to cause Cylc 7 style @@ -994,8 +995,7 @@ def can_stop(self, stop_mode): and itask.state(*TASK_STATUSES_ACTIVE) and not itask.state.kill_failed ) - # we don't need to check for preparing tasks because they will be - # reset to waiting on restart + # preparing tasks because they will be reset to waiting on restart for itask in self.get_tasks() ) @@ -1321,7 +1321,7 @@ def remove_if_complete(self, itask): ) else: # Remove as completed. - self.remove(itask, 'finished') + self.remove(itask, 'completed') if itask.identity == self.stop_task_id: self.stop_task_finished = True if self.compute_runahead(): @@ -1701,21 +1701,24 @@ def _set_expired_task(self, itask): or itask.tdef.expiration_offset is None ): return False + if itask.expire_time is None: itask.expire_time = ( itask.get_point_as_seconds() + itask.get_offset_as_seconds(itask.tdef.expiration_offset)) - if time() > itask.expire_time: - msg = 'Task expired (skipping job).' + + if ( + time() > itask.expire_time and + itask.state_reset(TASK_STATUS_EXPIRED) + ): + msg = 'Task expired: will not submit job.' LOG.warning(f"[{itask}] {msg}") self.task_events_mgr.setup_event_handlers(itask, "expired", msg) - # TODO succeeded and expired states are useless due to immediate - # removal under all circumstances (unhandled failed is still used). - if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False): - self.data_store_mgr.delta_task_state(itask) - self.data_store_mgr.delta_task_held(itask) + self.data_store_mgr.delta_task_state(itask) + self.spawn_on_output(itask, "expired") self.remove(itask, 'expired') return True + return False def task_succeeded(self, id_): diff --git a/tests/functional/hold-release/05-release.t b/tests/functional/hold-release/05-release.t index 0dd50fd3f39..238dce5ba09 100755 --- a/tests/functional/hold-release/05-release.t +++ b/tests/functional/hold-release/05-release.t @@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' inherit = STOP script = """ cylc__job__poll_grep_workflow_log -E \ - '1/dog1 succeeded .* task proxy removed \(finished\)' + '1/dog1 succeeded .* task proxy removed \(completed\)' cylc stop "${CYLC_WORKFLOW_ID}" """ __FLOW_CONFIG__ diff --git a/tests/functional/triggering/21-expire.t b/tests/functional/triggering/21-expire.t new file mode 100644 index 00000000000..aaacdf807b0 --- /dev/null +++ b/tests/functional/triggering/21-expire.t @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test expire triggering +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/triggering/21-expire/flow.cylc b/tests/functional/triggering/21-expire/flow.cylc new file mode 100644 index 00000000000..162b05d3365 --- /dev/null +++ b/tests/functional/triggering/21-expire/flow.cylc @@ -0,0 +1,11 @@ +[scheduling] + initial cycle point = 1999 + [[special tasks]] + clock-expire = foo(PT0S) + [[graph]] + R1 = """ + foo:expire? => bar + foo => baz + """ +[runtime] + [[foo, bar, baz]] diff --git a/tests/functional/triggering/21-expire/reference.log b/tests/functional/triggering/21-expire/reference.log new file mode 100644 index 00000000000..ff564d4db2f --- /dev/null +++ b/tests/functional/triggering/21-expire/reference.log @@ -0,0 +1 @@ +19990101T0000Z/bar -triggered off ['19990101T0000Z/foo'] in flow 1 From f5a19f96af5f2a7b251b8518bdce63acae57a3e6 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 16 Mar 2023 16:08:01 +1300 Subject: [PATCH 2/4] Add expire-all trigger. --- CHANGES.md | 2 +- cylc/flow/graph_parser.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index f10400a6803..5eaea2cbdbd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,7 +18,7 @@ ones in. --> group selection order bug. [#5412](https://github.com/cylc/cylc-flow/pull/5412) - -Fix task expire trigger. +Fix task expire trigger, and add expire-all trigger. [#5384](https://github.com/cylc/cylc-flow/pull/5384) - Fix `cylc set-verbosity`. diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index e09d3d8d319..20740b46bf5 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -32,6 +32,7 @@ from cylc.flow.task_id import TaskID from cylc.flow.task_trigger import TaskTrigger from cylc.flow.task_outputs import ( + TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_STARTED, TASK_OUTPUT_FAILED, @@ -114,6 +115,8 @@ class GraphParser: QUAL_FAM_SUBMIT_ANY = "submit-any" QUAL_FAM_SUBMIT_FAIL_ALL = "submit-fail-all" QUAL_FAM_SUBMIT_FAIL_ANY = "submit-fail-any" + QUAL_FAM_EXPIRE_ALL = "expire-all" + QUAL_FAM_EXPIRE_ANY = "expire-any" # Map family trigger type to (member-trigger, any/all), for use in # expanding family trigger expressions to member trigger expressions. @@ -122,6 +125,8 @@ class GraphParser: # E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps # "FAM:start-all" to "MEMBER:started" and "-all" (all members). fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = { + QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True), + QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False), QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True), QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False), QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True), @@ -138,6 +143,8 @@ class GraphParser: # Map family pseudo triggers to affected member outputs. fam_to_mem_output_map: Dict[str, List[str]] = { + QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED], + QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED], QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED], QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED], QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED], @@ -735,6 +742,10 @@ def _set_output_opt( if suicide: return + if output == TASK_OUTPUT_EXPIRED and not optional: + raise GraphParseError( + f"Expired-output {name}:{output} must be optional") + if output == TASK_OUTPUT_FINISHED: # Interpret :finish pseudo-output if optional: From 1c09063e6a5bad533910aa5e3e3a4754052427ff Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 16 Mar 2023 16:17:48 +1300 Subject: [PATCH 3/4] Add unit test. --- tests/unit/test_graph_parser.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/unit/test_graph_parser.py b/tests/unit/test_graph_parser.py index 97b7fb45483..c84b6507323 100644 --- a/tests/unit/test_graph_parser.py +++ b/tests/unit/test_graph_parser.py @@ -763,6 +763,10 @@ def test_family_optional_outputs(qual, task_output): "FAM => foo", # bare family on LHS "Illegal family trigger" ], + [ + "FAM:expire-all => foo", + "must be optional" + ], ] ) def test_family_trigger_errors(graph, error): @@ -804,6 +808,10 @@ def test_family_trigger_errors(graph, error): "a:finish? => b", "Pseudo-output a:finished can't be optional", ], + [ + "a:expire => b", + "must be optional", + ], ] ) def test_task_optional_output_errors_order( From 7a7b879f79d6eb194aa21855fb1e15fb6027fb5d Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 16 Mar 2023 16:25:22 +1300 Subject: [PATCH 4/4] Extend new func test. --- .../functional/triggering/21-expire/flow.cylc | 26 ++++++++++++------- .../triggering/21-expire/reference.log | 5 +++- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/tests/functional/triggering/21-expire/flow.cylc b/tests/functional/triggering/21-expire/flow.cylc index 162b05d3365..d5c78bdf0b2 100644 --- a/tests/functional/triggering/21-expire/flow.cylc +++ b/tests/functional/triggering/21-expire/flow.cylc @@ -1,11 +1,19 @@ [scheduling] - initial cycle point = 1999 - [[special tasks]] - clock-expire = foo(PT0S) - [[graph]] - R1 = """ - foo:expire? => bar - foo => baz - """ + initial cycle point = 1999 + [[special tasks]] + clock-expire = foo1(PT0S), foo2(PT0S), bar1(PT0S), x(PT0S) + [[graph]] + # Expire: foo1, foo2, bar1, x + # Run: y, bar2, baz, qux + R1 = """ + x:expire? => y + FOO:expire-all? => baz + BAR:expire-any? => qux + """ [runtime] - [[foo, bar, baz]] + [[FOO, BAR]] + [[foo1, foo2]] + inherit = FOO + [[bar1, bar2]] + inherit = BAR + [[x, y, baz, qux]] diff --git a/tests/functional/triggering/21-expire/reference.log b/tests/functional/triggering/21-expire/reference.log index ff564d4db2f..8ba5edca688 100644 --- a/tests/functional/triggering/21-expire/reference.log +++ b/tests/functional/triggering/21-expire/reference.log @@ -1 +1,4 @@ -19990101T0000Z/bar -triggered off ['19990101T0000Z/foo'] in flow 1 +19990101T0000Z/bar2 -triggered off [] in flow 1 +19990101T0000Z/baz -triggered off ['19990101T0000Z/foo1', '19990101T0000Z/foo2'] in flow 1 +19990101T0000Z/qux -triggered off ['19990101T0000Z/bar1'] in flow 1 +19990101T0000Z/y -triggered off ['19990101T0000Z/x'] in flow 1