Skip to content

Commit

Permalink
Optimize the logic of the node leases
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 8, 2024
1 parent e6a618b commit a118781
Show file tree
Hide file tree
Showing 22 changed files with 1,459 additions and 378 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ jobs:
./hack/e2e-test.sh kwokctl/kwokctl_${{ matrix.kwokctl-runtime }}_metric
- name: Test Benchmark
if: ${{ matrix.os == 'ubuntu-latest' && matrix.kwokctl-runtime != 'kind' && matrix.kwokctl-runtime != 'kind-podman' }}
if: ${{ matrix.os == 'ubuntu-latest' && matrix.kwokctl-runtime == 'binary' }}
shell: bash
run: |
./hack/e2e-test.sh kwokctl/kwokctl_${{ matrix.kwokctl-runtime }}_benchmark
Expand Down
88 changes: 51 additions & 37 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,22 @@ type Controller struct {
broadcaster record.EventBroadcaster
recorder record.EventRecorder

nodeCacheGetter informer.Getter[*corev1.Node]
podCacheGetter informer.Getter[*corev1.Pod]
nodeCacheGetter informer.Getter[*corev1.Node]
podCacheGetter informer.Getter[*corev1.Pod]
nodeLeaseCacheGetter informer.Getter[*coordinationv1.Lease]

onLeaseNodeManageFunc func(nodeName string)
onNodeManagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
onNodeManagedFunc func(nodeName string)
onNodeUnmanagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool

manageNodesWithLabelSelector string
manageNodesWithAnnotationSelector string
manageNodesWithFieldSelector string
manageNodeLeasesWithFieldSelector string
managePodsWithFieldSelector string

nodeLeasesChan chan informer.Event[*coordinationv1.Lease]
nodeChan chan informer.Event[*corev1.Node]
podsChan chan informer.Event[*corev1.Pod]
nodesChan chan informer.Event[*corev1.Node]
podsChan chan informer.Event[*corev1.Pod]

nodeLeasesInformer *informer.Informer[*coordinationv1.Lease, *coordinationv1.LeaseList]
nodesInformer *informer.Informer[*corev1.Node, *corev1.NodeList]
Expand Down Expand Up @@ -225,7 +225,7 @@ func (c *Controller) init(ctx context.Context) (err error) {
c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})
c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.conf.TypedClient.CoreV1().Events("")})

c.nodeChan = make(chan informer.Event[*corev1.Node], 1)
c.nodesChan = make(chan informer.Event[*corev1.Node], 1)
c.podsChan = make(chan informer.Event[*corev1.Pod], 1)

nodesCli := c.conf.TypedClient.CoreV1().Nodes()
Expand All @@ -234,7 +234,7 @@ func (c *Controller) init(ctx context.Context) (err error) {
LabelSelector: c.manageNodesWithLabelSelector,
AnnotationSelector: c.manageNodesWithAnnotationSelector,
FieldSelector: c.manageNodesWithFieldSelector,
}, c.nodeChan)
}, c.nodesChan)
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}
Expand All @@ -255,7 +255,6 @@ func (c *Controller) init(ctx context.Context) (err error) {
}

if c.conf.NodeLeaseDurationSeconds != 0 {
c.nodeLeasesChan = make(chan informer.Event[*coordinationv1.Lease], 1)
nodeLeasesCli := c.conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
c.nodeLeasesInformer = informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
}
Expand All @@ -276,9 +275,10 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
return nil
}

err := c.nodeLeasesInformer.Watch(ctx, informer.Option{
var err error
c.nodeLeaseCacheGetter, err = c.nodeLeasesInformer.WatchWithCache(ctx, informer.Option{
FieldSelector: c.manageNodeLeasesWithFieldSelector,
}, c.nodeLeasesChan)
}, nil)
if err != nil {
return fmt.Errorf("failed to watch node leases: %w", err)
}
Expand All @@ -293,8 +293,11 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
TypedClient: c.conf.TypedClient,
LeaseDurationSeconds: c.conf.NodeLeaseDurationSeconds,
LeaseParallelism: c.conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
GetLease: func(nodeName string) (*coordinationv1.Lease, bool) {
return c.nodeLeaseCacheGetter.GetWithNamespace(nodeName, corev1.NamespaceNodeLease)
},
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
node, ok := c.nodeCacheGetter.Get(nodeName)
if !ok {
Expand All @@ -312,7 +315,8 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
}),
HolderIdentity: c.conf.ID,
OnNodeManagedFunc: func(nodeName string) {
c.onLeaseNodeManageFunc(nodeName)
c.nodeManageQueue.Add(nodeName)
c.podOnNodeManageQueue.Add(nodeName)
},
})
if err != nil {
Expand All @@ -324,18 +328,17 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
return !c.nodeLeases.Held(nodeName)
}

