From bbe591e610f0787c61dd15554794de930b94da4b Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 9 Jan 2020 18:57:10 -0800 Subject: [PATCH 01/33] doesn't work --- README.md | 12 ++++ flytekit/clis/flyte_cli/main.py | 2 + flytekit/common/nodes.py | 101 +++++++++++++++++++++++++++++--- flytekit/common/workflow.py | 7 ++- 4 files changed, 111 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index f56985c4bd..78c30d64d9 100644 --- a/README.md +++ b/README.md @@ -81,3 +81,15 @@ source ~/.virtualenvs/flytekit/bin/activate python -m pytest tests/flytekit/unit shellcheck **/*.sh ``` + + + +flyte-cli -h localhost:30081 -p flytetester -d development -i list-workflow-versions -n cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller + +wf:flytetester:development:cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller:7be6342b4d5d95f5e31e6ad89636ad48925643ab + +from flytekit.clis.flyte_cli import main +from flytekit.common.workflow import SdkWorkflow +SdkWorkflow.fetch('flytetester', 'development', 'cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller', '7be6342b4d5d95f5e31e6ad89636ad48925643ab') + + diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index 6abce7c404..16b3d380f9 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -53,10 +53,12 @@ def _welcome_message(): def _get_user_filepath_home(): return _os.path.expanduser("~") + def _get_config_file_path(): home = _get_user_filepath_home() return _os.path.join(home, _default_config_file_path) + def _detect_default_config_file(): config_file = _get_config_file_path() if _get_user_filepath_home() and _os.path.exists(config_file): diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 29a94e02e4..8bbdef3dc3 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -7,7 +7,8 @@ from flytekit.common import sdk_bases as _sdk_bases, promise as _promise from flytekit.common.exceptions import scopes as _exception_scopes, user as _user_exceptions from flytekit.common.mixins import hash as _hash_mixin, artifact as _artifact_mixin -from flytekit.common.tasks import executions as _task_executions +from flytekit.common.tasks import executions as _task_executions, task as _task +from flytekit.common import workflow as _workflow, launch_plan as _launch_plan from flytekit.common.types import helpers as _type_helpers from flytekit.common.utils import _dnsify from flytekit.engines import loader as _engine_loader @@ -110,17 +111,35 @@ def reference_id(self): """ return self._sdk_task.id + @property + def sdk_task(self): + """ + :rtype: flytekit.common.tasks.task.SdkTask + """ + return self._sdk_task + @classmethod def promote_from_model(cls, base_model): - # TODO: Hydrate using identifier and querying the engine - pass + """ + Takes the idl wrapper for a TaskNode and returns the hydrated Flytekit object for it by fetching it from the + engine. + + :param flytekit.models.core.workflow.TaskNode base_model: + :rtype: SdkTaskNode + """ + project = base_model.reference_id.project + domain = base_model.reference_id.domain + name = base_model.reference_id.name + version = base_model.reference_id.version + sdk_task = _task.SdkTask.fetch(project, domain, name, version) + return cls(sdk_task) class SdkWorkflowNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _workflow_model.WorkflowNode)): def __init__(self, sdk_workflow=None, sdk_launch_plan=None): """ :param flytekit.common.workflow.SdkWorkflow sdk_workflow: - :param flytekit.common.launch_plan.SdkRunnableLaunchPlan sdk_launch_plan: + :param flytekit.common.launch_plan.SdkLaunchPlan sdk_launch_plan: """ self._sdk_workflow = sdk_workflow self._sdk_launch_plan = sdk_launch_plan @@ -142,10 +161,38 @@ def sub_workflow_ref(self): """ return self._sdk_workflow.id if self._sdk_workflow else None + @property + def sdk_launch_plan(self): + """ + :rtype: flytekit.common.launch_plan.SdkLaunchPlan + """ + return self._sdk_launch_plan + + @property + def sdk_workflow(self): + """ + :rtype: flytekit.common.workflow.SdkWorkflow + """ + return self._sdk_workflow + @classmethod def promote_from_model(cls, base_model): - # TODO: Hydrate using identifier and querying the engine - pass + """ + :param flytekit.models.core.workflow.WorkflowNode base_model: + :rtype: SdkWorkflowNode + """ + project = base_model.reference.project + domain = base_model.reference.domain + name = base_model.reference.name + version = base_model.reference.version + if base_model.launchplan_ref is not None: + sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) + return cls(sdk_launch_plan=sdk_launch_plan) + elif base_model.sub_workflow_ref is not None: + sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) + return cls(sdk_workflow=sdk_workflow) + else: + raise Exception("Bad workflow node model") class SdkNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _hash_mixin.HashOnReferenceMixin, _workflow_model.Node)): @@ -173,6 +220,7 @@ def __init__( :param flytekit.common.tasks.task.SdkTask sdk_task: The task to execute in this node. :param flytekit.common.workflow.SdkWorkflow sdk_workflow: The workflow to execute in this node. + Question: does this really need to be the sdkrunnable launch plan? :param flytekit.common.launch_plan.SdkRunnableLaunchPlan sdk_launch_plan: The launch plan to execute in this node. :param TODO sdk_branch: TODO @@ -219,8 +267,44 @@ def promote_from_model(cls, model): :param flytekit.models.core.workflow.Node model: :rtype: SdkNode """ - raise _user_exceptions.FlyteAssertion("An SDK node cannot be instantiated merely from a data model object " - "because it must be contextualized within a workflow.") + id = model.id + sdk_task_node, sdk_workflow_node = None, None + if model.task_node is not None: + sdk_task_node = SdkTaskNode.promote_from_model(model.task_node) + elif model.workflow_node is not None: + sdk_workflow_node = SdkWorkflowNode.promote_from_model(model.workflow_node) + else: + raise Exception("bad Node model") + + if sdk_task_node is not None: + return cls( + id=id, + upstream_nodes=[], # set downstream, model doesn't contain this information + bindings=model.inputs, + metadata=model.metadata, + sdk_task=sdk_task_node.sdk_task, + ) + elif sdk_workflow_node is not None: + if sdk_workflow_node.sdk_workflow is not None: + return cls( + id=id, + upstream_nodes=[], # set downstream, model doesn't contain this information + bindings=model.inputs, + metadata=model.metadata, + sdk_workflow=sdk_workflow_node.sdk_workflow, + ) + elif sdk_workflow_node.sdk_launch_plan is not None: + return cls( + id=id, + upstream_nodes=[], # set downstream, model doesn't contain this information + bindings=model.inputs, + metadata=model.metadata, + sdk_launch_plan=sdk_workflow_node.sdk_launch_plan, + ) + else: + raise Exception("Bad SdkWorkflowNode model - both lp and workflow are None") + else: + raise Exception("Bad SdkNode model - both task and workflow nodes are empty") @property def upstream_nodes(self): @@ -254,6 +338,7 @@ def assign_id_and_return(self, id): "workflow already?".format(id, self) ) self._id = _dnsify(id) if id else None + self._metadata._name = id return self def with_overrides(self, *args, **kwargs): diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 4dd120c5fe..03d899cce6 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -188,14 +188,15 @@ def promote_from_model(cls, base_model): ) for n in base_model.nodes } + node_map_2 = {n.id: _nodes.SdkNode.promote_from_model(n) for n in base_model.nodes} - for v in _six.itervalues(node_map): - v.upstream_nodes[:] = [node_map[k] for k in v.upstream_node_ids] + for v in _six.itervalues(node_map_2): + v.upstream_nodes[:] = [node_map_2[k] for k in v.upstream_node_ids] return cls( metadata=base_model.metadata, interface=_interface.TypedInterface.promote_from_model(base_model.interface), - nodes=list(node_map.values()), + nodes=list(node_map_2.values()), outputs=base_model.outputs, failure_node=None # TODO: Implement failure node ) From 370a019bd83102d8f372c25cbdd38a669bba5c8f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 13 Jan 2020 12:49:30 -0800 Subject: [PATCH 02/33] wip --- flytekit/common/nodes.py | 10 ++++++++++ flytekit/common/workflow.py | 2 ++ 2 files changed, 12 insertions(+) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 8bbdef3dc3..1c4386dbfc 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -2,6 +2,7 @@ import abc as _abc import six as _six +import logging as _logging from sortedcontainers import SortedDict as _SortedDict from flytekit.common import sdk_bases as _sdk_bases, promise as _promise @@ -181,15 +182,19 @@ def promote_from_model(cls, base_model): :param flytekit.models.core.workflow.WorkflowNode base_model: :rtype: SdkWorkflowNode """ + project = base_model.reference.project domain = base_model.reference.domain name = base_model.reference.name version = base_model.reference.version + print('1. Project {} domain {} name {}'.format(project, domain, name)) if base_model.launchplan_ref is not None: sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) + print('fetched LP {}'.format(sdk_launch_plan)) return cls(sdk_launch_plan=sdk_launch_plan) elif base_model.sub_workflow_ref is not None: sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) + print('fetched wf {}'.format(sdk_workflow)) return cls(sdk_workflow=sdk_workflow) else: raise Exception("Bad workflow node model") @@ -268,6 +273,11 @@ def promote_from_model(cls, model): :rtype: SdkNode """ id = model.id + # This should never be called + if id == "start-node" or id == "end-node": + _logging.warning("Should not call promote from model on a start node or end node {}".format(model)) + return None + sdk_task_node, sdk_workflow_node = None, None if model.task_node is not None: sdk_task_node = SdkTaskNode.promote_from_model(model.task_node) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 03d899cce6..e63601c7ff 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -166,6 +166,7 @@ def fetch(cls, project, domain, name, version=None): version = version or _internal_config.VERSION.get() workflow_id = _identifier.Identifier(_identifier_model.ResourceType.WORKFLOW, project, domain, name, version) admin_workflow = _engine_loader.get_engine().fetch_workflow(workflow_id) + print('2. here {}'.format(admin_workflow.closure.compiled_workflow.primary.template)) sdk_workflow = cls.promote_from_model(admin_workflow.closure.compiled_workflow.primary.template) sdk_workflow._id = workflow_id return sdk_workflow @@ -188,6 +189,7 @@ def promote_from_model(cls, base_model): ) for n in base_model.nodes } + print('3. here {}'.format(node_map)) node_map_2 = {n.id: _nodes.SdkNode.promote_from_model(n) for n in base_model.nodes} for v in _six.itervalues(node_map_2): From ae3af6033a62426e7c8243992e031db432691bc0 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 29 Jan 2020 17:48:13 -0800 Subject: [PATCH 03/33] wip --- README.md | 2 +- flytekit/common/launch_plan.py | 8 +++++ flytekit/common/workflow.py | 60 +++++++++++++++++++++++----------- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 78c30d64d9..75236d5c9d 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,6 @@ wf:flytetester:development:cookbook.sample_workflows.formula_1.outer.StaticLaunc from flytekit.clis.flyte_cli import main from flytekit.common.workflow import SdkWorkflow -SdkWorkflow.fetch('flytetester', 'development', 'cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller', '7be6342b4d5d95f5e31e6ad89636ad48925643ab') +s = SdkWorkflow.fetch('flytetester', 'development', 'cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller', '7be6342b4d5d95f5e31e6ad89636ad48925643ab') diff --git a/flytekit/common/launch_plan.py b/flytekit/common/launch_plan.py index c69b724d09..14ac7e63e6 100644 --- a/flytekit/common/launch_plan.py +++ b/flytekit/common/launch_plan.py @@ -6,6 +6,7 @@ from flytekit.common.exceptions import scopes as _exception_scopes, user as _user_exceptions from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, executable as _executable_mixin +from flytekit.common import workflow as _workflow from flytekit.common.types import helpers as _type_helpers from flytekit.configuration import sdk as _sdk_config, internal as _internal_config, auth as _auth_config from flytekit.engines import loader as _engine_loader @@ -29,6 +30,9 @@ def __init__(self, *args, **kwargs): super(SdkLaunchPlan, self).__init__(*args, **kwargs) self._id = None + # The interface is not set explicitly unless fetched in an engine context + self._interface = None + @classmethod def promote_from_model(cls, model): """ @@ -68,6 +72,10 @@ def fetch(cls, project, domain, name, version=None): lp = _engine_loader.get_engine().fetch_launch_plan(launch_plan_id) sdk_lp = cls.promote_from_model(lp.spec) sdk_lp._id = lp.id + + wf_id = sdk_lp.workflow_id + lp_wf = _workflow.SdkWorkflow.fetch(wf_id.project, wf_id.domain, wf_id.name, wf_id.version) + sdk_lp._interface = lp_wf.interface return sdk_lp @_exception_scopes.system_entry_point diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index e63601c7ff..ad1f9d8656 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -177,29 +177,51 @@ def promote_from_model(cls, base_model): :param flytekit.models.core.workflow.WorkflowTemplate base_model: :rtype: SdkWorkflow """ - node_map = { - n.id: _nodes.SdkNode( - n.id, - [], - n.inputs, - n.metadata, - sdk_task=None, - sdk_workflow=None, - sdk_branch=None # TODO: Hydrate these objects by reference from the engine. - ) - for n in base_model.nodes - } - print('3. here {}'.format(node_map)) - node_map_2 = {n.id: _nodes.SdkNode.promote_from_model(n) for n in base_model.nodes} - - for v in _six.itervalues(node_map_2): - v.upstream_nodes[:] = [node_map_2[k] for k in v.upstream_node_ids] - + # node_map = { + # n.id: _nodes.SdkNode( + # n.id, + # [], + # n.inputs, + # n.metadata, + # sdk_task=None, + # sdk_workflow=None, + # sdk_branch=None # TODO: Hydrate these objects by reference from the engine. + # ) + # for n in base_model.nodes + # } + print('======================= Base Model Nodes =========') + print(base_model.nodes) + print('======================= Promote from model =========') + node_map_2 = {} + for n in base_model.nodes: + if n.id == 'start-node': + print('Start node: {}'.format(n)) + print('Start node end ---') + elif n.id == "end-node": + print('End node: {}'.format(n)) + print('End node end ---') + else: + print('3. here {}'.format(n)) + node_map_2[n.id] = _nodes.SdkNode.promote_from_model(n) + print('ID: {}'.format(n.id)) + + # outputs list[flytekit.models.literals.Binding] + + print('4. here {}'.format(node_map_2)) + # raise Exception('fjdskafjdkls') + + # Set upstream nodes for each node + for n in base_model.nodes: + if n.id == 'start-node' or n.id == 'end-node': + continue + child_node = node_map_2[n.id] + child_node.upstream_nodes[:] = [node_map_2[upstream_id] for upstream_id in n.upstream_node_ids] + + # No inputs/outputs specified. return cls( metadata=base_model.metadata, interface=_interface.TypedInterface.promote_from_model(base_model.interface), nodes=list(node_map_2.values()), - outputs=base_model.outputs, failure_node=None # TODO: Implement failure node ) From 899dc0a75cf4d66003baeb0e62477a290b101e44 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 31 Jan 2020 13:34:54 -0800 Subject: [PATCH 04/33] return class --- flytekit/common/workflow.py | 71 ++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index ad1f9d8656..2aa2596ed3 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -81,11 +81,22 @@ class SdkWorkflow( ) ): - def __init__(self, inputs, outputs, nodes): + def __init__(self, inputs, outputs, nodes, id=None, metadata=None, interface=None, output_bindings=None): """ :param list[flytekit.common.promise.Input] inputs: :param list[Output] outputs: :param list[flytekit.common.nodes.SdkNode] nodes: + :param flytekit.models.core.identifier.Identifier id: This is an autogenerated id by the system. The id is + globally unique across Flyte. + :param WorkflowMetadata metadata: This contains information on how to run the workflow. + :param flytekit.models.interface.TypedInterface interface: Defines a strongly typed interface for the + Workflow (inputs, outputs). This can include some optional parameters. + :param list[flytekit.models.literals.Binding] output_bindings: A list of output bindings that specify how to construct + workflow outputs. Bindings can pull node outputs or specify literals. All workflow outputs specified in + the interface field must be bound + in order for the workflow to be validated. A workflow has an implicit dependency on all of its nodes + to execute successfully in order to bind final outputs. + """ for n in nodes: for upstream in n.upstream_nodes: @@ -96,21 +107,30 @@ def __init__(self, inputs, outputs, nodes): "list, dict, or tuple which is stored as an attribute in the class." ) - super(SdkWorkflow, self).__init__( - id=_identifier.Identifier( + # Allow overrides if specified for all the arguments to the parent class constructor + id = id if id is not None else _identifier.Identifier( _identifier_model.ResourceType.WORKFLOW, _internal_config.PROJECT.get(), _internal_config.DOMAIN.get(), _uuid.uuid4().hex, _internal_config.VERSION.get() - ), - metadata=_workflow_models.WorkflowMetadata(), - interface=_interface.TypedInterface( + ) + metadata = metadata if metadata is not None else _workflow_models.WorkflowMetadata() + + interface = interface if interface is not None else _interface.TypedInterface( {v.name: v.var for v in inputs}, {v.name: v.var for v in outputs} - ), + ) + + output_bindings = output_bindings if output_bindings is not None else \ + [_literal_models.Binding(v.name, v.binding_data) for v in outputs] + + super(SdkWorkflow, self).__init__( + id=id, + metadata=metadata, + interface=interface, nodes=nodes, - outputs=[_literal_models.Binding(v.name, v.binding_data) for v in outputs], + outputs=output_bindings, ) self._user_inputs = inputs self._upstream_entities = set(n.executable_sdk_object for n in nodes) @@ -177,22 +197,10 @@ def promote_from_model(cls, base_model): :param flytekit.models.core.workflow.WorkflowTemplate base_model: :rtype: SdkWorkflow """ - # node_map = { - # n.id: _nodes.SdkNode( - # n.id, - # [], - # n.inputs, - # n.metadata, - # sdk_task=None, - # sdk_workflow=None, - # sdk_branch=None # TODO: Hydrate these objects by reference from the engine. - # ) - # for n in base_model.nodes - # } print('======================= Base Model Nodes =========') print(base_model.nodes) print('======================= Promote from model =========') - node_map_2 = {} + node_map = {} for n in base_model.nodes: if n.id == 'start-node': print('Start node: {}'.format(n)) @@ -202,27 +210,26 @@ def promote_from_model(cls, base_model): print('End node end ---') else: print('3. here {}'.format(n)) - node_map_2[n.id] = _nodes.SdkNode.promote_from_model(n) + node_map[n.id] = _nodes.SdkNode.promote_from_model(n) print('ID: {}'.format(n.id)) - # outputs list[flytekit.models.literals.Binding] - - print('4. here {}'.format(node_map_2)) - # raise Exception('fjdskafjdkls') - # Set upstream nodes for each node for n in base_model.nodes: if n.id == 'start-node' or n.id == 'end-node': continue - child_node = node_map_2[n.id] - child_node.upstream_nodes[:] = [node_map_2[upstream_id] for upstream_id in n.upstream_node_ids] + child_node = node_map[n.id] + child_node.upstream_nodes[:] = [node_map[upstream_id] for upstream_id in n.upstream_node_ids] + + print('4. Nodes for {} ==== {}'.format(base_model.id, node_map)) + # raise Exception('fjdskafjdkls') - # No inputs/outputs specified. + # No inputs/outputs specified, see the constructor for more information on the overrides. return cls( + inputs=None, outputs=None, nodes=list(node_map.values()), + id=_identifier.Identifier.promote_from_model(base_model.id), metadata=base_model.metadata, interface=_interface.TypedInterface.promote_from_model(base_model.interface), - nodes=list(node_map_2.values()), - failure_node=None # TODO: Implement failure node + output_bindings=base_model.outputs, ) @_exception_scopes.system_entry_point From 846b4eca69339b1223663183652b057260646a08 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Sat, 1 Feb 2020 11:30:47 -0800 Subject: [PATCH 05/33] wip --- flytekit/clis/sdk_in_container/register.py | 6 +++++ flytekit/common/launch_plan.py | 2 +- flytekit/common/nodes.py | 5 ++-- flytekit/common/workflow.py | 26 ++++++++++++++----- tests/flytekit/unit/configuration/conftest.py | 1 + 5 files changed, 30 insertions(+), 10 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 18326acc6c..5c87a5286d 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -18,11 +18,17 @@ def register_all(project, domain, pkgs, test, version): click.echo('Running task, workflow, and launch plan registration for {}, {}, {} with version {}'.format( project, domain, pkgs, version)) + # m = module (i.e. python file) + # k = value of dir(m), type str + # o = object (e.g. SdkWorkflow) for m, k, o in iterate_registerable_entities_in_order(pkgs): name = _utils.fqdn(m.__name__, k, entity_type=o.resource_type) if test: click.echo("Would register {:20} {}".format("{}:".format(o.entity_type_text), name)) + if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': + import ipdb; ipdb.set_trace() + else: click.echo("Registering {:20} {}".format("{}:".format(o.entity_type_text), name)) o.register(project, domain, name, version) diff --git a/flytekit/common/launch_plan.py b/flytekit/common/launch_plan.py index 14ac7e63e6..d5ef6f9081 100644 --- a/flytekit/common/launch_plan.py +++ b/flytekit/common/launch_plan.py @@ -364,7 +364,7 @@ def __call__(self, *args, **input_map): """ if len(args) > 0: raise _user_exceptions.FlyteAssertion( - "When adding a task as a node in a workflow, all inputs must be specified with kwargs only. We " + "When adding a launchplan as a node in a workflow, all inputs must be specified with kwargs only. We " "detected {} positional args.".format(self, len(args)) ) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 1c4386dbfc..dc21965a91 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -200,7 +200,7 @@ def promote_from_model(cls, base_model): raise Exception("Bad workflow node model") -class SdkNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _hash_mixin.HashOnReferenceMixin, _workflow_model.Node)): +class SdkNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _hash_mixin.HashOnReferenceMixin, _workflow_model.Node)): def __init__( self, @@ -225,8 +225,7 @@ def __init__( :param flytekit.common.tasks.task.SdkTask sdk_task: The task to execute in this node. :param flytekit.common.workflow.SdkWorkflow sdk_workflow: The workflow to execute in this node. - Question: does this really need to be the sdkrunnable launch plan? - :param flytekit.common.launch_plan.SdkRunnableLaunchPlan sdk_launch_plan: The launch plan to execute in this + :param flytekit.common.launch_plan.SdkLaunchPlan sdk_launch_plan: The launch plan to execute in this node. :param TODO sdk_branch: TODO """ diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 2aa2596ed3..232713fb59 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -4,6 +4,7 @@ import six as _six from six.moves import queue as _queue +import datetime as _datetime from flytekit.common import interface as _interface, nodes as _nodes, sdk_bases as _sdk_bases, \ launch_plan as _launch_plan, promise as _promise @@ -221,8 +222,6 @@ def promote_from_model(cls, base_model): child_node.upstream_nodes[:] = [node_map[upstream_id] for upstream_id in n.upstream_node_ids] print('4. Nodes for {} ==== {}'.format(base_model.id, node_map)) - # raise Exception('fjdskafjdkls') - # No inputs/outputs specified, see the constructor for more information on the overrides. return cls( inputs=None, outputs=None, nodes=list(node_map.values()), @@ -318,10 +317,25 @@ class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan ) @_exception_scopes.system_entry_point - def __call__(self, *args, **kwargs): - # TODO: Create a workflow node - raise NotImplementedError("Embedding a workflow as a node is not supported currently. Please use launch " - "plans.") + def __call__(self, *args, **input_map): + if len(args) > 0: + raise _user_exceptions.FlyteAssertion( + "When adding a workflow as a node in a workflow, all inputs must be specified with kwargs only. We " + "detected {} positional args.".format(len(args)) + ) + + bindings, upstream_nodes = self.interface.create_bindings_for_inputs(input_map) + + node = _nodes.SdkNode( + id=None, + metadata=_workflow_models.NodeMetadata("placeholder", _datetime.timedelta(), _literal_models.RetryStrategy(0)), + upstream_nodes=upstream_nodes, + bindings=sorted(bindings, key=lambda b: b.var), + sdk_workflow=self + ) + import ipdb; + # ipdb.set_trace() + return node def _assign_indexed_attribute_name(attribute_name, index): diff --git a/tests/flytekit/unit/configuration/conftest.py b/tests/flytekit/unit/configuration/conftest.py index 446fb12fac..ba1fe76457 100644 --- a/tests/flytekit/unit/configuration/conftest.py +++ b/tests/flytekit/unit/configuration/conftest.py @@ -3,6 +3,7 @@ import pytest as _pytest import os as _os + @_pytest.fixture(scope="function", autouse=True) def clear_configs(): _set_config(None) From e06e05d328264ec63db9fa56068389344b3ed9d3 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 3 Feb 2020 17:34:58 -0800 Subject: [PATCH 06/33] wip --- README.md | 1 + flytekit/clis/sdk_in_container/register.py | 7 ++++++- flytekit/common/nodes.py | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 75236d5c9d..f403d57f0a 100644 --- a/README.md +++ b/README.md @@ -92,4 +92,5 @@ from flytekit.clis.flyte_cli import main from flytekit.common.workflow import SdkWorkflow s = SdkWorkflow.fetch('flytetester', 'development', 'cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller', '7be6342b4d5d95f5e31e6ad89636ad48925643ab') +FLYTE_INTERNAL_IMAGE=docker.io/lyft/flyteexamples:c76320408f58d7e0fe12147bc398b7f027bf0a77 FLYTE_PLATFORM_URL=localhost:30081 pyflyte -c /root/local.config -p flytetester -d staging register workflows diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 5c87a5286d..ec132c9e0f 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -27,10 +27,15 @@ def register_all(project, domain, pkgs, test, version): if test: click.echo("Would register {:20} {}".format("{}:".format(o.entity_type_text), name)) if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': - import ipdb; ipdb.set_trace() + # import ipdb; ipdb.set_trace() + pass else: click.echo("Registering {:20} {}".format("{}:".format(o.entity_type_text), name)) + if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': + # import ipdb; ipdb.set_trace() + pass + o.register(project, domain, name, version) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index dc21965a91..14bfa92034 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -346,6 +346,9 @@ def assign_id_and_return(self, id): "Error assigning ID: {} because {} is already assigned. Has this node been assigned to another " "workflow already?".format(id, self) ) + print("Assigning ID to node {}".format(id)) + # if id == "identity_wf_execution": + # import ipdb; ipdb.set_trace() self._id = _dnsify(id) if id else None self._metadata._name = id return self From 4df8ef4b0697c3ae4e0243a7624fb2936be616da Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 4 Feb 2020 16:10:05 -0800 Subject: [PATCH 07/33] wip --- flytekit/clis/sdk_in_container/register.py | 4 ++-- flytekit/common/workflow.py | 18 ++++++++++++++++++ flytekit/engines/flyte/engine.py | 10 +++++++++- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index ec132c9e0f..434db78225 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -27,13 +27,13 @@ def register_all(project, domain, pkgs, test, version): if test: click.echo("Would register {:20} {}".format("{}:".format(o.entity_type_text), name)) if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': - # import ipdb; ipdb.set_trace() + import ipdb; ipdb.set_trace() pass else: click.echo("Registering {:20} {}".format("{}:".format(o.entity_type_text), name)) if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': - # import ipdb; ipdb.set_trace() + import ipdb; ipdb.set_trace() pass o.register(project, domain, name, version) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 232713fb59..6125d334a1 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -173,6 +173,24 @@ def user_inputs(self): """ return self._user_inputs + def get_sub_workflows(self): + """ + what should the return type of this be? + :rtype: list[] + """ + result = [] + for n in self.nodes: + if n.workflow_node is not None and n.workflow_node.sub_workflow_ref is not None: + if n.executable_sdk_object is not None and n.executable_sdk_object.entity_type_text == 'Workflow': + import ipdb; ipdb.set_trace() + result.append(n.executable_sdk_object) + result.extend(n.executable_sdk_object.get_sub_workflows()) + else: + print("workflow node with subworkflow found but bad executable object {}".format) + # Ignore other node types (branch, task) + + return result + @classmethod @_exception_scopes.system_entry_point def fetch(cls, project, domain, name, version=None): diff --git a/flytekit/engines/flyte/engine.py b/flytekit/engines/flyte/engine.py index dbccec1b1f..386e2ddda5 100644 --- a/flytekit/engines/flyte/engine.py +++ b/flytekit/engines/flyte/engine.py @@ -232,9 +232,17 @@ class FlyteWorkflow(_common_engine.BaseWorkflowExecutor): def register(self, identifier): client = _FlyteClientManager(_platform_config.URL.get(), insecure=_platform_config.INSECURE.get()).client try: + # import ipdb; ipdb.set_trace() + sub_workflows = self.sdk_workflow.get_sub_workflows() + print("--------------- Main Workflows ---------------") + print(self.sdk_workflow) + print("--------------- Subworkflows ---------------") + print(sub_workflows) return client.create_workflow( identifier, - _workflow_model.WorkflowSpec(self.sdk_workflow) + _workflow_model.WorkflowSpec( + self.sdk_workflow, + ) ) except _user_exceptions.FlyteEntityAlreadyExistsException: pass From 7309717060a5bb56e41c2ad45bef1f0f3e72eed2 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 5 Feb 2020 16:07:52 -0800 Subject: [PATCH 08/33] use real idl release, update WorkflowSpec model --- flytekit/engines/flyte/engine.py | 2 +- flytekit/models/admin/workflow.py | 19 +++++++++++++++---- setup.py | 2 +- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/flytekit/engines/flyte/engine.py b/flytekit/engines/flyte/engine.py index 386e2ddda5..5f993cb8b2 100644 --- a/flytekit/engines/flyte/engine.py +++ b/flytekit/engines/flyte/engine.py @@ -25,7 +25,6 @@ class _FlyteClientManager(object): - _CLIENT = None def __init__(self, *args, **kwargs): @@ -242,6 +241,7 @@ def register(self, identifier): identifier, _workflow_model.WorkflowSpec( self.sdk_workflow, + sub_workflows, ) ) except _user_exceptions.FlyteEntityAlreadyExistsException: diff --git a/flytekit/models/admin/workflow.py b/flytekit/models/admin/workflow.py index 91e82252b4..8d647f4779 100644 --- a/flytekit/models/admin/workflow.py +++ b/flytekit/models/admin/workflow.py @@ -1,17 +1,19 @@ from __future__ import absolute_import from flytekit.models import common as _common -from flytekit.models.core import compiler as _compiler_models, identifier as _identifier +from flytekit.models.core import compiler as _compiler_models, identifier as _identifier, workflow as _core_workflow from flyteidl.admin import workflow_pb2 as _admin_workflow class WorkflowSpec(_common.FlyteIdlEntity): - def __init__(self, template): + def __init__(self, template, sub_workflows): """ This object fully encapsulates the specification of a workflow :param flytekit.models.core.workflow.WorkflowTemplate template: + :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: """ self._template = template + self._sub_workflows = sub_workflows @property def template(self): @@ -20,12 +22,20 @@ def template(self): """ return self._template + @property + def sub_workflows(self): + """ + :rtype: list[flytekit.models.core.workflow.WorkflowTemplate.WorkflowTemplate] + """ + return self._sub_workflows + def to_flyte_idl(self): """ :rtype: flyteidl.admin.workflow_pb2.WorkflowSpec """ return _admin_workflow.WorkflowSpec( - template=self._template.to_flyte_idl() + template=self._template.to_flyte_idl(), + sub_workflows=[s.to_flyte_idl() for s in self._sub_workflows], ) @classmethod @@ -34,7 +44,8 @@ def from_flyte_idl(cls, pb2_object): :param pb2_object: flyteidl.admin.workflow_pb2.WorkflowSpec :rtype: WorkflowSpec """ - return cls(WorkflowSpec.from_flyte_idl(pb2_object.template)) + return cls(_core_workflow.WorkflowTemplate.from_flyte_idl(pb2_object.template), + [_core_workflow.WorkflowTemplate.from_flyte_idl(s) for s in pb2_object.sub_workflows]) class Workflow(_common.FlyteIdlEntity): diff --git a/setup.py b/setup.py index d7ea1896c3..640b2bc589 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ ] }, install_requires=[ - "flyteidl>=0.16.0,<1.0.0", + "flyteidl>=0.17.2,<1.0.0", "click>=6.6,<8.0", "croniter>=0.3.20,<4.0.0", "deprecation>=2.0,<3.0", From b236bbf4665382f2bea8d9be3cf780058e430409 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 13 Feb 2020 15:08:44 -0800 Subject: [PATCH 09/33] notes --- tests/flytekit/unit/common_tests/test_workflow.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/flytekit/unit/common_tests/test_workflow.py b/tests/flytekit/unit/common_tests/test_workflow.py index 14c5fb56ca..c3a0342690 100644 --- a/tests/flytekit/unit/common_tests/test_workflow.py +++ b/tests/flytekit/unit/common_tests/test_workflow.py @@ -268,3 +268,11 @@ def my_list_task(wf_params, a, b): assert n.outputs['nested_out'].var == 'nested_out' assert n.outputs['nested_out'].node_id == 'node-id' """ + +# Things to test + +# SdkWorkflow.promote_from_model for both basic workflows and sub workflows and launchplan nodes +# Launchplans now have an interface representing the underlying workflow when fetched from an engine context +# SdkNode promote from model can handle launch plan nodes +# Workflows with two layers of subworkflows return them correctly. +# Call on a workflow produces a correct SdkNode From 6c2e95b6911829b14606a03a3a984693287fa502 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Sat, 15 Feb 2020 15:22:21 -0800 Subject: [PATCH 10/33] uncommenting existing test --- flytekit/common/workflow.py | 9 ++++++++- tests/flytekit/unit/common_tests/test_workflow.py | 14 ++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 6125d334a1..a80f913f37 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -342,7 +342,14 @@ def __call__(self, *args, **input_map): "detected {} positional args.".format(len(args)) ) - bindings, upstream_nodes = self.interface.create_bindings_for_inputs(input_map) + # Take the default values from the Inputs + compiled_inputs = { + v.name: v.sdk_default + for v in self.user_inputs if not v.sdk_required + } + compiled_inputs.update(input_map) + + bindings, upstream_nodes = self.interface.create_bindings_for_inputs(compiled_inputs) node = _nodes.SdkNode( id=None, diff --git a/tests/flytekit/unit/common_tests/test_workflow.py b/tests/flytekit/unit/common_tests/test_workflow.py index c3a0342690..14d4260c63 100644 --- a/tests/flytekit/unit/common_tests/test_workflow.py +++ b/tests/flytekit/unit/common_tests/test_workflow.py @@ -1,9 +1,10 @@ from __future__ import absolute_import from flytekit.common import workflow, constants, promise -from flytekit.common.types import primitives +from flytekit.common.types import primitives, containers from flytekit.models.core import workflow as _workflow_models, identifier as _identifier from flytekit.sdk.tasks import python_task, inputs, outputs +from flytekit.common.exceptions import user as _user_exceptions import pytest as _pytest @@ -217,11 +218,6 @@ def my_list_task(wf_params, a, b): w = workflow.SdkWorkflow(inputs=input_list, outputs=wf_out, nodes=nodes) - with _pytest.raises(NotImplementedError): - w() - - # TODO: Uncomment when sub-workflows are supported. - """ # Test that required input isn't set with _pytest.raises(_user_exceptions.FlyteAssertion): w() @@ -252,13 +248,13 @@ def my_list_task(wf_params, a, b): assert n.inputs[1].var == 'required' assert n.inputs[1].binding.scalar.primitive.integer == 10 - # Test that launch plan ID ref is flexible + # Test that workflow is saved in the node w._id = 'fake' assert n.workflow_node.sub_workflow_ref == 'fake' w._id = None # Test that outputs are promised - n.assign_id_and_return('node-id') + n.assign_id_and_return('node-id*') # dns'ified assert n.outputs['scalar_out'].sdk_type.to_flyte_literal_type() == primitives.Integer.to_flyte_literal_type() assert n.outputs['scalar_out'].var == 'scalar_out' assert n.outputs['scalar_out'].node_id == 'node-id' @@ -267,7 +263,6 @@ def my_list_task(wf_params, a, b): containers.List(containers.List(primitives.Integer)).to_flyte_literal_type() assert n.outputs['nested_out'].var == 'nested_out' assert n.outputs['nested_out'].node_id == 'node-id' - """ # Things to test @@ -275,4 +270,3 @@ def my_list_task(wf_params, a, b): # Launchplans now have an interface representing the underlying workflow when fetched from an engine context # SdkNode promote from model can handle launch plan nodes # Workflows with two layers of subworkflows return them correctly. -# Call on a workflow produces a correct SdkNode From bb146ac216242dcb77f73b4e3806647e060e876e Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 18 Feb 2020 12:40:54 -0800 Subject: [PATCH 11/33] removing some prints, adding a couple tests --- flytekit/common/launch_plan.py | 1 + flytekit/common/nodes.py | 7 ++ flytekit/common/tasks/task.py | 4 + flytekit/common/workflow.py | 15 +-- .../resources/OneTaskWFForPromote.pb | Bin 0 -> 546 bytes .../unit/common_tests/test_helpers.py | 53 +++++++++ .../unit/common_tests/test_workflow.py | 11 +- .../common_tests/test_workflow_promote.py | 112 ++++++++++++++++++ .../unit/models/test_workflow_closure.py | 2 +- 9 files changed, 184 insertions(+), 21 deletions(-) create mode 100644 tests/flytekit/unit/common_tests/resources/OneTaskWFForPromote.pb create mode 100644 tests/flytekit/unit/common_tests/test_helpers.py create mode 100644 tests/flytekit/unit/common_tests/test_workflow_promote.py diff --git a/flytekit/common/launch_plan.py b/flytekit/common/launch_plan.py index d5ef6f9081..7f22e6f4d1 100644 --- a/flytekit/common/launch_plan.py +++ b/flytekit/common/launch_plan.py @@ -73,6 +73,7 @@ def fetch(cls, project, domain, name, version=None): sdk_lp = cls.promote_from_model(lp.spec) sdk_lp._id = lp.id + # TODO: Add a test for this, and this function as a whole wf_id = sdk_lp.workflow_id lp_wf = _workflow.SdkWorkflow.fetch(wf_id.project, wf_id.domain, wf_id.name, wf_id.version) sdk_lp._interface = lp_wf.interface diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 14bfa92034..0f405a02d4 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -12,6 +12,7 @@ from flytekit.common import workflow as _workflow, launch_plan as _launch_plan from flytekit.common.types import helpers as _type_helpers from flytekit.common.utils import _dnsify +from flytekit.common import constants as _constants from flytekit.engines import loader as _engine_loader from flytekit.models import common as _common_models, node_execution as _node_execution_models from flytekit.models.core import workflow as _workflow_model, execution as _execution_models @@ -285,6 +286,12 @@ def promote_from_model(cls, model): else: raise Exception("bad Node model") + # When WorkflowTemplate models (containing node models) are returned by Admin, they've been compiled with a + # start node. In order to make the promoted SdkWorkflow look the same, we strip the start-node text back out. + for input in model.inputs: + if input.binding.promise is not None and input.binding.promise.node_id == "start-node": + input.binding.promise._node_id = _constants.GLOBAL_INPUT_NODE_ID + if sdk_task_node is not None: return cls( id=id, diff --git a/flytekit/common/tasks/task.py b/flytekit/common/tasks/task.py index 7140cf7f83..2c5ef14c38 100644 --- a/flytekit/common/tasks/task.py +++ b/flytekit/common/tasks/task.py @@ -91,6 +91,10 @@ def promote_from_model(cls, base_model): custom=base_model.custom, container=base_model.container ) + # Override the newly generated name if one exists in the base model + if base_model.id: + t._id = _identifier.Identifier.promote_from_model(base_model.id) + return t def assign_custom_and_return(self, custom): diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index a80f913f37..a3d64baf85 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -216,21 +216,14 @@ def promote_from_model(cls, base_model): :param flytekit.models.core.workflow.WorkflowTemplate base_model: :rtype: SdkWorkflow """ - print('======================= Base Model Nodes =========') - print(base_model.nodes) - print('======================= Promote from model =========') node_map = {} for n in base_model.nodes: - if n.id == 'start-node': - print('Start node: {}'.format(n)) - print('Start node end ---') - elif n.id == "end-node": - print('End node: {}'.format(n)) - print('End node end ---') + if n.id == 'start-node' or n.id == "end-node": + # The workflow compilation process done by Admin/Propeller will add a fake start-node to the graph + # so we need to strip them back out here. + continue else: - print('3. here {}'.format(n)) node_map[n.id] = _nodes.SdkNode.promote_from_model(n) - print('ID: {}'.format(n.id)) # Set upstream nodes for each node for n in base_model.nodes: diff --git a/tests/flytekit/unit/common_tests/resources/OneTaskWFForPromote.pb b/tests/flytekit/unit/common_tests/resources/OneTaskWFForPromote.pb new file mode 100644 index 0000000000000000000000000000000000000000..dfbd040d07504697b589f39f3455b9a9519813a5 GIT binary patch literal 546 zcmb7=L2H9R5QR15AY%{h21;4zWe+6?3Du~WTTZz)heCUkRYxe%-DTZW^27T7su4;p z7TVkH$D5h=9tdYf+_E|wG}su_)GhEGE2(M}hCOhl)~(dXvXX6cd99xWKJ(`IvfoR6 z(6W{Wy=4+?{Y|mKhz9{rk|6OPqmsuw45KLKei&x)*6#?h=ne>6gVUDjn@}f1<}h(hhV44kh4D$gp0?-()%}U4u$(&i9#a_WdQG6qmjrT3+4P4ddmFc_>GCLb< zax$UG*@yjOj`aVu-;?$R`t$T2Y) Date: Tue, 18 Feb 2020 12:47:24 -0800 Subject: [PATCH 12/33] removing more prints --- flytekit/clis/sdk_in_container/register.py | 8 -------- flytekit/common/nodes.py | 6 ------ flytekit/common/workflow.py | 5 +---- 3 files changed, 1 insertion(+), 18 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 434db78225..4762fc7f40 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -26,16 +26,8 @@ def register_all(project, domain, pkgs, test, version): if test: click.echo("Would register {:20} {}".format("{}:".format(o.entity_type_text), name)) - if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': - import ipdb; ipdb.set_trace() - pass - else: click.echo("Registering {:20} {}".format("{}:".format(o.entity_type_text), name)) - if name == 'cookbook.sample_workflows.formula_1.outer.StaticSubWorkflowCaller': - import ipdb; ipdb.set_trace() - pass - o.register(project, domain, name, version) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 0f405a02d4..5b97b94f2f 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -188,14 +188,11 @@ def promote_from_model(cls, base_model): domain = base_model.reference.domain name = base_model.reference.name version = base_model.reference.version - print('1. Project {} domain {} name {}'.format(project, domain, name)) if base_model.launchplan_ref is not None: sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) - print('fetched LP {}'.format(sdk_launch_plan)) return cls(sdk_launch_plan=sdk_launch_plan) elif base_model.sub_workflow_ref is not None: sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) - print('fetched wf {}'.format(sdk_workflow)) return cls(sdk_workflow=sdk_workflow) else: raise Exception("Bad workflow node model") @@ -353,9 +350,6 @@ def assign_id_and_return(self, id): "Error assigning ID: {} because {} is already assigned. Has this node been assigned to another " "workflow already?".format(id, self) ) - print("Assigning ID to node {}".format(id)) - # if id == "identity_wf_execution": - # import ipdb; ipdb.set_trace() self._id = _dnsify(id) if id else None self._metadata._name = id return self diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index a3d64baf85..b2bcb94f75 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -182,11 +182,10 @@ def get_sub_workflows(self): for n in self.nodes: if n.workflow_node is not None and n.workflow_node.sub_workflow_ref is not None: if n.executable_sdk_object is not None and n.executable_sdk_object.entity_type_text == 'Workflow': - import ipdb; ipdb.set_trace() result.append(n.executable_sdk_object) result.extend(n.executable_sdk_object.get_sub_workflows()) else: - print("workflow node with subworkflow found but bad executable object {}".format) + raise Exception("workflow node with subworkflow found but bad executable object {}".format) # Ignore other node types (branch, task) return result @@ -205,7 +204,6 @@ def fetch(cls, project, domain, name, version=None): version = version or _internal_config.VERSION.get() workflow_id = _identifier.Identifier(_identifier_model.ResourceType.WORKFLOW, project, domain, name, version) admin_workflow = _engine_loader.get_engine().fetch_workflow(workflow_id) - print('2. here {}'.format(admin_workflow.closure.compiled_workflow.primary.template)) sdk_workflow = cls.promote_from_model(admin_workflow.closure.compiled_workflow.primary.template) sdk_workflow._id = workflow_id return sdk_workflow @@ -232,7 +230,6 @@ def promote_from_model(cls, base_model): child_node = node_map[n.id] child_node.upstream_nodes[:] = [node_map[upstream_id] for upstream_id in n.upstream_node_ids] - print('4. Nodes for {} ==== {}'.format(base_model.id, node_map)) # No inputs/outputs specified, see the constructor for more information on the overrides. return cls( inputs=None, outputs=None, nodes=list(node_map.values()), From 438fea2cb6b46cce3bb6c50172ce34dd651dc230 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 18 Feb 2020 12:48:28 -0800 Subject: [PATCH 13/33] optimize imports and remove more prints --- flytekit/clis/sdk_in_container/register.py | 5 ++--- flytekit/engines/flyte/engine.py | 5 ----- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 4762fc7f40..19ecc92789 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -2,12 +2,11 @@ import click -from flytekit.common.tasks import task as _task +from flytekit.clis.sdk_in_container.constants import CTX_PROJECT, CTX_DOMAIN, CTX_TEST, CTX_PACKAGES, CTX_VERSION from flytekit.common import utils as _utils +from flytekit.common.tasks import task as _task from flytekit.configuration.internal import look_up_version_from_image_tag as _look_up_version_from_image_tag, \ IMAGE as _IMAGE -from flytekit.clis.sdk_in_container.constants import CTX_PROJECT, CTX_DOMAIN, CTX_TEST, CTX_PACKAGES, CTX_VERSION -from flytekit.configuration.sdk import WORKFLOW_PACKAGES as _WORKFLOW_PACKAGES from flytekit.tools.module_loader import iterate_registerable_entities_in_order diff --git a/flytekit/engines/flyte/engine.py b/flytekit/engines/flyte/engine.py index 5f993cb8b2..774aca18a7 100644 --- a/flytekit/engines/flyte/engine.py +++ b/flytekit/engines/flyte/engine.py @@ -231,12 +231,7 @@ class FlyteWorkflow(_common_engine.BaseWorkflowExecutor): def register(self, identifier): client = _FlyteClientManager(_platform_config.URL.get(), insecure=_platform_config.INSECURE.get()).client try: - # import ipdb; ipdb.set_trace() sub_workflows = self.sdk_workflow.get_sub_workflows() - print("--------------- Main Workflows ---------------") - print(self.sdk_workflow) - print("--------------- Subworkflows ---------------") - print(sub_workflows) return client.create_workflow( identifier, _workflow_model.WorkflowSpec( From eb366917ad400bd82e92b7ca13d24093e73a1104 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Feb 2020 15:59:34 -0800 Subject: [PATCH 14/33] use system exceptions --- flytekit/common/nodes.py | 20 ++++++++++++-------- flytekit/common/workflow.py | 28 ++++++++++++++++------------ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 5b97b94f2f..a882ded70d 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -16,6 +16,7 @@ from flytekit.engines import loader as _engine_loader from flytekit.models import common as _common_models, node_execution as _node_execution_models from flytekit.models.core import workflow as _workflow_model, execution as _execution_models +from flytekit.common.exceptions import system as _system_exceptions class ParameterMapper(_six.with_metaclass(_common_models.FlyteABCMeta, _SortedDict)): @@ -87,6 +88,7 @@ class OutputParameterMapper(ParameterMapper): """ This subclass of ParameterMapper is used to represent outputs for a given node. """ + def _return_mapping_object(self, sdk_node, sdk_type, name): """ :param flytekit.common.nodes.Node sdk_node: @@ -195,7 +197,8 @@ def promote_from_model(cls, base_model): sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) return cls(sdk_workflow=sdk_workflow) else: - raise Exception("Bad workflow node model") + raise _system_exceptions.FlyteSystemException("Bad workflow node model, neither subworkflow nor " + "launchplan specified.") class SdkNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _hash_mixin.HashOnReferenceMixin, _workflow_model.Node)): @@ -210,7 +213,7 @@ def __init__( sdk_workflow=None, sdk_launch_plan=None, sdk_branch=None - ): + ): """ :param Text id: A workflow-level unique identifier that identifies this node in the workflow. "inputs" and "outputs" are reserved node ids that cannot be used by other nodes. @@ -281,13 +284,13 @@ def promote_from_model(cls, model): elif model.workflow_node is not None: sdk_workflow_node = SdkWorkflowNode.promote_from_model(model.workflow_node) else: - raise Exception("bad Node model") + raise _system_exceptions.FlyteSystemException("Bad Node model, neither task nor workflow detected") # When WorkflowTemplate models (containing node models) are returned by Admin, they've been compiled with a # start node. In order to make the promoted SdkWorkflow look the same, we strip the start-node text back out. - for input in model.inputs: - if input.binding.promise is not None and input.binding.promise.node_id == "start-node": - input.binding.promise._node_id = _constants.GLOBAL_INPUT_NODE_ID + for i in model.inputs: + if i.binding.promise is not None and i.binding.promise.node_id == "start-node": + i.binding.promise._node_id = _constants.GLOBAL_INPUT_NODE_ID if sdk_task_node is not None: return cls( @@ -315,9 +318,10 @@ def promote_from_model(cls, model): sdk_launch_plan=sdk_workflow_node.sdk_launch_plan, ) else: - raise Exception("Bad SdkWorkflowNode model - both lp and workflow are None") + raise _system_exceptions.FlyteSystemException( + "Bad SdkWorkflowNode model, both lp and workflow are None") else: - raise Exception("Bad SdkNode model - both task and workflow nodes are empty") + raise _system_exceptions.FlyteSystemException("Bad SdkNode model, both task and workflow nodes are empty") @property def upstream_nodes(self): diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index b2bcb94f75..a1c9f1d30b 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -14,9 +14,10 @@ from flytekit.common.types import helpers as _type_helpers from flytekit.configuration import internal as _internal_config from flytekit.engines import loader as _engine_loader -from flytekit.models import interface as _interface_models, literals as _literal_models,\ +from flytekit.models import interface as _interface_models, literals as _literal_models, \ launch_plan as _launch_plan_models from flytekit.models.core import workflow as _workflow_models, identifier as _identifier_model +from flytekit.common.exceptions import system as _system_exceptions class Output(object): @@ -110,18 +111,18 @@ def __init__(self, inputs, outputs, nodes, id=None, metadata=None, interface=Non # Allow overrides if specified for all the arguments to the parent class constructor id = id if id is not None else _identifier.Identifier( - _identifier_model.ResourceType.WORKFLOW, - _internal_config.PROJECT.get(), - _internal_config.DOMAIN.get(), - _uuid.uuid4().hex, - _internal_config.VERSION.get() - ) + _identifier_model.ResourceType.WORKFLOW, + _internal_config.PROJECT.get(), + _internal_config.DOMAIN.get(), + _uuid.uuid4().hex, + _internal_config.VERSION.get() + ) metadata = metadata if metadata is not None else _workflow_models.WorkflowMetadata() interface = interface if interface is not None else _interface.TypedInterface( - {v.name: v.var for v in inputs}, - {v.name: v.var for v in outputs} - ) + {v.name: v.var for v in inputs}, + {v.name: v.var for v in outputs} + ) output_bindings = output_bindings if output_bindings is not None else \ [_literal_models.Binding(v.name, v.binding_data) for v in outputs] @@ -185,7 +186,9 @@ def get_sub_workflows(self): result.append(n.executable_sdk_object) result.extend(n.executable_sdk_object.get_sub_workflows()) else: - raise Exception("workflow node with subworkflow found but bad executable object {}".format) + raise _system_exceptions.FlyteSystemException( + "workflow node with subworkflow found but bad executable " + "object {}".format(n.executable_sdk_object)) # Ignore other node types (branch, task) return result @@ -343,7 +346,8 @@ def __call__(self, *args, **input_map): node = _nodes.SdkNode( id=None, - metadata=_workflow_models.NodeMetadata("placeholder", _datetime.timedelta(), _literal_models.RetryStrategy(0)), + metadata=_workflow_models.NodeMetadata("placeholder", _datetime.timedelta(), + _literal_models.RetryStrategy(0)), upstream_nodes=upstream_nodes, bindings=sorted(bindings, key=lambda b: b.var), sdk_workflow=self From 709f11623ffc9f4e1352100d55b7445e296be82f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Feb 2020 16:05:53 -0800 Subject: [PATCH 15/33] start-node/end-node as constants --- flytekit/common/constants.py | 3 +++ flytekit/common/nodes.py | 4 ++-- flytekit/common/workflow.py | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/flytekit/common/constants.py b/flytekit/common/constants.py index 1a41794028..0c03985e6f 100644 --- a/flytekit/common/constants.py +++ b/flytekit/common/constants.py @@ -24,6 +24,9 @@ class SdkTaskType(object): GLOBAL_INPUT_NODE_ID = '' +START_NODE_ID = "start-node" +END_NODE_ID = "end-node" + class CloudProvider(object): AWS = "aws" diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index a882ded70d..4865c4bae4 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -274,7 +274,7 @@ def promote_from_model(cls, model): """ id = model.id # This should never be called - if id == "start-node" or id == "end-node": + if id == _constants.START_NODE_ID or id == _constants.END_NODE_ID: _logging.warning("Should not call promote from model on a start node or end node {}".format(model)) return None @@ -289,7 +289,7 @@ def promote_from_model(cls, model): # When WorkflowTemplate models (containing node models) are returned by Admin, they've been compiled with a # start node. In order to make the promoted SdkWorkflow look the same, we strip the start-node text back out. for i in model.inputs: - if i.binding.promise is not None and i.binding.promise.node_id == "start-node": + if i.binding.promise is not None and i.binding.promise.node_id == _constants.START_NODE_ID: i.binding.promise._node_id = _constants.GLOBAL_INPUT_NODE_ID if sdk_task_node is not None: diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index a1c9f1d30b..7e935fc64f 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -18,6 +18,7 @@ launch_plan as _launch_plan_models from flytekit.models.core import workflow as _workflow_models, identifier as _identifier_model from flytekit.common.exceptions import system as _system_exceptions +from flytekit.common import constants as _constants class Output(object): @@ -219,7 +220,7 @@ def promote_from_model(cls, base_model): """ node_map = {} for n in base_model.nodes: - if n.id == 'start-node' or n.id == "end-node": + if n.id == _constants.START_NODE_ID or n.id == _constants.END_NODE_ID: # The workflow compilation process done by Admin/Propeller will add a fake start-node to the graph # so we need to strip them back out here. continue @@ -228,7 +229,7 @@ def promote_from_model(cls, base_model): # Set upstream nodes for each node for n in base_model.nodes: - if n.id == 'start-node' or n.id == 'end-node': + if n.id == _constants.START_NODE_ID or n.id == _constants.END_NODE_ID: continue child_node = node_map[n.id] child_node.upstream_nodes[:] = [node_map[upstream_id] for upstream_id in n.upstream_node_ids] From e6c44465fb8b2981cdf1188be89b537b44d223f2 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Feb 2020 18:27:38 -0800 Subject: [PATCH 16/33] ipdb --- flytekit/common/workflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 7e935fc64f..3d9082db0e 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -353,8 +353,6 @@ def __call__(self, *args, **input_map): bindings=sorted(bindings, key=lambda b: b.var), sdk_workflow=self ) - import ipdb; - # ipdb.set_trace() return node From 447c9eaeb61f3b243cb1bfe5650e5668197d42ad Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 2 Mar 2020 13:09:37 -0800 Subject: [PATCH 17/33] separating out specialized nodes into a separate file --- flytekit/common/component_nodes.py | 111 +++++++++++++++++++++++++ flytekit/common/launch_plan.py | 2 +- flytekit/common/nodes.py | 125 +++-------------------------- 3 files changed, 123 insertions(+), 115 deletions(-) create mode 100644 flytekit/common/component_nodes.py diff --git a/flytekit/common/component_nodes.py b/flytekit/common/component_nodes.py new file mode 100644 index 0000000000..e0eb53010f --- /dev/null +++ b/flytekit/common/component_nodes.py @@ -0,0 +1,111 @@ +from __future__ import absolute_import + +import six as _six + +from flytekit.common import sdk_bases as _sdk_bases +from flytekit.common.exceptions import system as _system_exceptions +from flytekit.models.core import workflow as _workflow_model + + +class SdkTaskNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _workflow_model.TaskNode)): + + def __init__(self, sdk_task): + """ + :param flytekit.common.tasks.task.SdkTask sdk_task: + """ + self._sdk_task = sdk_task + super(SdkTaskNode, self).__init__(None) + + @property + def reference_id(self): + """ + A globally unique identifier for the task. + :rtype: flytekit.models.core.identifier.Identifier + """ + return self._sdk_task.id + + @property + def sdk_task(self): + """ + :rtype: flytekit.common.tasks.task.SdkTask + """ + return self._sdk_task + + @classmethod + def promote_from_model(cls, base_model): + """ + Takes the idl wrapper for a TaskNode and returns the hydrated Flytekit object for it by fetching it from the + engine. + + :param flytekit.models.core.workflow.TaskNode base_model: + :rtype: SdkTaskNode + """ + from flytekit.common.tasks import task as _task + project = base_model.reference_id.project + domain = base_model.reference_id.domain + name = base_model.reference_id.name + version = base_model.reference_id.version + sdk_task = _task.SdkTask.fetch(project, domain, name, version) + return cls(sdk_task) + + +class SdkWorkflowNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _workflow_model.WorkflowNode)): + def __init__(self, sdk_workflow=None, sdk_launch_plan=None): + """ + :param flytekit.common.workflow.SdkWorkflow sdk_workflow: + :param flytekit.common.launch_plan.SdkLaunchPlan sdk_launch_plan: + """ + self._sdk_workflow = sdk_workflow + self._sdk_launch_plan = sdk_launch_plan + super(SdkWorkflowNode, self).__init__() + + @property + def launchplan_ref(self): + """ + [Optional] A globally unique identifier for the launch plan. Should map to Admin. + :rtype: flytekit.models.core.identifier.Identifier + """ + return self._sdk_launch_plan.id if self._sdk_launch_plan else None + + @property + def sub_workflow_ref(self): + """ + [Optional] Reference to a subworkflow, that should be defined with the compiler context. + :rtype: flytekit.models.core.identifier.Identifier + """ + return self._sdk_workflow.id if self._sdk_workflow else None + + @property + def sdk_launch_plan(self): + """ + :rtype: flytekit.common.launch_plan.SdkLaunchPlan + """ + return self._sdk_launch_plan + + @property + def sdk_workflow(self): + """ + :rtype: flytekit.common.workflow.SdkWorkflow + """ + return self._sdk_workflow + + @classmethod + def promote_from_model(cls, base_model): + """ + :param flytekit.models.core.workflow.WorkflowNode base_model: + :rtype: SdkWorkflowNode + """ + from flytekit.common import workflow as _workflow, launch_plan as _launch_plan + project = base_model.reference.project + domain = base_model.reference.domain + name = base_model.reference.name + version = base_model.reference.version + if base_model.launchplan_ref is not None: + sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) + return cls(sdk_launch_plan=sdk_launch_plan) + elif base_model.sub_workflow_ref is not None: + sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) + return cls(sdk_workflow=sdk_workflow) + else: + raise _system_exceptions.FlyteSystemException("Bad workflow node model, neither subworkflow nor " + "launchplan specified.") diff --git a/flytekit/common/launch_plan.py b/flytekit/common/launch_plan.py index 7f22e6f4d1..6c8479d036 100644 --- a/flytekit/common/launch_plan.py +++ b/flytekit/common/launch_plan.py @@ -6,7 +6,6 @@ from flytekit.common.exceptions import scopes as _exception_scopes, user as _user_exceptions from flytekit.common.mixins import registerable as _registerable, hash as _hash_mixin, executable as _executable_mixin -from flytekit.common import workflow as _workflow from flytekit.common.types import helpers as _type_helpers from flytekit.configuration import sdk as _sdk_config, internal as _internal_config, auth as _auth_config from flytekit.engines import loader as _engine_loader @@ -66,6 +65,7 @@ def fetch(cls, project, domain, name, version=None): domain, and name. :rtype: SdkLaunchPlan """ + from flytekit.common import workflow as _workflow launch_plan_id = _identifier.Identifier( _identifier_model.ResourceType.LAUNCH_PLAN, project, domain, name, version ) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 64c21210d8..859dc74446 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -1,22 +1,22 @@ from __future__ import absolute_import import abc as _abc -import six as _six import logging as _logging + +import six as _six from sortedcontainers import SortedDict as _SortedDict -from flytekit.common import sdk_bases as _sdk_bases, promise as _promise +from flytekit.common import constants as _constants +from flytekit.common import sdk_bases as _sdk_bases, promise as _promise, component_nodes as _component_nodes from flytekit.common.exceptions import scopes as _exception_scopes, user as _user_exceptions +from flytekit.common.exceptions import system as _system_exceptions from flytekit.common.mixins import hash as _hash_mixin, artifact as _artifact_mixin -from flytekit.common.tasks import executions as _task_executions, task as _task -from flytekit.common import workflow as _workflow, launch_plan as _launch_plan +from flytekit.common.tasks import executions as _task_executions from flytekit.common.types import helpers as _type_helpers from flytekit.common.utils import _dnsify -from flytekit.common import constants as _constants from flytekit.engines import loader as _engine_loader from flytekit.models import common as _common_models, node_execution as _node_execution_models from flytekit.models.core import workflow as _workflow_model, execution as _execution_models -from flytekit.common.exceptions import system as _system_exceptions class ParameterMapper(_six.with_metaclass(_common_models.FlyteABCMeta, _SortedDict)): @@ -100,109 +100,6 @@ def _return_mapping_object(self, sdk_node, sdk_type, name): return _promise.NodeOutput(sdk_node, sdk_type, name) -class SdkTaskNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _workflow_model.TaskNode)): - - def __init__(self, sdk_task): - """ - :param flytekit.common.tasks.task.SdkTask sdk_task: - """ - self._sdk_task = sdk_task - super(SdkTaskNode, self).__init__(None) - - @property - def reference_id(self): - """ - A globally unique identifier for the task. - :rtype: flytekit.models.core.identifier.Identifier - """ - return self._sdk_task.id - - @property - def sdk_task(self): - """ - :rtype: flytekit.common.tasks.task.SdkTask - """ - return self._sdk_task - - @classmethod - def promote_from_model(cls, base_model): - """ - Takes the idl wrapper for a TaskNode and returns the hydrated Flytekit object for it by fetching it from the - engine. - - :param flytekit.models.core.workflow.TaskNode base_model: - :rtype: SdkTaskNode - """ - project = base_model.reference_id.project - domain = base_model.reference_id.domain - name = base_model.reference_id.name - version = base_model.reference_id.version - sdk_task = _task.SdkTask.fetch(project, domain, name, version) - return cls(sdk_task) - - -class SdkWorkflowNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _workflow_model.WorkflowNode)): - def __init__(self, sdk_workflow=None, sdk_launch_plan=None): - """ - :param flytekit.common.workflow.SdkWorkflow sdk_workflow: - :param flytekit.common.launch_plan.SdkLaunchPlan sdk_launch_plan: - """ - self._sdk_workflow = sdk_workflow - self._sdk_launch_plan = sdk_launch_plan - super(SdkWorkflowNode, self).__init__() - - @property - def launchplan_ref(self): - """ - [Optional] A globally unique identifier for the launch plan. Should map to Admin. - :rtype: flytekit.models.core.identifier.Identifier - """ - return self._sdk_launch_plan.id if self._sdk_launch_plan else None - - @property - def sub_workflow_ref(self): - """ - [Optional] Reference to a subworkflow, that should be defined with the compiler context. - :rtype: flytekit.models.core.identifier.Identifier - """ - return self._sdk_workflow.id if self._sdk_workflow else None - - @property - def sdk_launch_plan(self): - """ - :rtype: flytekit.common.launch_plan.SdkLaunchPlan - """ - return self._sdk_launch_plan - - @property - def sdk_workflow(self): - """ - :rtype: flytekit.common.workflow.SdkWorkflow - """ - return self._sdk_workflow - - @classmethod - def promote_from_model(cls, base_model): - """ - :param flytekit.models.core.workflow.WorkflowNode base_model: - :rtype: SdkWorkflowNode - """ - - project = base_model.reference.project - domain = base_model.reference.domain - name = base_model.reference.name - version = base_model.reference.version - if base_model.launchplan_ref is not None: - sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) - return cls(sdk_launch_plan=sdk_launch_plan) - elif base_model.sub_workflow_ref is not None: - sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) - return cls(sdk_workflow=sdk_workflow) - else: - raise _system_exceptions.FlyteSystemException("Bad workflow node model, neither subworkflow nor " - "launchplan specified.") - - class SdkNode(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _hash_mixin.HashOnReferenceMixin, _workflow_model.Node)): def __init__( @@ -246,9 +143,9 @@ def __init__( workflow_node = None if sdk_workflow is not None: - workflow_node = SdkWorkflowNode(sdk_workflow=sdk_workflow) + workflow_node = _component_nodes.SdkWorkflowNode(sdk_workflow=sdk_workflow) elif sdk_launch_plan is not None: - workflow_node = SdkWorkflowNode(sdk_launch_plan=sdk_launch_plan) + workflow_node = _component_nodes.SdkWorkflowNode(sdk_launch_plan=sdk_launch_plan) super(SdkNode, self).__init__( id=_dnsify(id) if id else None, @@ -256,7 +153,7 @@ def __init__( inputs=bindings, upstream_node_ids=[n.id for n in upstream_nodes], output_aliases=[], # TODO: Are aliases a thing in SDK nodes - task_node=SdkTaskNode(sdk_task) if sdk_task else None, + task_node=_component_nodes.SdkTaskNode(sdk_task) if sdk_task else None, workflow_node=workflow_node, branch_node=sdk_branch.target if sdk_branch else None ) @@ -282,9 +179,9 @@ def promote_from_model(cls, model): sdk_task_node, sdk_workflow_node = None, None if model.task_node is not None: - sdk_task_node = SdkTaskNode.promote_from_model(model.task_node) + sdk_task_node = _component_nodes.SdkTaskNode.promote_from_model(model.task_node) elif model.workflow_node is not None: - sdk_workflow_node = SdkWorkflowNode.promote_from_model(model.workflow_node) + sdk_workflow_node = _component_nodes.SdkWorkflowNode.promote_from_model(model.workflow_node) else: raise _system_exceptions.FlyteSystemException("Bad Node model, neither task nor workflow detected") From 9d70a6707d4541b87e4fe783381d7b6742ca8641 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 2 Mar 2020 13:14:37 -0800 Subject: [PATCH 18/33] fix test --- tests/flytekit/unit/common_tests/test_nodes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/flytekit/unit/common_tests/test_nodes.py b/tests/flytekit/unit/common_tests/test_nodes.py index 99509dac7f..fc17ac93ab 100644 --- a/tests/flytekit/unit/common_tests/test_nodes.py +++ b/tests/flytekit/unit/common_tests/test_nodes.py @@ -1,5 +1,5 @@ from __future__ import absolute_import -from flytekit.common import nodes as _nodes, interface as _interface +from flytekit.common import nodes as _nodes, interface as _interface, component_nodes as _component_nodes from flytekit.models.core import workflow as _core_workflow_models, identifier as _identifier from flytekit.models import literals as _literals from flytekit.sdk import tasks as _tasks, types as _types, workflow as _workflow @@ -180,7 +180,7 @@ def testy_test(wf_params, a, b): pass testy_test._id = _identifier.Identifier(_identifier.ResourceType.TASK, 'project', 'domain', 'name', 'version') - n = _nodes.SdkTaskNode(testy_test) + n = _component_nodes.SdkTaskNode(testy_test) assert n.reference_id.project == 'project' assert n.reference_id.domain == 'domain' assert n.reference_id.name == 'name' @@ -262,7 +262,7 @@ class test_workflow(object): lp = test_workflow.create_launch_plan() lp._id = _identifier.Identifier(_identifier.ResourceType.TASK, 'project', 'domain', 'name', 'version') - n = _nodes.SdkWorkflowNode(sdk_launch_plan=lp) + n = _component_nodes.SdkWorkflowNode(sdk_launch_plan=lp) assert n.launchplan_ref.project == 'project' assert n.launchplan_ref.domain == 'domain' assert n.launchplan_ref.name == 'name' From 0fdffdc3725d600109ccdddb6748c90dabdefffa Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Mar 2020 12:52:21 -0800 Subject: [PATCH 19/33] adding tests --- flytekit/clients/raw.py | 5 +- flytekit/common/component_nodes.py | 30 ++++- flytekit/common/nodes.py | 10 +- flytekit/common/workflow.py | 9 +- flytekit/models/core/compiler.py | 6 +- .../resources/CompiledWorkflowClosure.pb | Bin 0 -> 2118 bytes .../unit/common_tests/test_helpers.py | 45 -------- .../common_tests/test_workflow_promote.py | 109 +++++++++++++++++- 8 files changed, 156 insertions(+), 58 deletions(-) create mode 100644 tests/flytekit/unit/common_tests/resources/CompiledWorkflowClosure.pb diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 1f9452a476..dddafbae06 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -307,7 +307,10 @@ def get_workflow(self, get_object_request): :rtype: flyteidl.admin.workflow_pb2.Workflow :raises: TODO """ - return self._stub.GetWorkflow(get_object_request, metadata=self._metadata) + adminwf = self._stub.GetWorkflow(get_object_request, metadata=self._metadata) + with open('compiled_workflow_closure.pb', 'wb') as fh: + fh.write(adminwf.closure.compiled_workflow.SerializeToString()) + return adminwf #################################################################################################################### # diff --git a/flytekit/common/component_nodes.py b/flytekit/common/component_nodes.py index e0eb53010f..723c89da5b 100644 --- a/flytekit/common/component_nodes.py +++ b/flytekit/common/component_nodes.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import six as _six +import logging as _logging from flytekit.common import sdk_bases as _sdk_bases from flytekit.common.exceptions import system as _system_exceptions @@ -32,15 +33,25 @@ def sdk_task(self): return self._sdk_task @classmethod - def promote_from_model(cls, base_model): + def promote_from_model(cls, base_model, tasks): """ Takes the idl wrapper for a TaskNode and returns the hydrated Flytekit object for it by fetching it from the engine. :param flytekit.models.core.workflow.TaskNode base_model: + :param list[flytekit.models.task.TaskTemplate] tasks: :rtype: SdkTaskNode """ from flytekit.common.tasks import task as _task + tasks = tasks or [] + for t in tasks: + if t.id == base_model.reference_id: + _logging.debug("Found existing task template for {}, will not retrieve from Admin".format(t.id)) + sdk_task = _task.SdkTask.promote_from_model(t) + return cls(sdk_task) + + # If not found, fetch it from Admin + _logging.debug("Fetching task template for {} from Admin".format(base_model.reference_id)) project = base_model.reference_id.project domain = base_model.reference_id.domain name = base_model.reference_id.name @@ -90,12 +101,16 @@ def sdk_workflow(self): return self._sdk_workflow @classmethod - def promote_from_model(cls, base_model): + def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): """ :param flytekit.models.core.workflow.WorkflowNode base_model: + :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: + :param list[flytekit.models.task.TaskTemplate] tasks: :rtype: SdkWorkflowNode """ + # put the import statement here to prevent circular dependency error from flytekit.common import workflow as _workflow, launch_plan as _launch_plan + project = base_model.reference.project domain = base_model.reference.domain name = base_model.reference.name @@ -104,6 +119,17 @@ def promote_from_model(cls, base_model): sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) return cls(sdk_launch_plan=sdk_launch_plan) elif base_model.sub_workflow_ref is not None: + sub_workflows = sub_workflows or [] + # The workflow templates for sub-workflows should have been included in the original response + for sw in sub_workflows: + if sw.id == base_model.reference: + promoted = _workflow.SdkWorkflow.promote_from_model(sw, sub_workflows=sub_workflows, + tasks=tasks) + return cls(sdk_workflow=promoted) + + # if not found for some reason, fetch it from Admin again. + _logging.warning("Your subworkflow with id {} is not included in the promote call.".format( + base_model.reference)) sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) return cls(sdk_workflow=sdk_workflow) else: diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 859dc74446..27e8efad4a 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -166,9 +166,12 @@ def executable_sdk_object(self): return self._executable_sdk_object @classmethod - def promote_from_model(cls, model): + def promote_from_model(cls, model, sub_workflows=None, tasks=None): """ :param flytekit.models.core.workflow.Node model: + :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: + :param list[flytekit.models.task.TaskTemplate] tasks: If specified, these task templates will be passed to the + SdkTaskNode promote_from_model call, and used instead of fetching from Admin. :rtype: SdkNode """ id = model.id @@ -179,9 +182,10 @@ def promote_from_model(cls, model): sdk_task_node, sdk_workflow_node = None, None if model.task_node is not None: - sdk_task_node = _component_nodes.SdkTaskNode.promote_from_model(model.task_node) + sdk_task_node = _component_nodes.SdkTaskNode.promote_from_model(model.task_node, tasks) elif model.workflow_node is not None: - sdk_workflow_node = _component_nodes.SdkWorkflowNode.promote_from_model(model.workflow_node) + sdk_workflow_node = _component_nodes.SdkWorkflowNode.promote_from_model( + model.workflow_node,sub_workflows, tasks) else: raise _system_exceptions.FlyteSystemException("Bad Node model, neither task nor workflow detected") diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 3d9082db0e..7bf3c4b4b7 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -213,9 +213,14 @@ def fetch(cls, project, domain, name, version=None): return sdk_workflow @classmethod - def promote_from_model(cls, base_model): + def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): """ :param flytekit.models.core.workflow.WorkflowTemplate base_model: + :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: Provide a list of WorkflowTemplate + models (should be returned from Admin as part of the admin CompiledWorkflowClosure. Relevant sub-workflows + should always be provided. + :param list[flytekit.models.task.TaskTemplate] tasks: Same as above but for tasks. If tasks are not provided + relevant TaskTemplates will be fetched from Admin :rtype: SdkWorkflow """ node_map = {} @@ -225,7 +230,7 @@ def promote_from_model(cls, base_model): # so we need to strip them back out here. continue else: - node_map[n.id] = _nodes.SdkNode.promote_from_model(n) + node_map[n.id] = _nodes.SdkNode.promote_from_model(n, sub_workflows, tasks) # Set upstream nodes for each node for n in base_model.nodes: diff --git a/flytekit/models/core/compiler.py b/flytekit/models/core/compiler.py index 955d296dff..33acb37c42 100644 --- a/flytekit/models/core/compiler.py +++ b/flytekit/models/core/compiler.py @@ -125,6 +125,7 @@ def from_flyte_idl(cls, p): ) +# TODO: properly sort out the model code and remove one of these duplicate CompiledTasks class CompiledTask(_common.FlyteIdlEntity): def __init__(self, template): @@ -207,8 +208,11 @@ def from_flyte_idl(cls, p): :param flyteidl.core.compiler_pb2.CompiledWorkflowClosure p: :rtype: CompiledWorkflowClosure """ + # This import is here to prevent a circular dependency issue. + # TODO: properly sort out the model code and remove the duplicate CompiledTask + from flytekit.models.task import CompiledTask as _CompiledTask return cls( primary=CompiledWorkflow.from_flyte_idl(p.primary), sub_workflows=[CompiledWorkflow.from_flyte_idl(s) for s in p.sub_workflows], - tasks=[CompiledTask.from_flyte_idl(t) for t in p.tasks] + tasks=[_CompiledTask.from_flyte_idl(t) for t in p.tasks] ) diff --git a/tests/flytekit/unit/common_tests/resources/CompiledWorkflowClosure.pb b/tests/flytekit/unit/common_tests/resources/CompiledWorkflowClosure.pb new file mode 100644 index 0000000000000000000000000000000000000000..1f3ce5c79a76a959969c4e4bef24ebb9150699f3 GIT binary patch literal 2118 zcmd5--EI;=6fPDN4{Zq3-%M>&sThmd<*$UF3k_6SF)cI{HC~v_!p`c3Wryr8f2nw3 z;)QSEdl+wh2XB1`6Cc1thlL8PqG(L?f@C@KotgQ*^UXQ%-UV-*@XF!7fIPC=HHk?K zljztZkrq*DqfRu_pQ%z>tFRw$i1mg_gf`V{vP#v^rwG`HD#+lr-v&3) zBXeuZ$B^ymt8+~-br+N4vq-AEl1p(OU+u&C=UV$YT_uLPS92v=k1C#JoybOy} zVlHpi4X$rqG|!%d|DD4gUa|s0A1Cy2mQ~kbo&}H(dnXF!5TnoG8GC=zyTjh>Cbg_; zo%tK~oQ)^yq&MtD$9=dtLCb4{W;bntNlRrnM!F+=F8<<=2NO!b;oPS z$13Iu^Km|mFAi4vMfPvOB6tRcw%R1J{xX*g%ocs?gq%dHHP*-#8dAHm8M1}~=~#W= zNYC2E@q$70mQp1`i)bzEZFH@DhV7N$Mb5e@SEo`_C3tsc3t`VNQxoe1!^lwB&2dV9 zL>L&(&Uovy*TY_p8_Wf+p%Yo@U>{o?L{6+gug8>0z}17JN9CN5FO+koLgq-=FBT5+ zhxbdFa=us)PBP`Y2!(V?&5%l~qRLmPCM!>Zk73!Cm_N=O=Fs0!F&KbV+x91=;=SB{ rdEz++*$lJ6?i7zR`2rfB;z8D6Y^Xx!I2X)*cgBNU>6EcXGtca=v{ske literal 0 HcmV?d00001 diff --git a/tests/flytekit/unit/common_tests/test_helpers.py b/tests/flytekit/unit/common_tests/test_helpers.py index e87ec0eb67..523fa1b4fa 100644 --- a/tests/flytekit/unit/common_tests/test_helpers.py +++ b/tests/flytekit/unit/common_tests/test_helpers.py @@ -6,48 +6,3 @@ from flytekit.models.core import workflow as _workflow_model - -def get_workflow_template(): - """ - This function retrieves a TasKTemplate object from the pb file in the resources directory. - It was created by reading from Flyte Admin, the following workflow, after registration. - - from __future__ import absolute_import - - from flytekit.common.types.primitives import Integer - from flytekit.sdk.tasks import ( - python_task, - inputs, - outputs, - ) - from flytekit.sdk.types import Types - from flytekit.sdk.workflow import workflow_class, Input, Output - - - @inputs(a=Types.Integer) - @outputs(b=Types.Integer, c=Types.Integer) - @python_task() - def demo_task_for_promote(wf_params, a, b, c): - b.set(a + 1) - c.set(a + 2) - - - @workflow_class() - class OneTaskWFForPromote(object): - wf_input = Input(Types.Integer, required=True) - my_task_node = demo_task_for_promote(a=wf_input) - wf_output_b = Output(my_task_node.outputs.b, sdk_type=Integer) - wf_output_c = Output(my_task_node.outputs.c, sdk_type=Integer) - - - :rtype: flytekit.models.core.workflow.WorkflowTemplate - """ - workflow_template_pb = _workflow_pb2.WorkflowTemplate() - # So that tests that use this work when run from any directory - basepath = _path.dirname(__file__) - filepath = _path.abspath(_path.join(basepath, "resources", "OneTaskWFForPromote.pb")) - with open(filepath, "rb") as fh: - workflow_template_pb.ParseFromString(fh.read()) - - wt = _workflow_model.WorkflowTemplate.from_flyte_idl(workflow_template_pb) - return wt diff --git a/tests/flytekit/unit/common_tests/test_workflow_promote.py b/tests/flytekit/unit/common_tests/test_workflow_promote.py index fada2e7ab4..de3dc41b11 100644 --- a/tests/flytekit/unit/common_tests/test_workflow_promote.py +++ b/tests/flytekit/unit/common_tests/test_workflow_promote.py @@ -3,16 +3,22 @@ from datetime import timedelta from mock import patch as _patch +from os import path as _path from flytekit.common import workflow as _workflow_common from flytekit.common.tasks import task as _task from flytekit.models import interface as _interface, \ literals as _literals, types as _types, task as _task_model -from flytekit.models.core import workflow as _workflow_model, identifier as _identifier +from flytekit.models.core import workflow as _workflow_model, identifier as _identifier, compiler as _compiler_model from flytekit.sdk import tasks as _sdk_tasks from flytekit.sdk import workflow as _sdk_workflow from flytekit.sdk.types import Types as _Types -from tests.flytekit.unit.common_tests import test_helpers +from flyteidl.core import compiler_pb2 as _compiler_pb2, workflow_pb2 as _workflow_pb2 +from flytekit.sdk.types import Types +from flytekit.sdk.workflow import workflow_class, Input, Output +from flytekit.sdk.tasks import inputs, outputs, python_task +from flytekit.sdk.types import Types +from flytekit.sdk.workflow import workflow_class, Input, Output def get_sample_node_metadata(node_id): @@ -59,6 +65,52 @@ def get_sample_task_metadata(): ) +def get_workflow_template(): + """ + This function retrieves a TasKTemplate object from the pb file in the resources directory. + It was created by reading from Flyte Admin, the following workflow, after registration. + + from __future__ import absolute_import + + from flytekit.common.types.primitives import Integer + from flytekit.sdk.tasks import ( + python_task, + inputs, + outputs, + ) + from flytekit.sdk.types import Types + from flytekit.sdk.workflow import workflow_class, Input, Output + + + @inputs(a=Types.Integer) + @outputs(b=Types.Integer, c=Types.Integer) + @python_task() + def demo_task_for_promote(wf_params, a, b, c): + b.set(a + 1) + c.set(a + 2) + + + @workflow_class() + class OneTaskWFForPromote(object): + wf_input = Input(Types.Integer, required=True) + my_task_node = demo_task_for_promote(a=wf_input) + wf_output_b = Output(my_task_node.outputs.b, sdk_type=Integer) + wf_output_c = Output(my_task_node.outputs.c, sdk_type=Integer) + + + :rtype: flytekit.models.core.workflow.WorkflowTemplate + """ + workflow_template_pb = _workflow_pb2.WorkflowTemplate() + # So that tests that use this work when run from any directory + basepath = _path.dirname(__file__) + filepath = _path.abspath(_path.join(basepath, "resources", "OneTaskWFForPromote.pb")) + with open(filepath, "rb") as fh: + workflow_template_pb.ParseFromString(fh.read()) + + wt = _workflow_model.WorkflowTemplate.from_flyte_idl(workflow_template_pb) + return wt + + @_patch("flytekit.common.tasks.task.SdkTask.fetch") def test_basic_workflow_promote(mock_task_fetch): # This section defines a sample workflow from a user @@ -100,8 +152,8 @@ class TestPromoteExampleWf(object): ) sdk_promoted_task = _task.SdkTask.promote_from_model(task_template) mock_task_fetch.return_value = sdk_promoted_task - promoted_template = test_helpers.get_workflow_template() - promoted_wf = _workflow_common.SdkWorkflow.promote_from_model(promoted_template) + workflow_template = get_workflow_template() + promoted_wf = _workflow_common.SdkWorkflow.promote_from_model(workflow_template) assert promoted_wf.interface.inputs["wf_input"] == TestPromoteExampleWf.interface.inputs["wf_input"] assert promoted_wf.interface.outputs["wf_output_b"] == TestPromoteExampleWf.interface.outputs["wf_output_b"] @@ -110,3 +162,52 @@ class TestPromoteExampleWf(object): assert len(promoted_wf.nodes) == 1 assert len(TestPromoteExampleWf.nodes) == 1 assert promoted_wf.nodes[0].inputs[0] == TestPromoteExampleWf.nodes[0].inputs[0] + + +def get_compiled_workflow_closure(): + """ + :rtype: flytekit.models.core.compiler.CompiledWorkflowClosure + """ + cwc_pb = _compiler_pb2.CompiledWorkflowClosure() + # So that tests that use this work when run from any directory + basepath = _path.dirname(__file__) + filepath = _path.abspath(_path.join(basepath, "resources", "CompiledWorkflowClosure.pb")) + with open(filepath, "rb") as fh: + cwc_pb.ParseFromString(fh.read()) + + return _compiler_model.CompiledWorkflowClosure.from_flyte_idl(cwc_pb) + + +def test_more_promote(): + cwc = get_compiled_workflow_closure() + primary = cwc.primary + sub_wf_templates = [s.template for s in cwc.sub_workflows] + task_templates = [t.template for t in cwc.tasks] + promoted_wf = _workflow_common.SdkWorkflow.promote_from_model(primary.template, sub_wf_templates, task_templates) + + # This file that the promoted_wf reads contains the compiled workflow closure protobuf retrieved from Admin + # after registering a workflow that basically looks like the one below. + + @inputs(num=Types.Integer) + @outputs(out=Types.Integer) + @python_task + def inner_task(wf_params, num, out): + wf_params.logging.info("Running inner task... setting output to input") + out.set(num) + + @workflow_class() + class IdentityWorkflow(object): + a = Input(Types.Integer, default=5, help="Input for inner workflow") + odd_nums_task = inner_task(num=a) + task_output = Output(odd_nums_task.outputs.out, sdk_type=Types.Integer) + + @workflow_class() + class StaticSubWorkflowCaller(object): + outer_a = Input(Types.Integer, default=5, help="Input for inner workflow") + identity_wf_execution = IdentityWorkflow(a=outer_a) + wf_output = Output(identity_wf_execution.outputs.task_output, sdk_type=Types.Integer) + + assert StaticSubWorkflowCaller.interface == promoted_wf.interface + assert StaticSubWorkflowCaller.nodes[0].id == promoted_wf.nodes[0].id + assert StaticSubWorkflowCaller.nodes[0].inputs == promoted_wf.nodes[0].inputs + assert StaticSubWorkflowCaller.outputs == promoted_wf.outputs From fb19a7bb11fc17ea6d45a0d2f7125a925e9af0a8 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Mar 2020 12:54:33 -0800 Subject: [PATCH 20/33] revert raw.py --- flytekit/clients/raw.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index dddafbae06..1f9452a476 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -307,10 +307,7 @@ def get_workflow(self, get_object_request): :rtype: flyteidl.admin.workflow_pb2.Workflow :raises: TODO """ - adminwf = self._stub.GetWorkflow(get_object_request, metadata=self._metadata) - with open('compiled_workflow_closure.pb', 'wb') as fh: - fh.write(adminwf.closure.compiled_workflow.SerializeToString()) - return adminwf + return self._stub.GetWorkflow(get_object_request, metadata=self._metadata) #################################################################################################################### # From b2f157384342edc0d7bdd1b2c1d62794d3ed3719 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Mar 2020 12:58:26 -0800 Subject: [PATCH 21/33] revert readme --- README.md | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/README.md b/README.md index f403d57f0a..39ce35455b 100644 --- a/README.md +++ b/README.md @@ -83,14 +83,3 @@ shellcheck **/*.sh ``` - -flyte-cli -h localhost:30081 -p flytetester -d development -i list-workflow-versions -n cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller - -wf:flytetester:development:cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller:7be6342b4d5d95f5e31e6ad89636ad48925643ab - -from flytekit.clis.flyte_cli import main -from flytekit.common.workflow import SdkWorkflow -s = SdkWorkflow.fetch('flytetester', 'development', 'cookbook.sample_workflows.formula_1.outer.StaticLaunchPlanCaller', '7be6342b4d5d95f5e31e6ad89636ad48925643ab') - -FLYTE_INTERNAL_IMAGE=docker.io/lyft/flyteexamples:c76320408f58d7e0fe12147bc398b7f027bf0a77 FLYTE_PLATFORM_URL=localhost:30081 pyflyte -c /root/local.config -p flytetester -d staging register workflows - From fab700f10aac9e1ce654b430e7f992d6cd0c07f6 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Mar 2020 12:58:56 -0800 Subject: [PATCH 22/33] more revert readme --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 39ce35455b..f56985c4bd 100644 --- a/README.md +++ b/README.md @@ -81,5 +81,3 @@ source ~/.virtualenvs/flytekit/bin/activate python -m pytest tests/flytekit/unit shellcheck **/*.sh ``` - - From d6b05915ba083a8b0a4a8836fdb9f9dbdfa173ff Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Mar 2020 13:02:16 -0800 Subject: [PATCH 23/33] comment --- flytekit/common/component_nodes.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flytekit/common/component_nodes.py b/flytekit/common/component_nodes.py index 723c89da5b..b902985a06 100644 --- a/flytekit/common/component_nodes.py +++ b/flytekit/common/component_nodes.py @@ -127,7 +127,10 @@ def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): tasks=tasks) return cls(sdk_workflow=promoted) - # if not found for some reason, fetch it from Admin again. + # If not found for some reason, fetch it from Admin again. + # The reason there is a warning here but not for tasks is because sub-workflows should always be passed + # along. Ideally subworkflows are never even registered with Admin, so fetching from Admin ideally doesn't + # return anything. _logging.warning("Your subworkflow with id {} is not included in the promote call.".format( base_model.reference)) sdk_workflow = _workflow.SdkWorkflow.fetch(project, domain, name, version) From 629604c05ac50b794b90f3341e9a6a51aaec08a4 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Mar 2020 13:03:43 -0800 Subject: [PATCH 24/33] bump --- flytekit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 5058504d12..36196a4628 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -1,4 +1,4 @@ from __future__ import absolute_import import flytekit.plugins -__version__ = '0.5.0' +__version__ = '0.6.0b0' From 8af404ddcbb00afba2248d495e58b213a4ac968d Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 6 Mar 2020 11:06:28 -0800 Subject: [PATCH 25/33] move proto files into a subdir, delete unecessary test file --- .../{ => protos}/CompiledWorkflowClosure.pb | Bin .../resources/{ => protos}/OneTaskWFForPromote.pb | Bin tests/flytekit/unit/common_tests/test_helpers.py | 8 -------- .../unit/common_tests/test_workflow_promote.py | 4 ++-- 4 files changed, 2 insertions(+), 10 deletions(-) rename tests/flytekit/unit/common_tests/resources/{ => protos}/CompiledWorkflowClosure.pb (100%) rename tests/flytekit/unit/common_tests/resources/{ => protos}/OneTaskWFForPromote.pb (100%) delete mode 100644 tests/flytekit/unit/common_tests/test_helpers.py diff --git a/tests/flytekit/unit/common_tests/resources/CompiledWorkflowClosure.pb b/tests/flytekit/unit/common_tests/resources/protos/CompiledWorkflowClosure.pb similarity index 100% rename from tests/flytekit/unit/common_tests/resources/CompiledWorkflowClosure.pb rename to tests/flytekit/unit/common_tests/resources/protos/CompiledWorkflowClosure.pb diff --git a/tests/flytekit/unit/common_tests/resources/OneTaskWFForPromote.pb b/tests/flytekit/unit/common_tests/resources/protos/OneTaskWFForPromote.pb similarity index 100% rename from tests/flytekit/unit/common_tests/resources/OneTaskWFForPromote.pb rename to tests/flytekit/unit/common_tests/resources/protos/OneTaskWFForPromote.pb diff --git a/tests/flytekit/unit/common_tests/test_helpers.py b/tests/flytekit/unit/common_tests/test_helpers.py deleted file mode 100644 index 523fa1b4fa..0000000000 --- a/tests/flytekit/unit/common_tests/test_helpers.py +++ /dev/null @@ -1,8 +0,0 @@ -from __future__ import absolute_import - -from os import path as _path - -from flyteidl.core import workflow_pb2 as _workflow_pb2 - -from flytekit.models.core import workflow as _workflow_model - diff --git a/tests/flytekit/unit/common_tests/test_workflow_promote.py b/tests/flytekit/unit/common_tests/test_workflow_promote.py index de3dc41b11..4993abb31e 100644 --- a/tests/flytekit/unit/common_tests/test_workflow_promote.py +++ b/tests/flytekit/unit/common_tests/test_workflow_promote.py @@ -103,7 +103,7 @@ class OneTaskWFForPromote(object): workflow_template_pb = _workflow_pb2.WorkflowTemplate() # So that tests that use this work when run from any directory basepath = _path.dirname(__file__) - filepath = _path.abspath(_path.join(basepath, "resources", "OneTaskWFForPromote.pb")) + filepath = _path.abspath(_path.join(basepath, "resources/protos", "OneTaskWFForPromote.pb")) with open(filepath, "rb") as fh: workflow_template_pb.ParseFromString(fh.read()) @@ -171,7 +171,7 @@ def get_compiled_workflow_closure(): cwc_pb = _compiler_pb2.CompiledWorkflowClosure() # So that tests that use this work when run from any directory basepath = _path.dirname(__file__) - filepath = _path.abspath(_path.join(basepath, "resources", "CompiledWorkflowClosure.pb")) + filepath = _path.abspath(_path.join(basepath, "resources/protos", "CompiledWorkflowClosure.pb")) with open(filepath, "rb") as fh: cwc_pb.ParseFromString(fh.read()) From ebb89222559f11609803593ad6b2792a985e6809 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 6 Mar 2020 12:36:04 -0800 Subject: [PATCH 26/33] replace manually setting upstream with left shift operator --- flytekit/common/nodes.py | 1 + flytekit/common/workflow.py | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 27e8efad4a..4a165a699e 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -279,6 +279,7 @@ def __lshift__(self, other): def __rshift__(self, other): """ Add a node downstream of this node without necessarily mapping outputs -> inputs. + :param Node other: node to place downstream """ if hash(self) not in set(hash(n) for n in other.upstream_nodes): diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 7bf3c4b4b7..90942fb76b 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -236,8 +236,11 @@ def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): for n in base_model.nodes: if n.id == _constants.START_NODE_ID or n.id == _constants.END_NODE_ID: continue - child_node = node_map[n.id] - child_node.upstream_nodes[:] = [node_map[upstream_id] for upstream_id in n.upstream_node_ids] + + current = node_map[n.id] + for upstream_id in current.upstream_node_ids: + upstream_node = node_map[upstream_id] + current << upstream_node # No inputs/outputs specified, see the constructor for more information on the overrides. return cls( From f499dd1c5c6dfaefd539d964ed86b47a7ea7794f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 6 Mar 2020 13:34:17 -0800 Subject: [PATCH 27/33] better looking detection of system nodes --- flytekit/common/workflow.py | 29 ++++++------ .../unit/common_tests/test_workflow.py | 47 ++++++++++++++++--- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 90942fb76b..1dfcf43a58 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -177,8 +177,9 @@ def user_inputs(self): def get_sub_workflows(self): """ - what should the return type of this be? - :rtype: list[] + Recursive call that returns all subworkflows in the current workflow + + :rtype: list[SdkWorkflow] """ result = [] for n in self.nodes: @@ -212,6 +213,14 @@ def fetch(cls, project, domain, name, version=None): sdk_workflow._id = workflow_id return sdk_workflow + @classmethod + def get_non_system_nodes(cls, nodes): + """ + :param list[flytekit.models.core.workflow.Node] nodes: + :rtype: list[flytekit.models.core.workflow.Node] + """ + return [n for n in nodes if n.id not in {_constants.START_NODE_ID, _constants.END_NODE_ID}] + @classmethod def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): """ @@ -223,20 +232,12 @@ def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): relevant TaskTemplates will be fetched from Admin :rtype: SdkWorkflow """ - node_map = {} - for n in base_model.nodes: - if n.id == _constants.START_NODE_ID or n.id == _constants.END_NODE_ID: - # The workflow compilation process done by Admin/Propeller will add a fake start-node to the graph - # so we need to strip them back out here. - continue - else: - node_map[n.id] = _nodes.SdkNode.promote_from_model(n, sub_workflows, tasks) + base_model_non_system_nodes = cls.get_non_system_nodes(base_model.nodes) + node_map = {n.id: _nodes.SdkNode.promote_from_model(n, sub_workflows, tasks) + for n in base_model_non_system_nodes} # Set upstream nodes for each node - for n in base_model.nodes: - if n.id == _constants.START_NODE_ID or n.id == _constants.END_NODE_ID: - continue - + for n in base_model_non_system_nodes: current = node_map[n.id] for upstream_id in current.upstream_node_ids: upstream_node = node_map[upstream_id] diff --git a/tests/flytekit/unit/common_tests/test_workflow.py b/tests/flytekit/unit/common_tests/test_workflow.py index 1164c59b15..bda7ceb80f 100644 --- a/tests/flytekit/unit/common_tests/test_workflow.py +++ b/tests/flytekit/unit/common_tests/test_workflow.py @@ -1,12 +1,14 @@ from __future__ import absolute_import -from flytekit.common import workflow, constants, promise +import pytest as _pytest + +from flytekit.common import workflow, constants, promise, nodes, interface +from flytekit.common.exceptions import user as _user_exceptions from flytekit.common.types import primitives, containers +from flytekit.models import literals as _literals from flytekit.models.core import workflow as _workflow_models, identifier as _identifier +from flytekit.sdk import types as _types from flytekit.sdk.tasks import python_task, inputs, outputs -from flytekit.common.exceptions import user as _user_exceptions - -import pytest as _pytest def test_output(): @@ -18,7 +20,6 @@ def test_output(): def test_workflow(): - @inputs(a=primitives.Integer) @outputs(b=primitives.Integer) @python_task() @@ -260,6 +261,40 @@ def my_list_task(wf_params, a, b): assert n.outputs['scalar_out'].node_id == 'node-id' assert n.outputs['nested_out'].sdk_type.to_flyte_literal_type() == \ - containers.List(containers.List(primitives.Integer)).to_flyte_literal_type() + containers.List(containers.List(primitives.Integer)).to_flyte_literal_type() assert n.outputs['nested_out'].var == 'nested_out' assert n.outputs['nested_out'].node_id == 'node-id' + + +def test_blah(): + @inputs(a=primitives.Integer) + @outputs(b=primitives.Integer) + @python_task() + def my_task(wf_params, a, b): + b.set(a + 1) + + my_task._id = _identifier.Identifier(_identifier.ResourceType.TASK, 'project', 'domain', 'my_task', 'version') + + required_input = promise.Input('required', primitives.Integer) + + n1 = my_task(a=required_input).assign_id_and_return('n1') + + n_start = nodes.SdkNode( + 'start-node', + [], + [ + _literals.Binding( + 'a', + interface.BindingData.from_python_std(_types.Types.Integer.to_flyte_literal_type(), 3) + ) + ], + None, + sdk_task=my_task, + sdk_workflow=None, + sdk_launch_plan=None, + sdk_branch=None + ) + + non_system_nodes = workflow.SdkWorkflow.get_non_system_nodes([n1, n_start]) + assert len(non_system_nodes) == 1 + assert non_system_nodes[0].id == 'n1' From cb8b07790397713b31e7aa6ef11992da1d8d5c6f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 6 Mar 2020 14:45:06 -0800 Subject: [PATCH 28/33] use a dict instead of a list when searching for workflows and tasks in the promote cycle --- flytekit/common/component_nodes.py | 31 +++++++++---------- flytekit/common/nodes.py | 14 +++++---- flytekit/common/workflow.py | 15 ++++++--- .../common_tests/test_workflow_promote.py | 8 ++--- 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/flytekit/common/component_nodes.py b/flytekit/common/component_nodes.py index b902985a06..7edee3f8fd 100644 --- a/flytekit/common/component_nodes.py +++ b/flytekit/common/component_nodes.py @@ -39,16 +39,15 @@ def promote_from_model(cls, base_model, tasks): engine. :param flytekit.models.core.workflow.TaskNode base_model: - :param list[flytekit.models.task.TaskTemplate] tasks: + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks: :rtype: SdkTaskNode """ from flytekit.common.tasks import task as _task - tasks = tasks or [] - for t in tasks: - if t.id == base_model.reference_id: - _logging.debug("Found existing task template for {}, will not retrieve from Admin".format(t.id)) - sdk_task = _task.SdkTask.promote_from_model(t) - return cls(sdk_task) + if base_model.reference_id in tasks: + t = tasks[base_model.reference_id] + _logging.debug("Found existing task template for {}, will not retrieve from Admin".format(t.id)) + sdk_task = _task.SdkTask.promote_from_model(t) + return cls(sdk_task) # If not found, fetch it from Admin _logging.debug("Fetching task template for {} from Admin".format(base_model.reference_id)) @@ -101,11 +100,12 @@ def sdk_workflow(self): return self._sdk_workflow @classmethod - def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): + def promote_from_model(cls, base_model, sub_workflows, tasks): """ :param flytekit.models.core.workflow.WorkflowNode base_model: - :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: - :param list[flytekit.models.task.TaskTemplate] tasks: + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate] + sub_workflows: + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks: :rtype: SdkWorkflowNode """ # put the import statement here to prevent circular dependency error @@ -119,13 +119,12 @@ def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): sdk_launch_plan = _launch_plan.SdkLaunchPlan.fetch(project, domain, name, version) return cls(sdk_launch_plan=sdk_launch_plan) elif base_model.sub_workflow_ref is not None: - sub_workflows = sub_workflows or [] # The workflow templates for sub-workflows should have been included in the original response - for sw in sub_workflows: - if sw.id == base_model.reference: - promoted = _workflow.SdkWorkflow.promote_from_model(sw, sub_workflows=sub_workflows, - tasks=tasks) - return cls(sdk_workflow=promoted) + if base_model.reference in sub_workflows: + sw = sub_workflows[base_model.reference] + promoted = _workflow.SdkWorkflow.promote_from_model(sw, sub_workflows=sub_workflows, + tasks=tasks) + return cls(sdk_workflow=promoted) # If not found for some reason, fetch it from Admin again. # The reason there is a warning here but not for tasks is because sub-workflows should always be passed diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index 4a165a699e..2ff6e65771 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -63,7 +63,7 @@ def __init__(self, type_map, node): def __getattr__(self, key): if key == 'iteritems' and hasattr(super(ParameterMapper, self), 'items'): - return super(ParameterMapper, self).items + return super(ParameterMapper, self).items if hasattr(super(ParameterMapper, self), key): return getattr(super(ParameterMapper, self), key) if key not in self: @@ -166,12 +166,14 @@ def executable_sdk_object(self): return self._executable_sdk_object @classmethod - def promote_from_model(cls, model, sub_workflows=None, tasks=None): + def promote_from_model(cls, model, sub_workflows, tasks): """ :param flytekit.models.core.workflow.Node model: - :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: - :param list[flytekit.models.task.TaskTemplate] tasks: If specified, these task templates will be passed to the - SdkTaskNode promote_from_model call, and used instead of fetching from Admin. + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate] + sub_workflows: + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks: If specified, + these task templates will be passed to the SdkTaskNode promote_from_model call, and used + instead of fetching from Admin. :rtype: SdkNode """ id = model.id @@ -185,7 +187,7 @@ def promote_from_model(cls, model, sub_workflows=None, tasks=None): sdk_task_node = _component_nodes.SdkTaskNode.promote_from_model(model.task_node, tasks) elif model.workflow_node is not None: sdk_workflow_node = _component_nodes.SdkWorkflowNode.promote_from_model( - model.workflow_node,sub_workflows, tasks) + model.workflow_node, sub_workflows, tasks) else: raise _system_exceptions.FlyteSystemException("Bad Node model, neither task nor workflow detected") diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 1dfcf43a58..2ea9d7c6c0 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -209,7 +209,11 @@ def fetch(cls, project, domain, name, version=None): version = version or _internal_config.VERSION.get() workflow_id = _identifier.Identifier(_identifier_model.ResourceType.WORKFLOW, project, domain, name, version) admin_workflow = _engine_loader.get_engine().fetch_workflow(workflow_id) - sdk_workflow = cls.promote_from_model(admin_workflow.closure.compiled_workflow.primary.template) + cwc = admin_workflow.closure.compiled_workflow + primary_template = cwc.primary.template + sub_workflow_map = {sw.template.id: sw.template for sw in cwc.sub_workflows} + task_map = {t.template.id: t.template for t in cwc.tasks} + sdk_workflow = cls.promote_from_model(primary_template, sub_workflow_map, task_map) sdk_workflow._id = workflow_id return sdk_workflow @@ -225,14 +229,17 @@ def get_non_system_nodes(cls, nodes): def promote_from_model(cls, base_model, sub_workflows=None, tasks=None): """ :param flytekit.models.core.workflow.WorkflowTemplate base_model: - :param list[flytekit.models.core.workflow.WorkflowTemplate] sub_workflows: Provide a list of WorkflowTemplate + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate] + sub_workflows: Provide a list of WorkflowTemplate models (should be returned from Admin as part of the admin CompiledWorkflowClosure. Relevant sub-workflows should always be provided. - :param list[flytekit.models.task.TaskTemplate] tasks: Same as above but for tasks. If tasks are not provided - relevant TaskTemplates will be fetched from Admin + :param dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate] tasks: Same as above + but for tasks. If tasks are not provided relevant TaskTemplates will be fetched from Admin :rtype: SdkWorkflow """ base_model_non_system_nodes = cls.get_non_system_nodes(base_model.nodes) + sub_workflows = sub_workflows or {} + tasks = tasks or {} node_map = {n.id: _nodes.SdkNode.promote_from_model(n, sub_workflows, tasks) for n in base_model_non_system_nodes} diff --git a/tests/flytekit/unit/common_tests/test_workflow_promote.py b/tests/flytekit/unit/common_tests/test_workflow_promote.py index 4993abb31e..4d60650530 100644 --- a/tests/flytekit/unit/common_tests/test_workflow_promote.py +++ b/tests/flytekit/unit/common_tests/test_workflow_promote.py @@ -178,12 +178,12 @@ def get_compiled_workflow_closure(): return _compiler_model.CompiledWorkflowClosure.from_flyte_idl(cwc_pb) -def test_more_promote(): +def test_subworkflow_promote(): cwc = get_compiled_workflow_closure() primary = cwc.primary - sub_wf_templates = [s.template for s in cwc.sub_workflows] - task_templates = [t.template for t in cwc.tasks] - promoted_wf = _workflow_common.SdkWorkflow.promote_from_model(primary.template, sub_wf_templates, task_templates) + sub_workflow_map = {sw.template.id: sw.template for sw in cwc.sub_workflows} + task_map = {t.template.id: t.template for t in cwc.tasks} + promoted_wf = _workflow_common.SdkWorkflow.promote_from_model(primary.template, sub_workflow_map, task_map) # This file that the promoted_wf reads contains the compiled workflow closure protobuf retrieved from Admin # after registering a workflow that basically looks like the one below. From ed6366d9d2b7f1e97bd16c68a961a5585b549b27 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 6 Mar 2020 14:46:45 -0800 Subject: [PATCH 29/33] version --- flytekit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 36196a4628..a232130dbf 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -1,4 +1,4 @@ from __future__ import absolute_import import flytekit.plugins -__version__ = '0.6.0b0' +__version__ = '0.6.0' From 104015c24a4f798716f23eb11de65ec062cc1a7b Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 9 Mar 2020 12:03:32 -0700 Subject: [PATCH 30/33] identifier is empty --- flytekit/common/tasks/task.py | 2 +- flytekit/models/core/identifier.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/flytekit/common/tasks/task.py b/flytekit/common/tasks/task.py index 2c5ef14c38..e20faef4b4 100644 --- a/flytekit/common/tasks/task.py +++ b/flytekit/common/tasks/task.py @@ -92,7 +92,7 @@ def promote_from_model(cls, base_model): container=base_model.container ) # Override the newly generated name if one exists in the base model - if base_model.id: + if not base_model.id.is_empty: t._id = _identifier.Identifier.promote_from_model(base_model.id) return t diff --git a/flytekit/models/core/identifier.py b/flytekit/models/core/identifier.py index e678370620..3ed22ad504 100644 --- a/flytekit/models/core/identifier.py +++ b/flytekit/models/core/identifier.py @@ -62,6 +62,13 @@ def version(self): """ return self._version + @property + def is_empty(self): + if self._resource_type == ResourceType.UNSPECIFIED and self._project == '' and self._domain == '' and \ + self._name == '' and self._version == '': + return True + return False + def to_flyte_idl(self): """ :rtype: flyteidl.core.identifier_pb2.NamedEntityIdentifier From fed14fc162e808e4dd4abd0193707e7968ebaf10 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 9 Mar 2020 12:05:24 -0700 Subject: [PATCH 31/33] unit test --- tests/flytekit/unit/models/core/test_identifier.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/flytekit/unit/models/core/test_identifier.py b/tests/flytekit/unit/models/core/test_identifier.py index 7ee19433f2..a5fe6cdf00 100644 --- a/tests/flytekit/unit/models/core/test_identifier.py +++ b/tests/flytekit/unit/models/core/test_identifier.py @@ -64,3 +64,9 @@ def test_workflow_execution_identifier(): assert obj2.project == "project" assert obj2.domain == "domain" assert obj2.name == "name" + +def test_task_execution_identifier(): + empty_id = identifier.Identifier(identifier.ResourceType.UNSPECIFIED, "", "", "", "") + not_empty_id = identifier.Identifier(identifier.ResourceType.UNSPECIFIED, "", "", "", "version") + assert empty_id.is_empty + assert not not_empty_id.is_empty From d6efa71457e9c039b8014342fa06b1bdfda01f14 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 9 Mar 2020 12:11:52 -0700 Subject: [PATCH 32/33] move to FlyteIdlEntity --- flytekit/models/common.py | 4 ++++ flytekit/models/core/identifier.py | 7 ------- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/flytekit/models/common.py b/flytekit/models/common.py index e1487536df..efc9393408 100644 --- a/flytekit/models/common.py +++ b/flytekit/models/common.py @@ -70,6 +70,10 @@ def verbose_string(self): """ return self.short_string() + @property + def is_empty(self): + return len(self.to_flyte_idl().SerializeToString()) == 0 + @_abc.abstractmethod def to_flyte_idl(self): pass diff --git a/flytekit/models/core/identifier.py b/flytekit/models/core/identifier.py index 3ed22ad504..e678370620 100644 --- a/flytekit/models/core/identifier.py +++ b/flytekit/models/core/identifier.py @@ -62,13 +62,6 @@ def version(self): """ return self._version - @property - def is_empty(self): - if self._resource_type == ResourceType.UNSPECIFIED and self._project == '' and self._domain == '' and \ - self._name == '' and self._version == '': - return True - return False - def to_flyte_idl(self): """ :rtype: flyteidl.core.identifier_pb2.NamedEntityIdentifier From e26f349548feb5ff13163b8ac89e47d3b8ac0265 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 9 Mar 2020 13:59:31 -0700 Subject: [PATCH 33/33] 0.6.0b1 --- flytekit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index a232130dbf..5cb72f0038 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -1,4 +1,4 @@ from __future__ import absolute_import import flytekit.plugins -__version__ = '0.6.0' +__version__ = '0.6.0b1'