Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Apply interruptible override from workflow execution config #minor #429

Merged
merged 5 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v1.0.0
github.com/flyteorg/flyteidl v1.1.0
github.com/flyteorg/flyteplugins v1.0.0
github.com/flyteorg/flytestdlib v1.0.0
github.com/ghodss/yaml v1.0.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v1.0.0 h1:02V/h8cN3TzI6H9kzB2XNKR4XsJDmsGGfDWxbfmRZGs=
github.com/flyteorg/flyteidl v1.0.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM=
github.com/flyteorg/flyteidl v1.1.0 h1:f8tdMXOuorS/d+4Ut2QarfDbdCOriK0S+EnlQzrwz9E=
github.com/flyteorg/flyteidl v1.1.0/go.mod h1:JW0z1ZaHS9zWvDAwSMIyGhsf+V4zrzBBgh5IuqzMFCM=
github.com/flyteorg/flyteplugins v1.0.0 h1:77hUJjiIxBmQ9rd3+cXjSGnzOVAFrSzCd59aIaYFB/8=
github.com/flyteorg/flyteplugins v1.0.0/go.mod h1:4Cpn+9RfanIieTTh2XsuL6zPYXtsR5UDe8YaEmXONT4=
github.com/flyteorg/flytestdlib v1.0.0 h1:gb99ignMsVcNTUmWzArtcIDdkRjyzQQVBkWNOQakiFg=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ExecutionConfig struct {
// Defines the resource requests and limits specified for tasks run as part of this execution that ought to be
// applied at execution time.
TaskResources TaskResources
// Defines whether a workflow has been flagged as interruptible.
Interruptible *bool
}

type TaskPluginOverride struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type NodeSpec struct {
ActiveDeadline *v1.Duration `json:"activeDeadline,omitempty"`
// The value set to True means task is OK with getting interrupted
// +optional
Interruptibe *bool `json:"interruptible,omitempty"`
Interruptible *bool `json:"interruptible,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

}

func (in *NodeSpec) GetName() string {
Expand All @@ -169,7 +169,7 @@ func (in *NodeSpec) GetActiveDeadline() *time.Duration {
}

func (in *NodeSpec) IsInterruptible() *bool {
return in.Interruptibe
return in.Interruptible
}

func (in *NodeSpec) GetConfig() *typesv1.ConfigMap {
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ func (in *FlyteWorkflow) GetServiceAccountName() string {
}

func (in *FlyteWorkflow) IsInterruptible() bool {
// use execution config override if set (can enable/disable interruptible flag for a single execution)
if in.ExecutionConfig.Interruptible != nil {
return *in.ExecutionConfig.Interruptible
}

// fall back to node defaults if no override was provided
return in.NodeDefaults.Interruptible
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,30 @@ func TestWorkflowSpec(t *testing.T) {
assert.Equal(t, 7, len(w.GetConnections().Downstream))
assert.Equal(t, 8, len(w.GetConnections().Upstream))
}

func TestWorkflowIsInterruptible(t *testing.T) {
w := &v1alpha1.FlyteWorkflow{}

// no execution spec or metadata defined -> interruptible defaults to false
assert.False(t, w.IsInterruptible())

// marked as interruptible via execution config (e.g. for a single execution)
execConfigInterruptible := true
w.ExecutionConfig.Interruptible = &execConfigInterruptible
assert.True(t, w.IsInterruptible())

// marked as not interruptible via execution config, overwriting node defaults
execConfigInterruptible = false
w.NodeDefaults.Interruptible = true
assert.False(t, w.IsInterruptible())

// marked as interruptible via execution config, overwriting node defaults
execConfigInterruptible = true
w.NodeDefaults.Interruptible = false
assert.True(t, w.IsInterruptible())

// interruptible flag retrieved from node defaults (e.g. workflow definition), no execution config override
w.ExecutionConfig.Interruptible = nil
w.NodeDefaults.Interruptible = true
assert.True(t, w.IsInterruptible())
}
4 changes: 2 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ tasks:
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
image: myflytecontainer:abc123
resources: {}
id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_4.json
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_5.json
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_6.json
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@
"Storage": "0",
"GPU": "0"
}
}
},
"Interruptible": null
}
}
2 changes: 1 addition & 1 deletion pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile
OutputAliases: toAliasValueArray(n.GetOutputAliases()),
InputBindings: toBindingValueArray(n.GetInputs()),
ActiveDeadline: activeDeadline,
Interruptibe: interruptible,
Interruptible: interruptible,
}

switch v := n.GetTarget().(type) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compiler/transformers/k8s/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestBuildNodeSpec(t *testing.T) {
specs, ok := buildNodeSpec(n.GetCoreNode(), tasks, errs)
assert.Len(t, specs, expectedInnerNodesCount)
spec := specs[0]
assert.Nil(t, spec.Interruptibe)
assert.Nil(t, spec.Interruptible)
assert.False(t, errs.HasErrors())
assert.True(t, ok)
assert.NotNil(t, spec)
Expand Down
4 changes: 2 additions & 2 deletions pkg/compiler/transformers/k8s/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func TestBuildFlyteWorkflow(t *testing.T) {
},
nil, nil, "")
assert.Equal(t, true, wf.NodeDefaults.Interruptible)
assert.True(t, *wf.WorkflowSpec.Nodes["n_1"].Interruptibe)
assert.Nil(t, wf.WorkflowSpec.Nodes[common.StartNodeID].Interruptibe)
assert.True(t, *wf.WorkflowSpec.Nodes["n_1"].Interruptible)
assert.Nil(t, wf.WorkflowSpec.Nodes[common.StartNodeID].Interruptible)
assert.Equal(t, "wf-1", wf.Labels[WorkflowNameLabel])
assert.Equal(t, "4", wf.Labels[ShardKeyLabel])
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/dynamic_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1429),
int64(1450),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const NodeInterruptibleLabel = "interruptible"
type nodeExecMetadata struct {
v1alpha1.Meta
nodeExecID *core.NodeExecutionIdentifier
interrutptible bool
interruptible bool
interruptibleFailureThreshold uint32
nodeLabels map[string]string
}
Expand All @@ -44,7 +44,7 @@ func (e nodeExecMetadata) GetOwnerID() types.NamespacedName {
}

func (e nodeExecMetadata) IsInterruptible() bool {
return e.interrutptible
return e.interruptible
}

func (e nodeExecMetadata) GetInterruptibleFailureThreshold() uint32 {
Expand Down Expand Up @@ -151,7 +151,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext
NodeId: node.GetID(),
ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier,
},
interrutptible: interruptible,
interruptible: interruptible,
interruptibleFailureThreshold: interruptibleFailureThreshold,
}

Expand Down
Loading