Skip to content

Commit

Permalink
[RayCluster] don't allow overriding ray.io/cluster label
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Sy Kim <[email protected]>
  • Loading branch information
andrewsykim committed Nov 19, 2024
1 parent 3e83dbb commit 992dc1b
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 11 deletions.
28 changes: 17 additions & 11 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,19 +535,25 @@ func labelPod(rayNodeType rayv1.RayNodeType, rayClusterName string, groupName st
}

for k, v := range ret {
if k == string(rayNodeType) {
// overriding invalid values for this label
if v != string(rayv1.HeadNode) && v != string(rayv1.WorkerNode) {
labels[k] = v
}
if _, ok := labels[k]; !ok {
labels[k] = v
continue
}
if k == utils.RayNodeGroupLabelKey {
// overriding invalid values for this label
if v != groupName {
labels[k] = v
}

labelValue := labels[k]
// overriding invalid values for ray.io/node-type
if k == utils.RayNodeLabelKey && labelValue != string(rayv1.HeadNode) && labelValue != string(rayv1.WorkerNode) {
labels[k] = v
}
if _, ok := labels[k]; !ok {

if k == utils.RayNodeGroupLabelKey && labelValue != groupName {
// overriding invalid values for ray.io/group
labels[k] = v
}

if k == utils.RayClusterLabelKey && labelValue != rayClusterName {
// Overriding invalid values for Ray cluster name label
// This label is used as a selector when listing Pods and should always match the Ray cluster name
labels[k] = v
}
}
Expand Down
94 changes: 94 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,100 @@ var _ = Context("Inside the default namespace", func() {
})
})

// Verifies that a user-supplied ray.io/cluster label on the head group's Pod template
// does not end up on the created head Pod: the template sets the label to
// "invalid-cluster-name", and the head Pod is expected to carry the RayCluster's own name instead.
// The container is Ordered; later specs reuse the headPods/workerPods lists and the
// rayCluster object populated by earlier specs.
Describe("RayCluster with invalid overridden ray.io/cluster labels", Ordered, func() {
ctx := context.Background()
namespace := "default"
rayCluster := rayClusterTemplate("raycluster-overridden-cluster-label", namespace)
// Deliberately set the cluster label to a value that differs from the RayCluster name.
rayCluster.Spec.HeadGroupSpec.Template.Labels = map[string]string{
utils.RayClusterLabelKey: "invalid-cluster-name",
}
// Shared across the specs below; filled in by the "Check the number of ... Pods" specs.
headPods := corev1.PodList{}
workerPods := corev1.PodList{}
// List options used to find the Pods of the first worker group and the head Pod of this RayCluster.
workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()

It("Verify RayCluster spec", func() {
// These tests are designed based on the following assumptions:
// (1) The ray.io/cluster label of the HeadGroupSpec is overridden.
// (2) There is only one worker group, and its `replicas` is set to 3.
Expect(rayCluster.Spec.HeadGroupSpec.Template.Labels[utils.RayClusterLabelKey]).NotTo(BeEmpty())
Expect(rayCluster.Spec.WorkerGroupSpecs).To(HaveLen(1))
Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(ptr.To[int32](3)))
})

It("Create a RayCluster custom resource", func() {
err := k8sClient.Create(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
// Poll until the RayCluster can be read back (up to 3s, every 500ms).
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name)
})

It("Check the number of head Pods", func() {
numHeadPods := 1
// NOTE(review): the fmt.Sprintf description is evaluated once, when Should is called,
// so it may not reflect the Pods returned by the last polling attempt — confirm this is intended.
Eventually(
listResourceFunc(ctx, &headPods, headFilters...),
time.Second*3, time.Millisecond*500).Should(Equal(numHeadPods), fmt.Sprintf("headGroup %v, headFilters: %v", headPods.Items, headFilters))
// The head Pod must carry the RayCluster name, not the "invalid-cluster-name" value from the template.
for _, head := range headPods.Items {
Expect(head.Labels[utils.RayClusterLabelKey]).To(Equal("raycluster-overridden-cluster-label"))
}
})

It("Check the number of worker Pods", func() {
// Matches the `replicas: 3` assumption asserted in "Verify RayCluster spec".
numWorkerPods := 3
Eventually(
listResourceFunc(ctx, &workerPods, workerFilters...),
time.Second*3, time.Millisecond*500).Should(Equal(numWorkerPods), fmt.Sprintf("workerGroup %v", workerPods.Items))
})

It("Update all Pods to Running", func() {
// We need to manually update Pod statuses otherwise they'll always be Pending.
// envtest doesn't create a full K8s cluster. It's only the control plane.
// There's no container runtime or any other K8s controllers.
// So Pods are created, but no controller updates them from Pending to Running.
// See https://book.kubebuilder.io/reference/envtest.html

// Note that this test assumes that headPods and workerPods are up-to-date.
for _, headPod := range headPods.Items {
headPod.Status.Phase = corev1.PodRunning
headPod.Status.PodIP = "1.1.1.1" // This should be carried to rayCluster.Status.Head.ServiceIP. We check it later.
Expect(k8sClient.Status().Update(ctx, &headPod)).Should(Succeed())
}

// Wait until the head Pods selected by headFilters are reported as Running.
Eventually(
isAllPodsRunningByFilters).WithContext(ctx).WithArguments(headPods, headFilters).WithTimeout(time.Second*3).WithPolling(time.Millisecond*500).Should(BeTrue(), "Head Pod should be running.")

for _, workerPod := range workerPods.Items {
workerPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, &workerPod)).Should(Succeed())
}

// Wait until the worker Pods selected by workerFilters are reported as Running.
Eventually(
isAllPodsRunningByFilters).WithContext(ctx).WithArguments(workerPods, workerFilters).WithTimeout(time.Second*3).WithPolling(time.Millisecond*500).Should(BeTrue(), "All worker Pods should be running.")
})

It("RayCluster's .status.state and .status.head.ServiceIP should be updated shortly after all Pods are Running", func() {
// Note that RayCluster is `ready` when all Pods are Running and their PodReady conditions are true.
// However, in envtest, PodReady conditions are automatically set to true when Pod.Status.Phase is set to Running.
// We need to figure out the behavior. See https://github.com/ray-project/kuberay/issues/1736 for more details.
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
// Check that the StateTransitionTimes are set.
Eventually(
func() *metav1.Time {
status := getClusterStatus(ctx, namespace, rayCluster.Name)()
return status.StateTransitionTimes[rayv1.Ready]
},
time.Second*3, time.Millisecond*500).Should(Not(BeNil()))

// Re-fetch the RayCluster on every poll and compare its head ServiceIP with the
// PodIP ("1.1.1.1") assigned to the head Pod in the previous spec.
Eventually(func() (string, error) {
err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)()
return rayCluster.Status.Head.ServiceIP, err
}, time.Second*3, time.Millisecond*500).Should(Equal("1.1.1.1"), "Should be able to see the rayCluster.Status.Head.ServiceIP: %v", rayCluster.Status.Head.ServiceIP)
})
})

Describe("RayCluster with autoscaling enabled", Ordered, func() {
ctx := context.Background()
namespace := "default"
Expand Down

0 comments on commit 992dc1b

Please sign in to comment.