Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 3a6cc0b

Browse files
authored
Merge b80c8a9 into 4f02c2c
2 parents 4f02c2c + b80c8a9 commit 3a6cc0b

File tree

5 files changed

+88
-3
lines changed

5 files changed

+88
-3
lines changed

go/tasks/pluginmachinery/flytek8s/pod_helper.go

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const SIGKILL = 137
3131
const defaultContainerTemplateName = "default"
3232
const primaryContainerTemplateName = "primary"
3333
const PrimaryContainerKey = "primary_container_name"
34+
const Sidecar = "sidecar"
3435

3536
// ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified.
3637
func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity) {

go/tasks/plugins/array/k8s/management.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
187187
phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil)
188188
} else {
189189
phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient)
190-
190+
logger.Infof(ctx, "Failed to launch subtask with error [%s]", perr)
191191
// if launchSubtask fails we attempt to deallocate the (previously allocated)
192192
// resource to mitigate leaks
193193
if perr != nil {

go/tasks/plugins/array/k8s/subtask.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import (
88
"strings"
99
"time"
1010

11+
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
12+
1113
"github.com/flyteorg/flyteplugins/go/tasks/errors"
1214
pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
13-
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
1415
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
1516
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
1617
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
@@ -159,6 +160,20 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor
159160
return nil
160161
}
161162

163+
// updateCopilotArgs append array index to the end of the output prefix
164+
func updateCopilotArgs(pod *v1.Pod, stCtx SubTaskExecutionContext) {
165+
for sidecarIndex, container := range pod.Spec.Containers {
166+
if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
167+
for i, arg := range pod.Spec.Containers[sidecarIndex].Args {
168+
if arg == "--to-output-prefix" {
169+
pod.Spec.Containers[sidecarIndex].Args[i+1] = fmt.Sprintf("%s/%s", pod.Spec.Containers[sidecarIndex].Args[i+1], strconv.Itoa(stCtx.originalIndex))
170+
}
171+
}
172+
break
173+
}
174+
}
175+
}
176+
162177
// launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config.
163178
func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) {
164179
o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx)
@@ -187,6 +202,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
187202
})
188203

189204
pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, arrayJobEnvVars...)
205+
updateCopilotArgs(pod, stCtx)
190206

191207
logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", pod.GetObjectKind().GroupVersionKind(), pod.GetNamespace(), pod.GetName())
192208
err = kubeClient.GetClient().Create(ctx, pod)
@@ -330,6 +346,10 @@ func getTaskContainerIndex(pod *v1.Pod) (int, error) {
330346
if len(pod.Spec.Containers) == 1 {
331347
return 0, nil
332348
}
349+
// Copilot is always the second container if it is enabled.
350+
if len(pod.Spec.Containers) == 2 && pod.Spec.Containers[1].Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
351+
return 0, nil
352+
}
333353
// For tasks with a K8sPod task target, they may produce multiple containers but at least one must be the designated primary.
334354
return -1, stdErrors.Errorf(ErrBuildPodTemplate, "Expected a specified primary container key when building an array job with a K8sPod spec target")
335355

go/tasks/plugins/array/k8s/subtask_exec_context_test.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package k8s
33
import (
44
"context"
55
"fmt"
6+
67
"testing"
78

89
podPlugin "github.com/flyteorg/flyteplugins/go/tasks/plugins/k8s/pod"
10+
v1 "k8s.io/api/core/v1"
911

1012
"github.com/flyteorg/flytestdlib/storage"
11-
1213
"github.com/stretchr/testify/assert"
1314
)
1415

@@ -36,3 +37,26 @@ func TestSubTaskExecutionContext(t *testing.T) {
3637
assert.Equal(t, storage.DataReference("/prefix/"), stCtx.OutputWriter().GetOutputPrefixPath())
3738
assert.Equal(t, storage.DataReference("/raw_prefix/5/1"), stCtx.OutputWriter().GetRawOutputPrefix())
3839
}
40+
41+
func TestUpdateCopilotArgs(t *testing.T) {
42+
pod := &v1.Pod{
43+
Spec: v1.PodSpec{
44+
Containers: []v1.Container{
45+
{
46+
Name: "flyte-copilot-sidecar",
47+
Args: []string{"--to-output-prefix", "s3://bucket/key"},
48+
},
49+
},
50+
},
51+
}
52+
ctx := context.Background()
53+
54+
tCtx := getMockTaskExecutionContext(ctx, 0)
55+
taskTemplate, err := tCtx.TaskReader().Read(ctx)
56+
assert.Nil(t, err)
57+
58+
stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0))
59+
assert.Nil(t, err)
60+
updateCopilotArgs(pod, stCtx)
61+
assert.Equal(t, pod.Spec.Containers[0].Args[1], "s3://bucket/key/5")
62+
}
+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package k8s
2+
3+
import (
4+
"testing"
5+
6+
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
7+
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
8+
"gotest.tools/assert"
9+
v1 "k8s.io/api/core/v1"
10+
)
11+
12+
func TestGetTaskContainerTask(t *testing.T) {
13+
pod := &v1.Pod{
14+
Spec: v1.PodSpec{
15+
Containers: []v1.Container{
16+
{
17+
Name: "PrimaryContainer",
18+
},
19+
},
20+
},
21+
}
22+
index, err := getTaskContainerIndex(pod)
23+
assert.NilError(t, err)
24+
assert.Equal(t, index, 0)
25+
26+
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar})
27+
index, err = getTaskContainerIndex(pod)
28+
assert.NilError(t, err)
29+
assert.Equal(t, index, 0)
30+
31+
pod.Annotations = map[string]string{flytek8s.PrimaryContainerKey: "PrimaryContainer"}
32+
index, err = getTaskContainerIndex(pod)
33+
assert.NilError(t, err)
34+
assert.Equal(t, index, 0)
35+
36+
pod.Spec.Containers[0].Name = "SecondaryContainer"
37+
_, err = getTaskContainerIndex(pod)
38+
assert.ErrorContains(t, err, "Couldn't find any container matching the primary container")
39+
40+
}

0 commit comments

Comments
 (0)