Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix expire triggers. #5412

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ ones in. -->
[5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from
group selection order bug.

[#5412](https://github.com/cylc/cylc-flow/pull/5412) -
Fix task expire trigger, and add expire-all trigger.

[#5384](https://github.com/cylc/cylc-flow/pull/5384) -
Fixes `cylc set-verbosity`.
Fix `cylc set-verbosity`.

[#5386](https://github.com/cylc/cylc-flow/pull/5386) Fix bug where
absence of `job name length maximum` in PBS platform settings would cause
Expand Down
11 changes: 11 additions & 0 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_FAILED,
Expand Down Expand Up @@ -114,6 +115,8 @@ class GraphParser:
QUAL_FAM_SUBMIT_ANY = "submit-any"
QUAL_FAM_SUBMIT_FAIL_ALL = "submit-fail-all"
QUAL_FAM_SUBMIT_FAIL_ANY = "submit-fail-any"
QUAL_FAM_EXPIRE_ALL = "expire-all"
QUAL_FAM_EXPIRE_ANY = "expire-any"

# Map family trigger type to (member-trigger, any/all), for use in
# expanding family trigger expressions to member trigger expressions.
Expand All @@ -122,6 +125,8 @@ class GraphParser:
# E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
# "FAM:start-all" to "MEMBER:started" and "-all" (all members).
fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
Expand All @@ -138,6 +143,8 @@ class GraphParser:

# Map family pseudo triggers to affected member outputs.
fam_to_mem_output_map: Dict[str, List[str]] = {
QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
Expand Down Expand Up @@ -735,6 +742,10 @@ def _set_output_opt(
if suicide:
return

if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")

Comment on lines +745 to +748
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now covered by the optional outputs stuff.

Suggested change
if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
if optional:
Expand Down
31 changes: 17 additions & 14 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ def remove(self, itask, reason=""):
if not self.hidden_pool[itask.point]:
del self.hidden_pool[itask.point]
LOG.debug(f"[{itask}] {msg}")
self.task_queue_mgr.remove_task(itask)
return

try:
Expand All @@ -696,9 +697,9 @@ def remove(self, itask, reason=""):
self.main_pool_changed = True
if not self.main_pool[itask.point]:
del self.main_pool[itask.point]
self.task_queue_mgr.remove_task(itask)
if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()
self.task_queue_mgr.remove_task(itask)
if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()

# Notify the data-store manager of their removal
# (the manager uses window boundary tracking for pruning).
Expand Down Expand Up @@ -815,8 +816,8 @@ def release_queued_tasks(self):

for itask in released:
itask.state_reset(is_queued=False)
itask.waiting_on_job_prep = True
self.data_store_mgr.delta_task_queued(itask)
itask.waiting_on_job_prep = True

if cylc.flow.flags.cylc7_back_compat:
# Cylc 7 Back Compat: spawn downstream to cause Cylc 7 style
Expand Down Expand Up @@ -994,8 +995,7 @@ def can_stop(self, stop_mode):
and itask.state(*TASK_STATUSES_ACTIVE)
and not itask.state.kill_failed
)
# we don't need to check for preparing tasks because they will be
# reset to waiting on restart
# preparing tasks because they will be reset to waiting on restart
for itask in self.get_tasks()
)

Expand Down Expand Up @@ -1321,7 +1321,7 @@ def remove_if_complete(self, itask):
)
else:
# Remove as completed.
self.remove(itask, 'finished')
self.remove(itask, 'completed')
if itask.identity == self.stop_task_id:
self.stop_task_finished = True
if self.compute_runahead():
Expand Down Expand Up @@ -1701,21 +1701,24 @@ def _set_expired_task(self, itask):
or itask.tdef.expiration_offset is None
):
return False

if itask.expire_time is None:
itask.expire_time = (
itask.get_point_as_seconds() +
itask.get_offset_as_seconds(itask.tdef.expiration_offset))
if time() > itask.expire_time:
msg = 'Task expired (skipping job).'

if (
time() > itask.expire_time and
itask.state_reset(TASK_STATUS_EXPIRED)
):
msg = 'Task expired: will not submit job.'
LOG.warning(f"[{itask}] {msg}")
self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
# TODO succeeded and expired states are useless due to immediate
# removal under all circumstances (unhandled failed is still used).
if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False):
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_held(itask)
self.data_store_mgr.delta_task_state(itask)
self.spawn_on_output(itask, "expired")
self.remove(itask, 'expired')
return True

return False

def task_succeeded(self, id_):
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/05-release.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
'1/dog1 succeeded .* task proxy removed \(finished\)'
'1/dog1 succeeded .* task proxy removed \(completed\)'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
Expand Down
22 changes: 22 additions & 0 deletions tests/functional/triggering/21-expire.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test expire triggering
. "$(dirname "$0")/test_header"
set_test_number 2
reftest
exit
19 changes: 19 additions & 0 deletions tests/functional/triggering/21-expire/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[scheduling]
initial cycle point = 1999
[[special tasks]]
clock-expire = foo1(PT0S), foo2(PT0S), bar1(PT0S), x(PT0S)
[[graph]]
# Expire: foo1, foo2, bar1, x
# Run: y, bar2, baz, qux
R1 = """
x:expire? => y
FOO:expire-all? => baz
BAR:expire-any? => qux
"""
[runtime]
[[FOO, BAR]]
[[foo1, foo2]]
inherit = FOO
[[bar1, bar2]]
inherit = BAR
[[x, y, baz, qux]]
4 changes: 4 additions & 0 deletions tests/functional/triggering/21-expire/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
19990101T0000Z/bar2 -triggered off [] in flow 1
19990101T0000Z/baz -triggered off ['19990101T0000Z/foo1', '19990101T0000Z/foo2'] in flow 1
19990101T0000Z/qux -triggered off ['19990101T0000Z/bar1'] in flow 1
19990101T0000Z/y -triggered off ['19990101T0000Z/x'] in flow 1
8 changes: 8 additions & 0 deletions tests/unit/test_graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,10 @@ def test_family_optional_outputs(qual, task_output):
"FAM => foo", # bare family on LHS
"Illegal family trigger"
],
[
"FAM:expire-all => foo",
"must be optional"
],
]
)
def test_family_trigger_errors(graph, error):
Expand Down Expand Up @@ -804,6 +808,10 @@ def test_family_trigger_errors(graph, error):
"a:finish? => b",
"Pseudo-output a:finished can't be optional",
],
[
"a:expire => b",
"must be optional",
],
]
)
def test_task_optional_output_errors_order(
Expand Down