
Commit d95a5a5

Implement "cylc set" command.
Long, messy dev history squashed flat like a bug.
1 parent aa100ef commit d95a5a5

173 files changed: +4,281 −1,697 lines


changes.d/5658.feat.md

+1
@@ -0,0 +1 @@
+New "cylc set" command for setting task prerequisites and outputs.

cylc/flow/data_store_mgr.py

+3 −2

@@ -2185,8 +2185,9 @@ def update_workflow(self, reloaded=False):
             w_delta.n_edge_distance = self.n_edge_distance
             delta_set = True
 
-        if self.schd.pool.main_pool:
-            pool_points = set(self.schd.pool.main_pool)
+        if self.schd.pool.active_tasks:
+            pool_points = set(self.schd.pool.active_tasks)
+
             oldest_point = str(min(pool_points))
             if w_data.oldest_active_cycle_point != oldest_point:
                 w_delta.oldest_active_cycle_point = oldest_point

cylc/flow/etc/cylc

+1 −1

@@ -2,7 +2,7 @@
 
 # THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
 # Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
+#
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or

cylc/flow/flow_mgr.py

+138 −24

@@ -20,50 +20,164 @@
 import datetime
 
 from cylc.flow import LOG
+from cylc.flow.exceptions import InputError
 
 
 if TYPE_CHECKING:
     from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
 
-
 FlowNums = Set[int]
 # Flow constants
 FLOW_ALL = "all"
 FLOW_NEW = "new"
 FLOW_NONE = "none"
 
+# For flow-related CLI options:
+ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
+ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
+ERR_OPT_FLOW_WAIT = (
+    f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
+)
+
+
+def add_flow_opts(parser):
+    parser.add_option(
+        "--flow", action="append", dest="flow", metavar="FLOW",
+        help=f'Assign new tasks to all active flows ("{FLOW_ALL}");'
+             f' no flow ("{FLOW_NONE}"); a new flow ("{FLOW_NEW}");'
+             f' or a specific flow (e.g. "2"). The default is "{FLOW_ALL}".'
+             ' Specific flow numbers can be new or existing.'
+             ' Reuse the option to assign multiple flow numbers.'
+    )
+
+    parser.add_option(
+        "--meta", metavar="DESCRIPTION", action="store",
+        dest="flow_descr", default=None,
+        help=f"description of new flow (with --flow={FLOW_NEW})."
+    )
+
+    parser.add_option(
+        "--wait", action="store_true", default=False, dest="flow_wait",
+        help="Wait for merge with current active flows before flowing on."
+             " Note you can use 'cylc set --pre=all' to unset a flow-wait."
+    )
+
+
+def validate_flow_opts(options):
+    """Check validity of flow-related CLI options."""
+    if options.flow is None:
+        # Default to all active flows
+        options.flow = [FLOW_ALL]
+
+    for val in options.flow:
+        val = val.strip()
+        if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
+            if len(options.flow) != 1:
+                raise InputError(ERR_OPT_FLOW_INT)
+        else:
+            try:
+                int(val)
+            except ValueError:
+                raise InputError(ERR_OPT_FLOW_VAL.format(val))
+
+    if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]:
+        raise InputError(ERR_OPT_FLOW_WAIT)
+
+
+def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
+    """Return a string representation of a set of flow numbers
+
+    Return:
+        - "none" for no flow
+        - "" for the original flow (flows only matter if there are several)
+        - otherwise e.g. "(flow=1,2,3)"
+
+    Examples:
+        >>> stringify_flow_nums({})
+        '(flows=none)'
+
+        >>> stringify_flow_nums({1})
+        ''
+
+        >>> stringify_flow_nums({1}, True)
+        '(flows=1)'
+
+        >>> stringify_flow_nums({1,2,3})
+        '(flows=1,2,3)'
+
+    """
+    if not full and flow_nums == {1}:
+        return ""
+    else:
+        return (
+            "(flows="
+            f"{','.join(str(i) for i in flow_nums) or 'none'}"
+            ")"
+        )
+
 
 class FlowMgr:
     """Logic to manage flow counter and flow metadata."""
 
-    def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
+    def __init__(
+        self,
+        db_mgr: "WorkflowDatabaseManager",
+        utc: bool = True
+    ) -> None:
         """Initialise the flow manager."""
         self.db_mgr = db_mgr
         self.flows: Dict[int, Dict[str, str]] = {}
         self.counter: int = 0
+        self._timezone = datetime.timezone.utc if utc else None
 
-    def get_new_flow(self, description: Optional[str] = None) -> int:
-        """Increment flow counter, record flow metadata."""
-        self.counter += 1
-        # record start time to nearest second
-        now = datetime.datetime.now()
-        now_sec: str = str(
-            now - datetime.timedelta(microseconds=now.microsecond))
-        description = description or "no description"
-        self.flows[self.counter] = {
-            "description": description,
-            "start_time": now_sec
-        }
-        LOG.info(
-            f"New flow: {self.counter} "
-            f"({description}) "
-            f"{now_sec}"
-        )
-        self.db_mgr.put_insert_workflow_flows(
-            self.counter,
-            self.flows[self.counter]
-        )
-        return self.counter
+    def get_flow_num(
+        self,
+        flow_num: Optional[int] = None,
+        meta: Optional[str] = None
+    ) -> int:
+        """Return a valid flow number, and record a new flow if necessary.
+
+        If asked for a new flow:
+          - increment the automatic counter until we find an unused number
+
+        If given a flow number:
+          - record a new flow if the number is unused
+          - else return it, as an existing flow number.
+
+        The metadata string is only used if it is a new flow.
+
+        """
+        if flow_num is None:
+            self.counter += 1
+            while self.counter in self.flows:
+                # Skip manually-created out-of-sequence flows.
+                self.counter += 1
+            flow_num = self.counter
+
+        if flow_num in self.flows:
+            if meta is not None:
+                LOG.warning(
+                    f'Ignoring flow metadata "{meta}":'
+                    f' {flow_num} is not a new flow'
+                )
+        else:
+            # Record a new flow.
+            now_sec = datetime.datetime.now(tz=self._timezone).isoformat(
+                timespec="seconds"
+            )
+            meta = meta or "no description"
+            self.flows[flow_num] = {
+                "description": meta,
+                "start_time": now_sec
+            }
+            LOG.info(
+                f"New flow: {flow_num} ({meta}) {now_sec}"
+            )
+            self.db_mgr.put_insert_workflow_flows(
+                flow_num,
+                self.flows[flow_num]
+            )
+        return flow_num
 
     def load_from_db(self, flow_nums: FlowNums) -> None:
         """Load flow data for scheduler restart.

cylc/flow/graph_parser.py

+11
@@ -33,6 +33,7 @@
 from cylc.flow.task_id import TaskID
 from cylc.flow.task_trigger import TaskTrigger
 from cylc.flow.task_outputs import (
+    TASK_OUTPUT_EXPIRED,
     TASK_OUTPUT_SUCCEEDED,
     TASK_OUTPUT_STARTED,
     TASK_OUTPUT_FAILED,
@@ -41,6 +42,8 @@
     TASK_OUTPUT_SUBMIT_FAILED
 )
 from cylc.flow.task_qualifiers import (
+    QUAL_FAM_EXPIRE_ALL,
+    QUAL_FAM_EXPIRE_ANY,
     QUAL_FAM_SUCCEED_ALL,
     QUAL_FAM_SUCCEED_ANY,
     QUAL_FAM_FAIL_ALL,
@@ -124,6 +127,8 @@ class GraphParser:
     # E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
     # "FAM:start-all" to "MEMBER:started" and "-all" (all members).
     fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
+        QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
+        QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
         QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
         QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
         QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
@@ -140,6 +145,8 @@ class GraphParser:
 
     # Map family pseudo triggers to affected member outputs.
     fam_to_mem_output_map: Dict[str, List[str]] = {
+        QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
+        QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
         QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
         QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
         QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
@@ -738,6 +745,10 @@ def _set_output_opt(
         if suicide:
             return
 
+        if output == TASK_OUTPUT_EXPIRED and not optional:
+            raise GraphParseError(
+                f"Expired-output {name}:{output} must be optional")
+
         if output == TASK_OUTPUT_FINISHED:
             # Interpret :finish pseudo-output
             if optional:
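
Illustrative only (not part of this commit): with the new check in _set_output_opt, an expiry trigger in the graph must be marked optional with "?". A rough sketch, assuming the usual GraphParser.parse_graph entry point and made-up task names:

    from cylc.flow.graph_parser import GraphParser
    from cylc.flow.exceptions import GraphParseError

    GraphParser().parse_graph("foo:expired? => bar")      # accepted: optional output

    try:
        GraphParser().parse_graph("foo:expired => bar")   # not optional
    except GraphParseError as exc:
        print(exc)   # e.g. "Expired-output foo:expired must be optional"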

cylc/flow/id_match.py

+41 −46

@@ -76,7 +76,7 @@
 
 
 def filter_ids(
-    pools: 'List[Pool]',
+    pool: 'Pool',
     ids: 'Iterable[str]',
     *,
     warn: 'bool' = True,
@@ -145,28 +145,25 @@ def filter_ids(
         if tokens.get(lowest_token.value):
             break
 
-    # This needs to be a set to avoid getting two copies of matched tasks
-    # in cycle points that appear in both pools:
     cycles = set()
     tasks = []
 
     # filter by cycle
     if lowest_token == IDTokens.Cycle:
         cycle = tokens[IDTokens.Cycle.value]
         cycle_sel = tokens.get(IDTokens.Cycle.value + '_sel') or '*'
-        for pool in pools:
-            for icycle, itasks in pool.items():
-                if not itasks:
-                    continue
-                if not point_match(icycle, cycle, pattern_match):
-                    continue
-                if cycle_sel == '*':
+        for icycle, itasks in pool.items():
+            if not itasks:
+                continue
+            if not point_match(icycle, cycle, pattern_match):
+                continue
+            if cycle_sel == '*':
+                cycles.add(icycle)
+                continue
+            for itask in itasks.values():
+                if match(itask.state.status, cycle_sel):
                     cycles.add(icycle)
-                    continue
-                for itask in itasks.values():
-                    if match(itask.state.status, cycle_sel):
-                        cycles.add(icycle)
-                        break
+                    break
 
     # filter by task
     elif lowest_token == IDTokens.Task:  # noqa SIM106
@@ -176,36 +173,35 @@ def filter_ids(
         task = tokens[IDTokens.Task.value]
         task_sel_raw = tokens.get(IDTokens.Task.value + '_sel')
         task_sel = task_sel_raw or '*'
-        for pool in pools:
-            for icycle, itasks in pool.items():
-                if not point_match(icycle, cycle, pattern_match):
-                    continue
-                for itask in itasks.values():
-                    if (
-                        # check cycle selector
+        for icycle, itasks in pool.items():
+            if not point_match(icycle, cycle, pattern_match):
+                continue
+            for itask in itasks.values():
+                if (
+                    # check cycle selector
+                    (
                         (
-                            (
-                                # disable cycle_sel if not defined if
-                                # pattern matching is turned off
-                                pattern_match is False
-                                and cycle_sel_raw is None
-                            )
-                            or match(itask.state.status, cycle_sel)
+                            # disable cycle_sel if not defined if
+                            # pattern matching is turned off
+                            pattern_match is False
+                            and cycle_sel_raw is None
                         )
-                        # check namespace name
-                        and itask.name_match(task, match_func=match)
-                        # check task selector
-                        and (
-                            (
-                                # disable task_sel if not defined if
-                                # pattern matching is turned off
-                                pattern_match is False
-                                and task_sel_raw is None
-                            )
-                            or match(itask.state.status, task_sel)
+                        or match(itask.state.status, cycle_sel)
+                    )
+                    # check namespace name
+                    and itask.name_match(task, match_func=match)
+                    # check task selector
+                    and (
+                        (
+                            # disable task_sel if not defined if
+                            # pattern matching is turned off
+                            pattern_match is False
+                            and task_sel_raw is None
                         )
-                    ):
-                        tasks.append(itask)
+                        or match(itask.state.status, task_sel)
+                    )
+                ):
+                    tasks.append(itask)
 
     else:
         raise NotImplementedError
@@ -226,10 +222,9 @@ def filter_ids(
             })
         ret = _cycles
     elif out == IDTokens.Task:
-        for pool in pools:
-            for icycle in _cycles:
-                if icycle in pool:
-                    _tasks.extend(pool[icycle].values())
+        for icycle in _cycles:
+            if icycle in pool:
+                _tasks.extend(pool[icycle].values())
         ret = _tasks
     return ret, _not_matched
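
A rough sketch (not part of this commit) of the new call pattern: filter_ids now takes a single task pool mapping rather than a list of pools. The pool layout and ID string below are illustrative; with an empty pool, the ID simply comes back unmatched.

    from cylc.flow.id import IDTokens
    from cylc.flow.id_match import filter_ids

    pool = {}   # {cycle_point: {task_name: TaskProxy}}, empty for this demo
    matched, not_matched = filter_ids(
        pool,                      # a single pool, no longer a list of pools
        ["20230101T0000Z/foo"],
        out=IDTokens.Task,
        warn=False,
    )
    print(matched, not_matched)    # no matches in an empty pool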
