From 47bd5e4302b9f5bdd960f8d66dca559867398599 Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Tue, 19 Mar 2024 15:53:32 -0700 Subject: [PATCH] Update cacheservice entrypoint + some clean up (#129) * add placeholder migrations to successfully deploy cacheservice Signed-off-by: Paul Dittamo * add persistent flags for cache service * clean up Signed-off-by: Paul Dittamo * clean up - add sample cache service config Signed-off-by: Paul Dittamo * add cache service otel trace for local builds Signed-off-by: Paul Dittamo * update cacheservice section key Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo --- cacheservice/cacheservice_config.yaml | 28 ++++++++++++++++++++ cacheservice/cmd/entrypoints/migrate.go | 24 +++++++++++++++++ cacheservice/cmd/entrypoints/root.go | 21 +++++++++++++++ cacheservice/pkg/config/config.go | 2 +- cacheservice/pkg/rpc/cacheservice/service.go | 4 +-- cmd/single/start.go | 25 ++++++++--------- 6 files changed, 89 insertions(+), 15 deletions(-) create mode 100644 cacheservice/cacheservice_config.yaml create mode 100644 cacheservice/cmd/entrypoints/migrate.go diff --git a/cacheservice/cacheservice_config.yaml b/cacheservice/cacheservice_config.yaml new file mode 100644 index 0000000000..c8b7d3f72d --- /dev/null +++ b/cacheservice/cacheservice_config.yaml @@ -0,0 +1,28 @@ +# This is a sample configuration file. +# Real configuration when running inside K8s (local or otherwise) lives in a ConfigMap +# Look in the artifacts directory in the flyte repo for what's actually run +logger: + level: 5 +cache-server: + grpcPort: 8091 + httpPort: 8090 + grpcServerReflection: true +cacheservice: + storage-prefix: "cached_outputs" + metrics-scope: "cacheservice" + profiler-port: 10254 + heartbeat-grace-period-multiplier: 3 + max-reservation-heartbeat: 10s +storage: + connection: + access-key: minio + auth-type: accesskey + disable-ssl: true + endpoint: http://localhost:9000 + region: my-region-here + secret-key: miniostorage + cache: + max_size_mbs: 10 + target_gc_percent: 100 + container: my-container + type: minio \ No newline at end of file diff --git a/cacheservice/cmd/entrypoints/migrate.go b/cacheservice/cmd/entrypoints/migrate.go new file mode 100644 index 0000000000..a4ce8b1b48 --- /dev/null +++ b/cacheservice/cmd/entrypoints/migrate.go @@ -0,0 +1,24 @@ +package entrypoints + +import ( + "github.com/spf13/cobra" +) + +var parentMigrateCmd = &cobra.Command{ + Use: "migrate", + Short: "This command controls migration behavior for the Flyte cacheservice database. Please choose a subcommand.", +} + +// This runs all the migrations. This is a placeholder for now as cache service does not have any migrations +var migrateCmd = &cobra.Command{ + Use: "run", + Short: "This command will run all the migrations for the database", + RunE: func(cmd *cobra.Command, args []string) error { + return nil + }, +} + +func init() { + RootCmd.AddCommand(parentMigrateCmd) + parentMigrateCmd.AddCommand(migrateCmd) +} diff --git a/cacheservice/cmd/entrypoints/root.go b/cacheservice/cmd/entrypoints/root.go index 803224f007..a4592f3baa 100644 --- a/cacheservice/cmd/entrypoints/root.go +++ b/cacheservice/cmd/entrypoints/root.go @@ -2,6 +2,7 @@ package entrypoints import ( "context" + "flag" "fmt" "os" @@ -18,6 +19,26 @@ var ( configAccessor = viper.NewAccessor(config.Options{}) ) +func init() { + // See https://gist.github.com/nak3/78a32817a8a3950ae48f239a44cd3663 + // allows `$ cacheservice --logtostderr` to work + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + + // Add persistent flags - persistent flags persist through all sub-commands + RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is ./cacheservice_config.yaml)") + + RootCmd.AddCommand(viper.GetConfigCommand()) + + // Allow viper to read the value of the flags + configAccessor.InitializePflags(RootCmd.PersistentFlags()) + + err := flag.CommandLine.Parse([]string{}) + if err != nil { + fmt.Println(err) + os.Exit(-1) + } +} + func Execute() error { if err := RootCmd.Execute(); err != nil { fmt.Println(err) diff --git a/cacheservice/pkg/config/config.go b/cacheservice/pkg/config/config.go index 5c00738a29..5640369f33 100644 --- a/cacheservice/pkg/config/config.go +++ b/cacheservice/pkg/config/config.go @@ -6,7 +6,7 @@ import ( "github.com/flyteorg/flyte/flytestdlib/config" ) -const SectionKey = "cache" +const SectionKey = "cache-server" //go:generate pflags Config diff --git a/cacheservice/pkg/rpc/cacheservice/service.go b/cacheservice/pkg/rpc/cacheservice/service.go index dccf40b0f0..93c619e4d7 100644 --- a/cacheservice/pkg/rpc/cacheservice/service.go +++ b/cacheservice/pkg/rpc/cacheservice/service.go @@ -51,7 +51,7 @@ func (s *CacheService) ReleaseReservation(ctx context.Context, request *cacheser return s.CacheManager.ReleaseReservation(ctx, request) } -func NewCacheService() *CacheService { +func NewCacheServiceServer() *CacheService { configProvider := runtime.NewConfigurationProvider() cacheServiceConfig := configProvider.ApplicationConfiguration().GetCacheServiceConfig() cacheServiceScope := promutils.NewScope(cacheServiceConfig.MetricsScope).NewSubScope("cacheservice") @@ -114,7 +114,7 @@ func newGRPCServer(_ context.Context, cfg *config.Config) *grpc.Server { ), ), ) - cacheservice.RegisterCacheServiceServer(grpcServer, NewCacheService()) + cacheservice.RegisterCacheServiceServer(grpcServer, NewCacheServiceServer()) healthServer := health.NewServer() healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) diff --git a/cmd/single/start.go b/cmd/single/start.go index 2e145969d5..7f1add5ed6 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -5,6 +5,17 @@ import ( "net/http" "os" + _ "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + _ "gorm.io/driver/postgres" // Required to import database driver. + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" + cacheserviceConfig "github.com/flyteorg/flyte/cacheservice/pkg/config" "github.com/flyteorg/flyte/cacheservice/pkg/rpc/cacheservice" datacatalogConfig "github.com/flyteorg/flyte/datacatalog/pkg/config" @@ -29,16 +40,6 @@ import ( "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" - _ "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" - _ "gorm.io/driver/postgres" // Required to import database driver. - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/metrics" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" ) const defaultNamespace = "all" @@ -205,8 +206,8 @@ var startCmd = &cobra.Command{ cfg := GetConfig() for _, serviceName := range []string{otelutils.AdminClientTracer, otelutils.AdminGormTracer, otelutils.AdminServerTracer, - otelutils.BlobstoreClientTracer, otelutils.DataCatalogClientTracer, otelutils.DataCatalogGormTracer, - otelutils.DataCatalogServerTracer, otelutils.FlytePropellerTracer, otelutils.K8sClientTracer} { + otelutils.BlobstoreClientTracer, otelutils.CacheServiceClientTracer, otelutils.DataCatalogClientTracer, + otelutils.DataCatalogGormTracer, otelutils.DataCatalogServerTracer, otelutils.FlytePropellerTracer, otelutils.K8sClientTracer} { if err := otelutils.RegisterTracerProvider(serviceName, otelutils.GetConfig()); err != nil { logger.Errorf(ctx, "Failed to create otel tracer provider. %v", err) return err