c.onLeaseNodeManageFunc = func(nodeName string) {
c.nodeManageQueue.Add(nodeName)
c.podOnNodeManageQueue.Add(nodeName)
}
c.onNodeManagedFunc = func(nodeName string) {
// Try to hold the lease
c.nodeLeases.TryHold(nodeName)
}
c.onNodeUnmanagedFunc = func(nodeName string) {
c.nodeLeases.ReleaseHold(nodeName)
}

go c.nodeLeaseSyncWorker(ctx)

err = c.nodeLeases.Start(ctx, c.nodeLeasesChan)
err = c.nodeLeases.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start node leases controller: %w", err)
}
Expand All @@ -351,16 +354,15 @@ func (c *Controller) nodeLeaseSyncWorker(ctx context.Context) {
logger.Warn("node not found in cache", "node", nodeName)
err := c.nodesInformer.Sync(ctx, informer.Option{
FieldSelector: fields.OneTermEqualSelector("metadata.name", nodeName).String(),
}, c.nodeChan)
}, c.nodesChan)
if err != nil {
logger.Error("failed to update node", err, "node", nodeName)
}
continue
}
c.nodeChan <- informer.Event[*corev1.Node]{
Type: informer.Sync,
Object: node,
}

// Hand the node straight to the node controller instead of waiting for cache synchronization; if that is slow, the node may be judged as unmanaged.
c.nodes.ManageNode(node)
}
}

Expand Down Expand Up @@ -436,30 +438,42 @@ func (c *Controller) initStagesManager(ctx context.Context) error {
return nil
}

// onNodeManaged forwards nodeName to the optional onNodeManagedFunc hook.
// The hook is nil until something assigns it (in this file that happens in
// initNodeLeaseController), so a missing hook makes this call a no-op.
func (c *Controller) onNodeManaged(nodeName string) {
	if hook := c.onNodeManagedFunc; hook != nil {
		hook(nodeName)
	}
}

// onNodeUnmanaged forwards nodeName to the optional onNodeUnmanagedFunc hook.
// The hook is nil until something assigns it (in this file that happens in
// initNodeLeaseController), so a missing hook makes this call a no-op.
func (c *Controller) onNodeUnmanaged(nodeName string) {
	if hook := c.onNodeUnmanagedFunc; hook != nil {
		hook(nodeName)
	}
}

