Skip to content

Commit

Permalink
[RayService] Avoid sending health check requests to the head Pod when `excludeHeadPodFromServeSvc` is true (ray-project#2776)
Browse files Browse the repository at this point in the history

Signed-off-by: kaihsun <[email protected]>
  • Loading branch information
kevin85421 authored and win5923 committed Jan 20, 2025
1 parent 8371091 commit 3fcedc0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
38 changes: 19 additions & 19 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -121,7 +120,6 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
originalRayServiceInstance := rayServiceInstance.DeepCopy()

if err := validateRayServiceSpec(rayServiceInstance); err != nil {
logger.Error(err, "The RayService spec is invalid")
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.InvalidRayServiceSpec),
"The RayService spec is invalid %s/%s: %v", rayServiceInstance.Namespace, rayServiceInstance.Name, err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
Expand Down Expand Up @@ -210,7 +208,7 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, utils.HeadService); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.labelHeadPodForServeStatus(ctx, rayClusterInstance, rayServiceInstance.Spec.ExcludeHeadPodFromServeSvc); err != nil {
if err := r.updateHeadPodServeLabel(ctx, rayClusterInstance, rayServiceInstance.Spec.ExcludeHeadPodFromServeSvc); err != nil {
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, utils.ServingService); err != nil {
Expand Down Expand Up @@ -1135,8 +1133,6 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
}
}

// TODO(architkulkarni): Check the RayVersion. If < 2.8.0, error.

if clientURL, err = utils.FetchHeadServiceURL(ctx, r.Client, rayClusterInstance, utils.DashboardPortName); err != nil || clientURL == "" {
return false, err
}
Expand Down Expand Up @@ -1174,7 +1170,11 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
return isReady, nil
}

func (r *RayServiceReconciler) labelHeadPodForServeStatus(ctx context.Context, rayClusterInstance *rayv1.RayCluster, excludeHeadPodFromServeSvc bool) error {
func (r *RayServiceReconciler) updateHeadPodServeLabel(ctx context.Context, rayClusterInstance *rayv1.RayCluster, excludeHeadPodFromServeSvc bool) error {
// `updateHeadPodServeLabel` updates the head Pod's serve label based on the health status of the proxy actor.
// If `excludeHeadPodFromServeSvc` is true, the head Pod will not be used to serve requests, regardless of proxy actor health.
// If `excludeHeadPodFromServeSvc` is false, the head Pod's serve label will be set based on the health check result.
// The label is used by the Kubernetes serve service to determine whether to include the head Pod in the service endpoints.
headPod, err := common.GetRayClusterHeadPod(ctx, r, rayClusterInstance)
if err != nil {
return err
Expand All @@ -1183,29 +1183,29 @@ func (r *RayServiceReconciler) labelHeadPodForServeStatus(ctx context.Context, r
return fmt.Errorf("found 0 head. cluster name %s, namespace %v", rayClusterInstance.Name, rayClusterInstance.Namespace)
}

httpProxyClient := r.httpProxyClientFunc()
httpProxyClient.InitClient()
client := r.httpProxyClientFunc()
client.InitClient()

rayContainer := headPod.Spec.Containers[utils.RayContainerIndex]
servingPort := utils.FindContainerPort(&rayContainer, utils.ServingPortName, utils.DefaultServingPort)
httpProxyClient.SetHostIp(headPod.Status.PodIP, headPod.Namespace, headPod.Name, servingPort)
client.SetHostIp(headPod.Status.PodIP, headPod.Namespace, headPod.Name, servingPort)

if headPod.Labels == nil {
headPod.Labels = make(map[string]string)
}
oldLabel := headPod.Labels[utils.RayClusterServingServiceLabelKey]
newLabel := utils.EnableRayClusterServingServiceFalse

// Make a copy of the labels for comparison later, to decide whether we need to push an update.
originalLabels := make(map[string]string, len(headPod.Labels))
for key, value := range headPod.Labels {
originalLabels[key] = value
}
if err = httpProxyClient.CheckProxyActorHealth(ctx); err == nil && !excludeHeadPodFromServeSvc {
headPod.Labels[utils.RayClusterServingServiceLabelKey] = utils.EnableRayClusterServingServiceTrue
} else {
headPod.Labels[utils.RayClusterServingServiceLabelKey] = utils.EnableRayClusterServingServiceFalse
// If excludeHeadPodFromServeSvc is true, the head Pod is never used to serve requests,
// regardless of whether the proxy actor is healthy. Therefore, only send the health
// check request when excludeHeadPodFromServeSvc is false.
if !excludeHeadPodFromServeSvc {
isHealthy := client.CheckProxyActorHealth(ctx) == nil
newLabel = strconv.FormatBool(isHealthy)
}

if !reflect.DeepEqual(originalLabels, headPod.Labels) {
if oldLabel != newLabel {
headPod.Labels[utils.RayClusterServingServiceLabelKey] = newLabel
if updateErr := r.Update(ctx, headPod); updateErr != nil {
return updateErr
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ func TestLabelHeadPodForServeStatus(t *testing.T) {
},
}

err := r.labelHeadPodForServeStatus(ctx, &cluster, tc.excludeHeadPodFromServeSvc)
err := r.updateHeadPodServeLabel(ctx, &cluster, tc.excludeHeadPodFromServeSvc)
assert.NoError(t, err)
// Get latest headPod status
headPod, err = common.GetRayClusterHeadPod(ctx, r, &cluster)
Expand Down

0 comments on commit 3fcedc0

Please sign in to comment.