Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 18a594e

Browse files
authored
persisting k8s plugin phase, version, and reason (#331)
* using PhaseVersion to ensure Reason updates are sent Signed-off-by: Daniel Rammer <[email protected]> * refactored and fixed tests Signed-off-by: Daniel Rammer <[email protected]> * fixed linter Signed-off-by: Daniel Rammer <[email protected]> * added docs Signed-off-by: Daniel Rammer <[email protected]> * corrected missing old usecase Signed-off-by: Daniel Rammer <[email protected]> * actually this will work Signed-off-by: Daniel Rammer <[email protected]> * added missing increment on phsae version - thanks yee Signed-off-by: Daniel Rammer <[email protected]> --------- Signed-off-by: Daniel Rammer <[email protected]>
1 parent 60d345d commit 18a594e

File tree

7 files changed

+124
-26
lines changed

7 files changed

+124
-26
lines changed

go/tasks/pluginmachinery/core/phase.go

+10
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,16 @@ func (p PhaseInfo) Err() *core.ExecutionError {
139139
return p.err
140140
}
141141

142+
func (p PhaseInfo) WithVersion(version uint32) PhaseInfo {
143+
return PhaseInfo{
144+
phase: p.phase,
145+
version: version,
146+
info: p.info,
147+
err: p.err,
148+
reason: p.reason,
149+
}
150+
}
151+
142152
func (p PhaseInfo) String() string {
143153
if p.err != nil {
144154
return fmt.Sprintf("Phase<%s:%d Error:%s>", p.phase, p.version, p.err)

go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

+34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/tasks/pluginmachinery/k8s/plugin.go

+14
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ type PluginContext interface {
6868

6969
// Returns a handle to the Task's execution metadata.
7070
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata
71+
72+
// Returns a reader that retrieves previously stored plugin internal state. The state itself is immutable.
73+
PluginStateReader() pluginsCore.PluginStateReader
74+
}
75+
76+
// PluginState defines the state of a k8s plugin. This information must be maintained between propeller evaluations to
77+
// determine if there have been any updates since the previous evaluation.
78+
type PluginState struct {
79+
// Phase is the plugin phase.
80+
Phase pluginsCore.Phase
81+
// PhaseVersion is a number used to indicate reportable changes to state that have the same phase.
82+
PhaseVersion uint32
83+
// Reason is the message explaining the purpose for being in the reported state.
84+
Reason string
7185
}
7286

7387
// Defines a simplified interface to author plugins for k8s resources.

go/tasks/plugins/array/k8s/management_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,16 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta
129129
ReferenceConstructor: &storage.URLPathConstructor{},
130130
}
131131

132+
pluginStateReader := &mocks.PluginStateReader{}
133+
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)
134+
132135
tCtx := &mocks.TaskExecutionContext{}
133136
tCtx.OnTaskReader().Return(tr)
134137
tCtx.OnTaskExecutionMetadata().Return(tMeta)
135138
tCtx.OnOutputWriter().Return(ow)
136139
tCtx.OnInputReader().Return(ir)
137140
tCtx.OnDataStore().Return(dataStore)
141+
tCtx.OnPluginStateReader().Return(pluginStateReader)
138142
return tCtx
139143
}
140144

go/tasks/plugins/k8s/pod/container_test.go

+19-6
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str
102102
taskCtx.OnTaskReader().Return(taskReader)
103103

104104
taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata)
105+
106+
pluginStateReader := &pluginsCoreMock.PluginStateReader{}
107+
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)
108+
taskCtx.OnPluginStateReader().Return(pluginStateReader)
109+
105110
return taskCtx
106111
}
107112

@@ -140,28 +145,32 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) {
140145
}
141146

142147
func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
148+
command := []string{"command"}
149+
args := []string{"{{.Input}}"}
150+
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)
151+
143152
j := &v1.Pod{
144153
Status: v1.PodStatus{},
145154
}
146155

147156
ctx := context.TODO()
148157
t.Run("running", func(t *testing.T) {
149158
j.Status.Phase = v1.PodRunning
150-
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
159+
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
151160
assert.NoError(t, err)
152161
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
153162
})
154163

155164
t.Run("queued", func(t *testing.T) {
156165
j.Status.Phase = v1.PodPending
157-
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
166+
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
158167
assert.NoError(t, err)
159168
assert.Equal(t, pluginsCore.PhaseQueued, phaseInfo.Phase())
160169
})
161170

162171
t.Run("failNoCondition", func(t *testing.T) {
163172
j.Status.Phase = v1.PodFailed
164-
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
173+
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
165174
assert.NoError(t, err)
166175
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
167176
ec := phaseInfo.Err().GetCode()
@@ -177,7 +186,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
177186
Type: v1.PodReasonUnschedulable,
178187
},
179188
}
180-
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
189+
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
181190
assert.NoError(t, err)
182191
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
183192
ec := phaseInfo.Err().GetCode()
@@ -186,7 +195,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
186195

