[RayService] More envtests that follow the most common scenario in the RayService code path #2880

Merged · 1 commit · Feb 4, 2025

222 changes: 222 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_test.go
@@ -18,6 +18,8 @@ package ray
import (
"context"
"fmt"
"math/rand/v2"
"strconv"
"time"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
@@ -627,4 +629,224 @@ var _ = Context("RayService env tests", func() {
time.Second*60, time.Millisecond*500).Should(Equal(pendingRayClusterName), "My current RayCluster name = %v", rayService.Status.ActiveServiceStatus.RayClusterName)
})
})

Describe("After a RayService is running", func() {
ctx := context.Background()
var rayService *rayv1.RayService
var rayCluster *rayv1.RayCluster

BeforeEach(OncePerOrdered, func() {
// This simulates the most common scenario in the RayService code path:
// (1) Create a RayService custom resource
// (2) The RayService controller creates a pending RayCluster
// (3) The serve application becomes ready on the pending RayCluster
// (4) The Kubernetes head and serve services are created
// (5) The pending RayCluster transitions to become the active RayCluster
namespace := "default"
serveAppName := "app1"
rayService = rayServiceTemplate("test-base-path-"+strconv.Itoa(rand.IntN(1000)), namespace, serveAppName) //nolint:gosec // no need for cryptographically secure random number
rayCluster = &rayv1.RayCluster{}

By("Create a RayService custom resource")
err := k8sClient.Create(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to create RayService resource")
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayService.Name, Namespace: namespace}, rayService),
time.Second*3, time.Millisecond*500).Should(BeNil(), "RayService: %v", rayService.Name)

By("Conditions should be initialized correctly")
Eventually(
func() bool {
return meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.UpgradeInProgress))
},
time.Second*3, time.Millisecond*500).Should(BeFalse(), "UpgradeInProgress condition: %v", rayService.Status.Conditions)
Eventually(
func() bool {
return meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RayServiceReady))
},
time.Second*3, time.Millisecond*500).Should(BeFalse(), "RayServiceReady condition: %v", rayService.Status.Conditions)

By("Should create a pending RayCluster")
Eventually(
getPreparingRayClusterNameFunc(ctx, rayService),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "Pending RayCluster name: %v", rayService.Status.PendingServiceStatus.RayClusterName)

By("Promote the pending RayCluster to the active RayCluster")
// Update the status of the head Pod to Running. Note that the default fake dashboard client
// will return a healthy serve application status.
pendingRayClusterName := rayService.Status.PendingServiceStatus.RayClusterName
updateHeadPodToRunningAndReady(ctx, pendingRayClusterName, namespace)

// Make sure the pending RayCluster becomes the active RayCluster.
Eventually(
getRayClusterNameFunc(ctx, rayService),
time.Second*15, time.Millisecond*500).Should(Equal(pendingRayClusterName), "Active RayCluster name: %v", rayService.Status.ActiveServiceStatus.RayClusterName)

// Initialize RayCluster for the following tests.
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayService.Status.ActiveServiceStatus.RayClusterName, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster: %v", rayCluster.Name)

By("Check the serve application status in the RayService status")
// Check the serve application status in the RayService status.
// The serve application should be healthy.
Eventually(
checkServiceHealth(ctx, rayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "RayService status: %v", rayService.Status)

By("Should create a new head service resource")
svc := &corev1.Service{}
headSvcName, err := utils.GenerateHeadServiceName(utils.RayServiceCRD, rayService.Spec.RayClusterSpec, rayService.Name)
Expect(err).ToNot(HaveOccurred())
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: headSvcName, Namespace: namespace}, svc),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Head service: %v", svc)
// TODO: Verify the head service by checking labels and annotations.

By("Should create a new serve service resource")
svc = &corev1.Service{}
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: utils.GenerateServeServiceName(rayService.Name), Namespace: namespace}, svc),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Serve service: %v", svc)
// TODO: Verify the serve service by checking labels and annotations.

By("The RayServiceReady condition should be true when the number of endpoints is greater than 0")
endpoints := endpointsTemplate(utils.GenerateServeServiceName(rayService.Name), namespace)
err = k8sClient.Create(ctx, endpoints)
Expect(err).NotTo(HaveOccurred(), "failed to create Endpoints resource")
Eventually(func() int32 {
if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: namespace}, rayService); err != nil {
return 0
}
return rayService.Status.NumServeEndpoints
}, time.Second*3, time.Millisecond*500).Should(BeNumerically(">", 0), "RayService status: %v", rayService.Status)
Expect(meta.IsStatusConditionTrue(rayService.Status.Conditions, string(rayv1.RayServiceReady))).Should(BeTrue())
})

AfterEach(OncePerOrdered, func() {
err := k8sClient.Delete(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to delete the test RayService resource")
})

When("updating the serveConfigV2", Ordered, func() {
var newConfigV2 string

BeforeAll(func() {
newConfigV2 = serveConfigV2Template("newAppName")
rayService.Spec.ServeConfigV2 = newConfigV2
err := k8sClient.Update(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to update RayService resource")
})

It("should create an UpdatedServeApplications event", func() {
var eventList corev1.EventList
listOpts := []client.ListOption{
client.InNamespace(rayService.Namespace),
client.MatchingFields{
"involvedObject.uid": string(rayService.UID),
"reason": string(utils.UpdatedServeApplications),
},
}
Eventually(func() int {
err := k8sClient.List(ctx, &eventList, listOpts...)
Expect(err).NotTo(HaveOccurred(), "failed to list events")
return len(eventList.Items)
}, time.Second*15, time.Millisecond*500).Should(Equal(1))
})

It("refreshes rayService", func() {
err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to get RayService resource")
})

It("doesn't create a new pending cluster", func() {
Expect(rayService.Status.PendingServiceStatus.RayClusterName).To(BeEmpty())
})

It("doesn't switch to a new active cluster", func() {
Expect(rayService.Status.ActiveServiceStatus.RayClusterName).To(Equal(rayCluster.Name))
})
})

When("adding a new worker group", Ordered, func() {
BeforeAll(func() {
newWorkerGroupSpec := rayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy()
newWorkerGroupSpecs := []rayv1.WorkerGroupSpec{*newWorkerGroupSpec, *newWorkerGroupSpec}
newWorkerGroupSpecs[1].GroupName = "worker-group-to-active-cluster"

rayService.Spec.RayClusterSpec.WorkerGroupSpecs = newWorkerGroupSpecs
err := k8sClient.Update(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource")
})

It("reflects the changes in the active cluster's WorkerGroupSpecs", func() {
Eventually(
getActiveRayClusterWorkerGroupSpecsFunc(ctx, rayService),
time.Second*15, time.Millisecond*500).Should(HaveLen(2))
})

It("doesn't create a new pending cluster", func() {
Expect(rayService.Status.PendingServiceStatus.RayClusterName).To(BeEmpty())
})

It("doesn't switch to a new active cluster", func() {
Expect(rayService.Status.ActiveServiceStatus.RayClusterName).To(Equal(rayCluster.Name))
})
})

When("during the zero-downtime upgrade", func() {
var pendingClusterName string

BeforeEach(OncePerOrdered, func() {
rayService.Spec.RayClusterSpec.RayVersion += "-next"
err := k8sClient.Update(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to update RayService resource")

Eventually(
getPreparingRayClusterNameFunc(ctx, rayService),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "Pending RayCluster name: %v", rayService.Status.PendingServiceStatus.RayClusterName)
pendingClusterName = rayService.Status.PendingServiceStatus.RayClusterName
})

When("updating the RayVersion again", Ordered, func() {
BeforeAll(func() {
rayService.Spec.RayClusterSpec.RayVersion += "-next2"
err := k8sClient.Update(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to update RayService resource")
})

It("doesn't create a new pending cluster", func() {
Consistently(
getPreparingRayClusterNameFunc(ctx, rayService),
time.Second*10, time.Millisecond*500).Should(Equal(pendingClusterName), "Pending RayCluster name = %v", rayService.Status.PendingServiceStatus.RayClusterName)
})

It("doesn't switch to a new active cluster", func() {
Expect(rayService.Status.ActiveServiceStatus.RayClusterName).To(Equal(rayCluster.Name))
})
})

When("adding a new worker group", Ordered, func() {
BeforeAll(func() {
newWorkerGroupSpec := rayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy()
newWorkerGroupSpecs := []rayv1.WorkerGroupSpec{*newWorkerGroupSpec, *newWorkerGroupSpec}
newWorkerGroupSpecs[1].GroupName = "worker-group-to-pending-cluster"

rayService.Spec.RayClusterSpec.WorkerGroupSpecs = newWorkerGroupSpecs
err := k8sClient.Update(ctx, rayService)
Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource")
})

It("reflects the changes in the pending cluster's WorkerGroupSpecs", func() {
Eventually(
getPendingRayClusterWorkerGroupSpecsFunc(ctx, rayService),
time.Second*15, time.Millisecond*500).Should(HaveLen(2))
})

It("doesn't switch to a new active cluster", func() {
Expect(rayService.Status.ActiveServiceStatus.RayClusterName).To(Equal(rayCluster.Name))
})
})
})
})
})
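
Note: the polling helpers used throughout these tests (getPreparingRayClusterNameFunc, getRayClusterNameFunc, getResourceFunc, and so on) are defined elsewhere in the test suite and are not part of this diff. As a rough, hypothetical sketch only, such a helper is assumed to return a closure that re-fetches the RayService on every poll so Eventually/Consistently can evaluate it repeatedly; it relies on the suite's package-level k8sClient and the imports already shown above.

// Hypothetical sketch only; the real helpers live in the existing test suite.
func getPendingRayClusterNameSketch(ctx context.Context, rayService *rayv1.RayService) func() (string, error) {
	return func() (string, error) {
		// Re-fetch the RayService so its status fields are current on each poll.
		if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService); err != nil {
			return "", err
		}
		return rayService.Status.PendingServiceStatus.RayClusterName, nil
	}
}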