Skip to content

Commit 166fb53

Browse files
authored
Merge pull request #6583 from wxtim/fix.no_validation_of_outputs_from_graph_singletons
Fix.no validation of outputs from graph singletons
2 parents c252734 + d3f53d1 commit 166fb53

File tree

9 files changed

+108
-31
lines changed

9 files changed

+108
-31
lines changed

changes.d/6583.fix.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bug where undefined outputs were missed by validation if no tasks trigger off of them.

cylc/flow/config.py

+29
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
get_trigger_completion_variable_maps,
100100
trigger_to_completion_variable,
101101
)
102+
from cylc.flow.task_qualifiers import TASK_QUALIFIERS
102103
from cylc.flow.run_modes import RunMode
103104
from cylc.flow.task_trigger import TaskTrigger, Dependency
104105
from cylc.flow.taskdef import TaskDef
@@ -1844,6 +1845,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
18441845

18451846
triggers = {}
18461847
xtrig_labels = set()
1848+
18471849
for left in left_nodes:
18481850
if left.startswith('@'):
18491851
xtrig_labels.add(left[1:])
@@ -2266,6 +2268,10 @@ def load_graph(self):
22662268
parser.workflow_state_polling_tasks)
22672269
self._proc_triggers(parser, seq, task_triggers)
22682270

2271+
# Checking for undefined outputs for terminal tasks. Tasks with
2272+
# dependencies are checked in generate_triggers:
2273+
self.check_terminal_outputs(parser.terminals)
2274+
22692275
self.set_required_outputs(task_output_opt)
22702276

22712277
# Detect use of xtrigger names with '@' prefix (creates a task).
@@ -2278,6 +2284,29 @@ def load_graph(self):
22782284
for tdef in self.taskdefs.values():
22792285
tdef.tweak_outputs()
22802286

2287+
def check_terminal_outputs(self, terminals: Iterable[str]) -> None:
2288+
"""Check that task outputs have been registered with tasks.
2289+
2290+
2291+
Where a "terminal output" is an output for a task at the end of a
2292+
graph string, such as "end" in `start => middle => end`.
2293+
2294+
Raises: WorkflowConfigError if a custom output is not defined.
2295+
"""
2296+
# BACK COMPAT: (On drop 3.7): Can be simplified with walrus :=
2297+
# if (b := a[1].strip("?")) not in TASK_QUALIFIERS
2298+
terminal_outputs = [
2299+
(a[0].strip("!"), a[1].strip("?"))
2300+
for a in (t.split(':') for t in terminals if ":" in t)
2301+
if (a[1].strip("?")) not in TASK_QUALIFIERS
2302+
]
2303+
2304+
for task, output in terminal_outputs:
2305+
if output not in self.cfg['runtime'][task]['outputs']:
2306+
raise WorkflowConfigError(
2307+
f"Undefined custom output: {task}:{output}"
2308+
)
2309+
22812310
def _proc_triggers(self, parser, seq, task_triggers):
22822311
"""Define graph edges, taskdefs, and triggers, from graph sections."""
22832312
suicides = 0

cylc/flow/graph_parser.py

+25-10
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ class GraphParser:
168168
_RE_OFFSET = r'\[[\w\-\+\^:]+\]'
169169
_RE_QUAL = QUALIFIER + r'[\w\-]+' # task or fam trigger
170170
_RE_OPT = r'\??' # optional output indicator
171+
_RE_ANDOR = re.compile(r'\s*[&|]\s*')
171172

172173
REC_QUAL = re.compile(_RE_QUAL)
173174

@@ -470,10 +471,23 @@ def parse_graph(self, graph_string: str) -> None:
470471
pairs.add((chain[i], chain[i + 1]))
471472

472473
# Get a set of RH nodes which are not at the LH of another pair:
473-
terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs})
474+
# terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs})
475+
476+
check_terminals: Dict[str, str] = {}
477+
lefts: Set[str] = set()
478+
rights: Set[str] = set()
474479

475480
for pair in sorted(pairs, key=lambda p: str(p[0])):
476-
self._proc_dep_pair(pair, terminals)
481+
self._proc_dep_pair(pair, check_terminals, lefts, rights)
482+
self.terminals = rights.difference(lefts)
483+
for right in self.terminals:
484+
left = check_terminals.get(right)
485+
if left:
486+
raise GraphParseError(
487+
'Invalid cycle point offsets only on right hand'
488+
' side of dependency (must be on left hand side):'
489+
f'{left} => {right}'
490+
)
477491