187196
t.Run("success", func(t *testing.T) {
188197
j.Status.Phase = v1.PodSucceeded
189-
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
198+
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
190199
assert.NoError(t, err)
191200
assert.NotNil(t, phaseInfo)
192201
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
@@ -199,6 +208,10 @@ func TestContainerTaskExecutor_GetProperties(t *testing.T) {
199208
}
200209

201210
func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {
211+
command := []string{"command"}
212+
args := []string{"{{.Input}}"}
213+
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)
214+
202215
ctx := context.TODO()
203216
reason := "InvalidImageName"
204217
message := "Failed to apply default image tag \"TEST/flyteorg/myapp:latest\": couldn't parse image reference" +
@@ -230,7 +243,7 @@ func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {
230243

231244
t.Run("failInvalidImageName", func(t *testing.T) {
232245
pendingPod.Status.Phase = v1.PodPending
233-
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, pendingPod)
246+
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, pendingPod)
234247
finalReason := fmt.Sprintf("|%s", reason)
235248
finalMessage := fmt.Sprintf("|%s", message)
236249
assert.NoError(t, err)

go/tasks/plugins/k8s/pod/plugin.go

+39-20
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ func (p plugin) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContex
145145
}
146146

147147
func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string) (pluginsCore.PhaseInfo, error) {
148+
pluginState := k8s.PluginState{}
149+
_, err := pluginContext.PluginStateReader().Get(&pluginState)
150+
if err != nil {
151+
return pluginsCore.PhaseInfoUndefined, err
152+
}
153+
148154
pod := r.(*v1.Pod)
149155

150156
transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time
@@ -166,36 +172,49 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
166172
info.Logs = taskLogs
167173
}
168174

175+
phaseInfo := pluginsCore.PhaseInfoUndefined
169176
switch pod.Status.Phase {
170177
case v1.PodSucceeded:
171-
return flytek8s.DemystifySuccess(pod.Status, info)
178+
phaseInfo, err = flytek8s.DemystifySuccess(pod.Status, info)
172179
case v1.PodFailed:
173-
return flytek8s.DemystifyFailure(pod.Status, info)
180+
phaseInfo, err = flytek8s.DemystifyFailure(pod.Status, info)
174181
case v1.PodPending:
175-
return flytek8s.DemystifyPending(pod.Status)
182+
phaseInfo, err = flytek8s.DemystifyPending(pod.Status)
176183
case v1.PodReasonUnschedulable:
177-
return pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable"), nil
184+
phaseInfo = pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable")
178185
case v1.PodUnknown:
179-
return pluginsCore.PhaseInfoUndefined, nil
180-
}
181-
182-
primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey]
183-
if !exists {
184-
// if the primary container annotation dos not exist, then the task requires all containers
185-
// to succeed to declare success. therefore, if the pod is not in one of the above states we
186-
// fallback to declaring the task as 'running'.
187-
if len(info.Logs) > 0 {
188-
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, &info), nil
186+
// DO NOTHING
187+
default:
188+
primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey]
189+
if !exists {
190+
// if the primary container annotation does not exist, then the task requires all containers
191+
// to succeed to declare success. therefore, if the pod is not in one of the above states we
192+
// fallback to declaring the task as 'running'.
193+
phaseInfo = pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info)
194+
if len(info.Logs) > 0 {
195+
phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1)
196+
}
197+
} else {
198+
// if the primary container annotation exists, we use the status of the specified container
199+
phaseInfo = flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info)
200+
if phaseInfo.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 {
201+
phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1)
202+
}
189203
}
190-
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil
191204
}
192205

193-
// if the primary container annotation exists, we use the status of the specified container
194-
primaryContainerPhase := flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info)
195-
if primaryContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 {
196-
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, primaryContainerPhase.Info()), nil
206+
if err != nil {
207+
return pluginsCore.PhaseInfoUndefined, err
208+
} else if phaseInfo.Phase() != pluginsCore.PhaseRunning && phaseInfo.Phase() == pluginState.Phase &&
209+
phaseInfo.Version() <= pluginState.PhaseVersion && phaseInfo.Reason() != pluginState.Reason {
210+
211+
// if we have the same Phase as the previous evaluation and updated the Reason but not the PhaseVersion we must
212+
// update the PhaseVersion so an event is sent to reflect the Reason update. this does not handle the Running
213+
// Phase because the legacy logic uses `DefaultPhaseVersion + 1`, which only ever increments to 1.
214+
phaseInfo = phaseInfo.WithVersion(pluginState.PhaseVersion + 1)
197215
}
198-
return primaryContainerPhase, nil
216+
217+
return phaseInfo, err
199218
}
200219

201220
func (plugin) GetProperties() k8s.PluginProperties {

go/tasks/plugins/k8s/pod/sidecar_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ func getDummySidecarTaskContext(taskTemplate *core.TaskTemplate, resources *v1.R
120120

121121
taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata)
122122

123+
pluginStateReader := &pluginsCoreMock.PluginStateReader{}
124+
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)
125+
taskCtx.OnPluginStateReader().Return(pluginStateReader)
126+
123127
return taskCtx
124128
}
125129

0 commit comments

Comments
 (0)