Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CRD for Metric #739

Merged
merged 1 commit into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions pkg/config/resources/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package resources

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -38,22 +37,28 @@ type ConvertFunc[O any, T runtime.Object, S ~[]T] func(objs S) O

// NewDynamicGetter returns a new Getter that returns the latest list of resources.
func NewDynamicGetter[O any, T runtime.Object, L runtime.Object](syncer Syncer[T, L], convertFunc ConvertFunc[O, T, []T]) DynamicGetter[O] {
syncCh := make(chan struct{}, 1)
syncCh <- struct{}{}
getter := &dynamicGetter[O, T, L]{
syncCh: syncCh,
syncer: syncer,
convertFunc: convertFunc,
}

return struct {
Getter[O]
Starter
Synced
}{
Getter: withCache[O](getter),
Starter: getter,
Synced: getter,
}
}

type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct {
syncer Syncer[T, L]
syncCh chan struct{}

convertFunc ConvertFunc[O, T, []T]

Expand All @@ -73,8 +78,18 @@ func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error {
},
},
t,
10*time.Second,
cache.ResourceEventHandlerFuncs{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.sync()
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.sync()
},
DeleteFunc: func(obj interface{}) {
c.sync()
},
},
)

c.store = store
Expand All @@ -97,3 +112,14 @@ func (c *dynamicGetter[O, T, L]) Get() O {
func (c *dynamicGetter[O, T, L]) Version() string {
return c.controller.LastSyncResourceVersion()
}

func (c *dynamicGetter[O, T, L]) Sync() <-chan struct{} {
return c.syncCh
}

func (c *dynamicGetter[O, T, L]) sync() {
select {
case c.syncCh <- struct{}{}:
default:
}
}
11 changes: 11 additions & 0 deletions pkg/config/resources/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,20 @@ type Getter[O any] interface {
type DynamicGetter[O any] interface {
Getter[O]
Starter
Synced
}

// Starter is an interface for starting resources.
type Starter interface {
Start(ctx context.Context) error
}

// CacheGetter is an interface for getting resources that are cached.
type CacheGetter[O any] interface {
Getter[O]
}

// Synced is an interface for getting resources that are synced.
type Synced interface {
Sync() <-chan struct{}
}
18 changes: 12 additions & 6 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func runE(ctx context.Context, flags *flagpole) error {
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: len(metrics) != 0,
EnableMetrics: len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind),
ManageAllNodes: flags.Options.ManageAllNodes,
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
Expand Down Expand Up @@ -309,11 +309,6 @@ func runE(ctx context.Context, flags *flagpole) error {
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}

err = svc.InstallMetrics()
if err != nil {
return fmt.Errorf("failed to install metrics: %w", err)
}
svc.InstallHealthz()

if flags.Options.EnableDebuggingHandlers {
Expand All @@ -322,6 +317,17 @@ func runE(ctx context.Context, flags *flagpole) error {
} else {
svc.InstallDebuggingDisabledHandlers()
}

err = svc.InstallCRD(ctx)
if err != nil {
return fmt.Errorf("failed to install crd: %w", err)
}

err = svc.InstallMetrics(ctx)
if err != nil {
return fmt.Errorf("failed to install metrics: %w", err)
}

go func() {
err := svc.Run(ctx, serverAddress, flags.Options.TLSCertFile, flags.Options.TLSPrivateKeyFile)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions pkg/kwok/metrics/cel/evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cel
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -78,6 +79,7 @@ type Environment struct {
env *easycel.Environment
conf NodeEvaluatorConfig
cacheEvaluator map[string]*Evaluator
cacheMut sync.Mutex
resultCacheVer *int64
}

Expand Down Expand Up @@ -202,6 +204,9 @@ func (e *Environment) init() error {
// Compile is responsible for compiling a cel program
func (e *Environment) Compile(src string) (*Evaluator, error) {
if e.cacheEvaluator != nil {
e.cacheMut.Lock()
defer e.cacheMut.Unlock()

if evaluator, ok := e.cacheEvaluator[src]; ok {
return evaluator, nil
}
Expand Down Expand Up @@ -234,8 +239,9 @@ type Evaluator struct {
latestCacheVer *int64
cacheVer int64

cache map[string]ref.Val
program cel.Program
cache map[string]ref.Val
cacheMut sync.Mutex
program cel.Program
}

func resultUniqueKey(node *corev1.Node, pod *corev1.Pod, container *corev1.Container) string {
Expand All @@ -255,6 +261,9 @@ func resultUniqueKey(node *corev1.Node, pod *corev1.Pod, container *corev1.Conta
func (e *Evaluator) evaluate(data Data) (ref.Val, error) {
var key string
if e.latestCacheVer != nil {
e.cacheMut.Lock()
defer e.cacheMut.Unlock()

if e.cache == nil || *e.latestCacheVer != e.cacheVer {
e.cache = map[string]ref.Val{}
e.cacheVer = *e.latestCacheVer
Expand Down
14 changes: 2 additions & 12 deletions pkg/kwok/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (

// UpdateHandler handles updating metrics on request
type UpdateHandler struct {
name string
metric *internalversion.Metric
controller *controllers.Controller
environment *cel.Environment

Expand All @@ -50,8 +48,6 @@ type UpdateHandler struct {

// UpdateHandlerConfig is configuration for a single node
type UpdateHandlerConfig struct {
NodeName string
Metrics *internalversion.Metric
Controller *controllers.Controller
Environment *cel.Environment
}
Expand All @@ -64,8 +60,6 @@ func NewMetricsUpdateHandler(conf UpdateHandlerConfig) *UpdateHandler {
)

h := &UpdateHandler{
name: conf.NodeName,
metric: conf.Metrics,
controller: conf.Controller,
environment: conf.Environment,
registry: registry,
Expand Down Expand Up @@ -497,13 +491,12 @@ func uniqueKey(name string, kind internalversion.Kind, labels map[string]string)
return builder.String()
}

func (h *UpdateHandler) update(ctx context.Context) {
// Update updates metrics for a node
func (h *UpdateHandler) Update(ctx context.Context, nodeName string, metrics []internalversion.MetricConfig) {
logger := log.FromContext(ctx)
has := map[string]struct{}{}
// Update metrics
h.environment.ClearResultCache()
metrics := h.metric.Spec.Metrics
nodeName := h.metric.Name
for _, metric := range metrics {
metric := metric
metricName := metric.Name
Expand Down Expand Up @@ -548,9 +541,6 @@ func (h *UpdateHandler) update(ctx context.Context) {
}

func (h *UpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.update(ctx)

// Serve metrics
h.handler.ServeHTTP(w, r)
}
89 changes: 79 additions & 10 deletions pkg/kwok/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@ limitations under the License.
package server

import (
"context"
"fmt"
"strings"

"github.com/emicklei/go-restful/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/config/resources"
"sigs.k8s.io/kwok/pkg/kwok/metrics"
"sigs.k8s.io/kwok/pkg/kwok/metrics/cel"
"sigs.k8s.io/kwok/pkg/log"
)

// InstallMetrics registers the metrics handler on the given mux.
func (s *Server) InstallMetrics() error {
func (s *Server) InstallMetrics(ctx context.Context) error {
promHandler := promhttp.Handler()
s.restfulCont.Handle("/metrics", promHandler)

selfMetric := func(req *restful.Request, resp *restful.Response) {
promHandler.ServeHTTP(resp.ResponseWriter, req.Request)
}

controller := s.controller
env, err := cel.NewEnvironment(cel.NodeEvaluatorConfig{
Expand All @@ -45,14 +54,74 @@ func (s *Server) InstallMetrics() error {
if err != nil {
return fmt.Errorf("failed to create CEL environment: %w", err)
}
for _, m := range s.metrics {
handler := metrics.NewMetricsUpdateHandler(metrics.UpdateHandlerConfig{
NodeName: m.Name,
Metrics: m,
Controller: controller,
Environment: env,
})
s.restfulCont.Handle(m.Spec.Path, handler)

const rootPath = "/metrics"
ws := new(restful.WebService)
ws.Path(rootPath)
ws.Route(ws.GET("/").To(selfMetric))

for _, m := range s.metrics.Get() {
if !strings.HasPrefix(m.Spec.Path, rootPath) {
return fmt.Errorf("metric path %q does not start with %q", m.Spec.Path, rootPath)
}

ws.Route(ws.GET(strings.TrimPrefix(m.Spec.Path, rootPath)).
To(s.getMetrics(m, env)))
}

s.restfulCont.Add(ws)
s.metricsWebService = ws

logger := log.FromContext(ctx)
syncd, ok := s.metrics.(resources.Synced)
if ok {
logger.Info("Starting metrics syncer")
go func() {
for range syncd.Sync() {
logger.Info("Metrics synced, updating metrics web service")
ws := new(restful.WebService)
ws.Path(rootPath)
ws.Route(ws.GET("/").To(selfMetric))

for _, m := range s.metrics.Get() {
if !strings.HasPrefix(m.Spec.Path, rootPath) {
logger.Warn("metric path does not start with "+rootPath, "path", m.Spec.Path)
continue
}
ws.Route(ws.GET(strings.TrimPrefix(m.Spec.Path, rootPath)).
To(s.getMetrics(m, env)))
}

err := s.restfulCont.Remove(s.metricsWebService)
if err != nil {
logger.Error("failed to remove metrics web service", err)
}
s.restfulCont.Add(ws)
s.metricsWebService = ws
}
}()
}

return nil
}

func (s *Server) getMetrics(metric *internalversion.Metric, env *cel.Environment) func(req *restful.Request, resp *restful.Response) {
return func(req *restful.Request, resp *restful.Response) {
nodeName := req.PathParameter("nodeName")
if nodeName == "" {
nodeName = metric.Name
}

handler, ok := s.metricsUpdateHandler.Load(nodeName)
if !ok {
handler = metrics.NewMetricsUpdateHandler(metrics.UpdateHandlerConfig{
Controller: s.controller,
Environment: env,
})
s.metricsUpdateHandler.Store(nodeName, handler)
}

handler.Update(req.Request.Context(), nodeName, metric.Spec.Metrics)
handler.ServeHTTP(resp.ResponseWriter, req.Request)
}
}
Loading