478492
@classmethod
479493
def _report_invalid_lines(cls, lines: List[str]) -> None:
@@ -504,7 +518,9 @@ def _report_invalid_lines(cls, lines: List[str]) -> None:
504518
def _proc_dep_pair(
505519
self,
506520
pair: Tuple[Optional[str], str],
507-
terminals: Set[str],
521+
check_terminals: Dict[str, str],
522+
_lefts: Set[str],
523+
_rights: Set[str],
508524
) -> None:
509525
"""Process a single dependency pair 'left => right'.
510526
@@ -540,12 +556,8 @@ def _proc_dep_pair(
540556
raise GraphParseError(mismatch_msg.format(right))
541557

542558
# Raise error for cycle point offsets at the end of chains
543-
if '[' in right and left and (right in terminals):
544-
# This right hand side is at the end of a chain:
545-
raise GraphParseError(
546-
'Invalid cycle point offsets only on right hand '
547-
'side of a dependency (must be on left hand side):'
548-
f' {left} => {right}')
559+
if '[' in right and left:
560+
check_terminals[right] = left
549561

550562
# Split right side on AND.
551563
rights = right.split(self.__class__.OP_AND)
@@ -566,12 +578,15 @@ def _proc_dep_pair(
566578
raise GraphParseError(
567579
f"Null task name in graph: {left} => {right}")
568580

581+
_rights.update(*([rights] or []))
582+
569583
for left in lefts:
570584
# Extract information about all nodes on the left.
571-
572585
if left:
573586
info = self.__class__.REC_NODES.findall(left)
574587
expr = left
588+
for _left in info:
589+
_lefts.add(''.join(_left))
575590

576591
else:
577592
# There is no left-hand-side task.

tests/integration/reftests/test_pre_initial.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ async def test_drop(flow, scheduler, reftest):
9494
}
9595

9696

97-
async def test_over_bracketed(flow, scheduler, reftest):
97+
async def test_over_bracketed(flow, scheduler, reftest, validate):
9898
"""Test nested conditional simplification for pre-initial cycling."""
9999
wid = flow({
100100
'scheduling': {
@@ -108,6 +108,7 @@ async def test_over_bracketed(flow, scheduler, reftest):
108108
},
109109
},
110110
})
111+
validate(wid)
111112
schd = scheduler(wid, paused_start=False)
112113

113114
assert await reftest(schd) == {

tests/integration/test_dbstatecheck.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def checker(
4545
},
4646
'runtime': {
4747
'bad': {'simulation': {'fail cycle points': '1000'}},
48-
'output': {'outputs': {'trigger': 'message'}}
48+
'output': {'outputs': {'trigger': 'message', 'custom_output': 'foo'}}
4949
}
5050
})
5151
schd: Scheduler = mod_scheduler(wid, paused_start=False)
@@ -119,13 +119,13 @@ def test_output(checker):
119119
'output',
120120
'10000101T0000Z',
121121
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
122-
"'succeeded', 'trigger': 'message'}",
122+
"'succeeded', 'trigger': 'message', 'custom_output': 'foo'}",
123123
],
124124
[
125125
'output',
126126
'10010101T0000Z',
127127
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
128-
"'succeeded', 'trigger': 'message'}",
128+
"'succeeded', 'trigger': 'message', 'custom_output': 'foo'}",
129129
],
130130
]
131131
assert result == expect

tests/integration/test_optional_outputs.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,7 @@ def implicit_completion_config(mod_flow, mod_validate):
288288
},
289289
'runtime': {
290290
'root': {
291-
'outputs': {
292-
'x': 'xxx',
293-
'y': 'yyy',
294-
'z': 'zzz',
295-
}
291+
'outputs': {x: f'{x * 3}' for x in 'abcdefghijklxyz'}
296292
}
297293
}
298294
})

tests/integration/validate/test_outputs.py

+1
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ def test_completion_expression_invalid(
276276
'outputs': {
277277
'x': 'xxx',
278278
'y': 'yyy',
279+
'file-1': 'asdf'
279280
},
280281
},
281282
},

tests/unit/test_config.py

+41-5
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

1717
import os
18-
import sys
1918
from optparse import Values
2019
from typing import (
2120
TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type)
2221
import pytest
2322
import logging
23+
from textwrap import dedent
2424
from types import SimpleNamespace
2525
from contextlib import suppress
2626

@@ -47,6 +47,10 @@
4747

4848
from cylc.flow.cycling.iso8601 import ISO8601Point
4949

50+
51+
param = pytest.param
52+
53+
5054
if TYPE_CHECKING:
5155
from pathlib import Path
5256
Fixture = Any
@@ -1175,13 +1179,16 @@ def WorkflowConfig__assert_err_raised():
11751179
WorkflowConfig__assert_err_raised()
11761180

11771181

1178-
def test_undefined_custom_output(tmp_flow_config: Callable):
1182+
@pytest.mark.parametrize(
1183+
'graph', (('foo:x => bar'), ('foo:x'))
1184+
)
1185+
def test_undefined_custom_output(graph: str, tmp_flow_config: Callable):
11791186
"""Test error on undefined custom output referenced in graph."""
11801187
id_ = 'custom_out1'
1181-
flow_file = tmp_flow_config(id_, """
1188+
flow_file = tmp_flow_config(id_, f"""
11821189
[scheduling]
11831190
[[graph]]
1184-
R1 = "foo:x => bar"
1191+
R1 = "{graph}"
11851192
[runtime]
11861193
[[foo, bar]]
11871194
""")
@@ -1700,7 +1707,6 @@ def test_cylc_env_at_parsing(
17001707

17011708
def test_force_workflow_compat_mode(tmp_path):
17021709
fpath = (tmp_path / 'flow.cylc')
1703-
from textwrap import dedent
17041710
fpath.write_text(dedent("""
17051711
[scheduler]
17061712
allow implicit tasks = true
@@ -1713,3 +1719,33 @@ def test_force_workflow_compat_mode(tmp_path):
17131719
WorkflowConfig('foo', str(fpath), {})
17141720
# It succeeds with compat mode:
17151721
WorkflowConfig('foo', str(fpath), {}, force_compat_mode=True)
1722+
1723+
1724+
@pytest.mark.parametrize(
1725+
'registered_outputs, tasks_and_outputs, fails',
1726+
(
1727+
param([], ['foo:x'], True, id='output-unregistered'),
1728+
param([], ['foo:x?'], True, id='optional-output-unregistered'),
1729+
param([], ['foo'], False, id='no-modifier-unregistered'),
1730+
param(['x'], ['foo:x'], False, id='output-registered'),
1731+
param([], ['foo:succeed'], False, id='alt-default-ok'),
1732+
param([], ['foo:failed'], False, id='default-ok'),
1733+
)
1734+
)
1735+
def test_check_outputs(tmp_path, registered_outputs, tasks_and_outputs, fails):
1736+
(tmp_path / 'flow.cylc').write_text(dedent("""
1737+
[scheduler]
1738+
allow implicit tasks = true
1739+
[scheduling]
1740+
[[graph]]
1741+
R1 = foo
1742+
"""))
1743+
cfg = WorkflowConfig('', tmp_path / 'flow.cylc', '')
1744+
cfg.cfg['runtime']['foo']['outputs'] = registered_outputs
1745+
if fails:
1746+
with pytest.raises(
1747+
WorkflowConfigError, match='Undefined custom output'
1748+
):
1749+
cfg.check_terminal_outputs(tasks_and_outputs)
1750+
else:
1751+
assert cfg.check_terminal_outputs(tasks_and_outputs) is None

tests/unit/test_graph_parser.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from cylc.flow.graph_parser import GraphParser
2828
from cylc.flow.task_outputs import (
2929
TASK_OUTPUT_SUBMITTED,
30-
TASK_OUTPUT_SUBMIT_FAILED,
3130
TASK_OUTPUT_STARTED,
3231
TASK_OUTPUT_SUCCEEDED,
3332
TASK_OUTPUT_FAILED
@@ -810,7 +809,6 @@ def test_cannot_be_required():
810809
gp.parse_graph('a:submit-failed => b')
811810

812811

813-
814812
@pytest.mark.parametrize(
815813
'graph, error',
816814
[
@@ -969,13 +967,13 @@ def test_RHS_AND(graph: str, expected_triggers: Dict[str, List[str]]):
969967
@pytest.mark.parametrize(
970968
'args, err',
971969
(
972-
# Error if offset in terminal RHS:
973-
param((('a', 'b[-P42M]'), {'b[-P42M]'}), 'Invalid cycle point offset'),
974970
# No error if offset in NON-terminal RHS:
975-
param((('a', 'b[-P42M]'), {}), None),
971+
param((('a', 'b[-P42M]'), {}, set(), set()), None),
976972
# Check the left hand side if this has a non-terminal RHS:
977-
param((('a &', 'b[-P42M]'), {}), 'Null task name in graph'),
978-
)
973+
param(
974+
(('a &', 'b[-P42M]'), {}, set(), set()), 'Null task name in graph'
975+
),
976+
),
979977
)
980978
def test_proc_dep_pair(args, err):
981979
"""

0 commit comments

Comments
 (0)