func (c *Controller) initNodeController(ctx context.Context, lifecycle resources.Getter[Lifecycle]) (err error) {
c.nodes, err = NewNodeController(NodeControllerConfig{
Clock: c.conf.Clock,
TypedClient: c.conf.TypedClient,
NodeCacheGetter: c.nodeCacheGetter,
NodeIP: c.conf.NodeIP,
NodeName: c.conf.NodeName,
NodePort: c.conf.NodePort,
DisregardStatusWithAnnotationSelector: c.conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: c.conf.DisregardStatusWithLabelSelector,
OnNodeManagedFunc: func(nodeName string) {
c.onNodeManagedFunc(nodeName)
},
Lifecycle: lifecycle,
PlayStageParallelism: c.conf.NodePlayStageParallelism,
FuncMap: defaultFuncMap,
Recorder: c.recorder,
ReadOnlyFunc: c.readOnlyFunc,
EnableMetrics: c.conf.EnableMetrics,
OnNodeManagedFunc: c.onNodeManaged,
OnNodeUnmanagedFunc: c.onNodeUnmanaged,
Lifecycle: lifecycle,
PlayStageParallelism: c.conf.NodePlayStageParallelism,
FuncMap: defaultFuncMap,
Recorder: c.recorder,
ReadOnlyFunc: c.readOnlyFunc,
EnableMetrics: c.conf.EnableMetrics,
})
if err != nil {
return fmt.Errorf("failed to create nodes controller: %w", err)
}
err = c.nodes.Start(ctx, c.nodeChan)
err = c.nodes.Start(ctx, c.nodesChan)
if err != nil {
return fmt.Errorf("failed to start nodes controller: %w", err)
}
Expand Down
55 changes: 36 additions & 19 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ var (
type NodeController struct {
clock clock.Clock
typedClient kubernetes.Interface
nodeCacheGetter informer.Getter[*corev1.Node]
nodeIP string
nodeName string
nodePort int
disregardStatusWithAnnotationSelector labels.Selector
disregardStatusWithLabelSelector labels.Selector
onNodeManagedFunc func(nodeName string)
onNodeUnmanagedFunc func(nodeName string)
nodesSets maps.SyncMap[string, *NodeInfo]
renderer gotpl.Renderer
preprocessChan chan *corev1.Node
Expand All @@ -109,8 +109,8 @@ type NodeController struct {
type NodeControllerConfig struct {
Clock clock.Clock
TypedClient kubernetes.Interface
NodeCacheGetter informer.Getter[*corev1.Node]
OnNodeManagedFunc func(nodeName string)
OnNodeUnmanagedFunc func(nodeName string)
DisregardStatusWithAnnotationSelector string
DisregardStatusWithLabelSelector string
NodeIP string
Expand Down Expand Up @@ -152,10 +152,10 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
c := &NodeController{
clock: conf.Clock,
typedClient: conf.TypedClient,
nodeCacheGetter: conf.NodeCacheGetter,
disregardStatusWithAnnotationSelector: disregardStatusWithAnnotationSelector,
disregardStatusWithLabelSelector: disregardStatusWithLabelSelector,
onNodeManagedFunc: conf.OnNodeManagedFunc,
onNodeUnmanagedFunc: conf.OnNodeUnmanagedFunc,
nodeIP: conf.NodeIP,
nodeName: conf.NodeName,
nodePort: conf.NodePort,
Expand Down Expand Up @@ -206,6 +206,11 @@ func (c *NodeController) need(node *corev1.Node) bool {
return true
}

// ManageNode hands a node to the controller by sending it on preprocessChan.
// This is a plain channel send, so the call blocks until the channel accepts
// the node.
// NOTE(review): presumably the preprocess worker consumes this channel and a
// nil node is never passed — confirm against callers.
func (c *NodeController) ManageNode(node *corev1.Node) {
c.preprocessChan <- node
}

// watchResources watch resources and send to preprocessChan
func (c *NodeController) watchResources(ctx context.Context, events <-chan informer.Event[*corev1.Node]) {
logger := log.FromContext(ctx)
Expand All @@ -230,10 +235,10 @@ loop:
} else {
c.preprocessChan <- node
}
}

if c.onNodeManagedFunc != nil && event.Type != informer.Modified {
c.onNodeManagedFunc(node.Name)
if c.onNodeManagedFunc != nil && event.Type != informer.Modified {
c.onNodeManagedFunc(node.Name)
}
}
case informer.Deleted:
node := event.Object
Expand All @@ -247,6 +252,10 @@ loop:
c.delayQueue.Cancel(resourceJob)
}
}

if c.onNodeUnmanagedFunc != nil {
c.onNodeUnmanagedFunc(node.Name)
}
}
case <-ctx.Done():
break loop
Expand Down Expand Up @@ -330,16 +339,28 @@ func (c *NodeController) preprocessWorker(ctx context.Context) {
func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) error {
key := node.Name

resourceJob, ok := c.delayQueueMapping.Load(key)
if ok && resourceJob.Resource.ResourceVersion == node.ResourceVersion {
return nil
}

logger := log.FromContext(ctx)
logger = logger.With(
"node", key,
)

resourceJob, ok := c.delayQueueMapping.Load(key)
if ok {
if resourceJob.Resource.ResourceVersion == node.ResourceVersion {
logger.Debug("Skip node",
"reason", "resource version not changed",
"stage", resourceJob.Stage.Name(),
)
return nil
}

if !c.delayQueue.Cancel(resourceJob) {
logger.Debug("Failed to cancel stage",
"stage", resourceJob.Stage.Name(),
)
}
}

data, err := expression.ToJSONStandard(node)
if err != nil {
return err
Expand Down Expand Up @@ -373,14 +394,10 @@ func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) erro
Stage: stage,
Key: key,
}
ok = c.delayQueue.AddAfter(item, delay)
if !ok {
logger.Debug("Skip node",
"reason", "delayed",
)
} else {
c.delayQueueMapping.Store(key, item)
}

c.delayQueue.AddAfter(item, delay)
c.delayQueueMapping.Store(key, item)

return nil
}

Expand Down
Loading

0 comments on commit a118781

Please sign in to comment.