From 62859478a6be947748d458d7171b859f496c1961 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 6 Aug 2024 12:09:56 -0700 Subject: [PATCH 1/3] Create feature flag to switch between current shuffle sharding group planner and partition compaction group planner Signed-off-by: Alex Le --- docs/blocks-storage/compactor.md | 4 + docs/configuration/config-file-reference.md | 4 + pkg/compactor/compactor.go | 78 ++++++++++++------- pkg/compactor/partition_compaction_grouper.go | 38 +++++++++ pkg/compactor/partition_compaction_planner.go | 31 ++++++++ pkg/util/shard.go | 4 + 6 files changed, 132 insertions(+), 27 deletions(-) create mode 100644 pkg/compactor/partition_compaction_grouper.go create mode 100644 pkg/compactor/partition_compaction_planner.go diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 04a08cfa64..ff66ac2c6d 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -285,6 +285,10 @@ compactor: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] + # The compaction mode to use. Supported values are: default, partitioning. + # CLI flag: -compactor.compaction-mode + [compaction_mode: | default = "default"] + # How long block visit marker file should be considered as expired and able to # be picked up by compactor again. # CLI flag: -compactor.block-visit-marker-timeout diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 745b612fe7..3d4ed9c08b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2223,6 +2223,10 @@ sharding_ring: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] +# The compaction mode to use. Supported values are: default, partitioning. +# CLI flag: -compactor.compaction-mode +[compaction_mode: | default = "default"] + # How long block visit marker file should be considered as expired and able to # be picked up by compactor again. # CLI flag: -compactor.block-visit-marker-timeout diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index ff7907fe2c..06b8dd98aa 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -53,9 +53,12 @@ var ( errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) - supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + supportedCompactionModes = []string{util.CompactionModeDefault, util.CompactionModePartitioning} + errInvalidCompactionMode = errors.New("invalid compaction mode") + errInvalidCompactionModePartitioning = errors.New("compaction mode partitioning can only be enabled when shuffle sharding is enabled") DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { return compact.NewDefaultGrouperWithMetrics( @@ -77,29 +80,33 @@ var ( } ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { - return NewShuffleShardingGrouper( - ctx, - logger, - bkt, - cfg.AcceptMalformedIndex, - true, // Enable vertical compaction - blocksMarkedForNoCompaction, - metadata.NoneFunc, - syncerMetrics, - compactorMetrics, - cfg, - ring, - ringLifecycle.Addr, - ringLifecycle.ID, - limits, - userID, - cfg.BlockFilesConcurrency, - cfg.BlocksFetchConcurrency, - cfg.CompactionConcurrency, - cfg.BlockVisitMarkerTimeout, - blockVisitMarkerReadFailed, - blockVisitMarkerWriteFailed, - noCompactionMarkFilter.NoCompactMarkedBlocks) + if cfg.CompactionMode == util.CompactionModePartitioning { + return NewPartitionCompactionGrouper(ctx, logger, bkt) + } else { + return NewShuffleShardingGrouper( + ctx, + logger, + bkt, + cfg.AcceptMalformedIndex, + true, // Enable vertical compaction + blocksMarkedForNoCompaction, + metadata.NoneFunc, + syncerMetrics, + compactorMetrics, + cfg, + ring, + ringLifecycle.Addr, + ringLifecycle.ID, + limits, + userID, + cfg.BlockFilesConcurrency, + cfg.BlocksFetchConcurrency, + cfg.CompactionConcurrency, + cfg.BlockVisitMarkerTimeout, + blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed, + noCompactionMarkFilter.NoCompactMarkedBlocks) + } } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -123,7 +130,11 @@ var ( plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { - return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) + if cfg.CompactionMode == util.CompactionModePartitioning { + return NewPartitionCompactionPlanner(ctx, bkt, logger) + } else { + return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) + } } return compactor, plannerFactory, nil } @@ -202,6 +213,9 @@ type Config struct { ShardingStrategy string `yaml:"sharding_strategy"` ShardingRing RingConfig `yaml:"sharding_ring"` + // Compaction mode. + CompactionMode string `yaml:"compaction_mode"` + // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override // it in tests. @@ -244,6 +258,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) + f.StringVar(&cfg.CompactionMode, "compactor.compaction-mode", util.CompactionModeDefault, fmt.Sprintf("The compaction mode to use. Supported values are: %s.", strings.Join(supportedCompactionModes, ", "))) f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+ "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") @@ -290,6 +305,15 @@ func (cfg *Config) Validate(limits validation.Limits) error { } } + // Make sure a valid compaction mode is being used + if !util.StringsContain(supportedCompactionModes, cfg.CompactionMode) { + return errInvalidCompactionMode + } + + if !cfg.ShardingEnabled && cfg.CompactionMode == util.CompactionModePartitioning { + return errInvalidCompactionModePartitioning + } + return nil } diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go new file mode 100644 index 0000000000..c3687f7e6a --- /dev/null +++ b/pkg/compactor/partition_compaction_grouper.go @@ -0,0 +1,38 @@ +package compactor + +import ( + "context" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" +) + +type PartitionCompactionGrouper struct { + ctx context.Context + logger log.Logger + bkt objstore.InstrumentedBucket +} + +func NewPartitionCompactionGrouper( + ctx context.Context, + logger log.Logger, + bkt objstore.InstrumentedBucket, +) *PartitionCompactionGrouper { + if logger == nil { + logger = log.NewNopLogger() + } + + return &PartitionCompactionGrouper{ + ctx: ctx, + logger: logger, + bkt: bkt, + } +} + +// Groups function modified from https://github.com/cortexproject/cortex/pull/2616 +func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + panic("PartitionCompactionGrouper not implemented") +} diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go new file mode 100644 index 0000000000..963771aa6d --- /dev/null +++ b/pkg/compactor/partition_compaction_planner.go @@ -0,0 +1,31 @@ +package compactor + +import ( + "context" + + "github.com/go-kit/log" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type PartitionCompactionPlanner struct { + ctx context.Context + bkt objstore.InstrumentedBucket + logger log.Logger +} + +func NewPartitionCompactionPlanner( + ctx context.Context, + bkt objstore.InstrumentedBucket, + logger log.Logger, +) *PartitionCompactionPlanner { + return &PartitionCompactionPlanner{ + ctx: ctx, + bkt: bkt, + logger: logger, + } +} + +func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) { + panic("PartitionCompactionPlanner not implemented") +} diff --git a/pkg/util/shard.go b/pkg/util/shard.go index 82392b3a1a..1ffe234ee9 100644 --- a/pkg/util/shard.go +++ b/pkg/util/shard.go @@ -10,6 +10,10 @@ const ( // Sharding strategies. ShardingStrategyDefault = "default" ShardingStrategyShuffle = "shuffle-sharding" + + // Compaction mode + CompactionModeDefault = "default" + CompactionModePartitioning = "partitioning" ) var ( From a8efe988d59d63ac27131ddef86b7915fca9e21d Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 8 Aug 2024 13:43:12 -0700 Subject: [PATCH 2/3] rename Signed-off-by: Alex Le --- pkg/compactor/compactor.go | 28 ++++++++++++++-------------- pkg/util/shard.go | 4 ++-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 06b8dd98aa..817f93572b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -53,12 +53,12 @@ var ( errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) - supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - supportedCompactionModes = []string{util.CompactionModeDefault, util.CompactionModePartitioning} - errInvalidCompactionMode = errors.New("invalid compaction mode") - errInvalidCompactionModePartitioning = errors.New("compaction mode partitioning can only be enabled when shuffle sharding is enabled") + supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + supportedCompactionStrategies = []string{util.CompactionStrategyDefault, util.CompactionStrategyPartitioning} + errInvalidCompactionStrategy = errors.New("invalid compaction strategy") + errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled") DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { return compact.NewDefaultGrouperWithMetrics( @@ -80,7 +80,7 @@ var ( } ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { - if cfg.CompactionMode == util.CompactionModePartitioning { + if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { return NewPartitionCompactionGrouper(ctx, logger, bkt) } else { return NewShuffleShardingGrouper( @@ -130,7 +130,7 @@ var ( plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { - if cfg.CompactionMode == util.CompactionModePartitioning { + if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { return NewPartitionCompactionPlanner(ctx, bkt, logger) } else { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) @@ -214,7 +214,7 @@ type Config struct { ShardingRing RingConfig `yaml:"sharding_ring"` // Compaction mode. - CompactionMode string `yaml:"compaction_mode"` + CompactionStrategy string `yaml:"compaction_mode"` // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override @@ -258,7 +258,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) - f.StringVar(&cfg.CompactionMode, "compactor.compaction-mode", util.CompactionModeDefault, fmt.Sprintf("The compaction mode to use. Supported values are: %s.", strings.Join(supportedCompactionModes, ", "))) + f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-mode", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", "))) f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+ "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") @@ -306,12 +306,12 @@ func (cfg *Config) Validate(limits validation.Limits) error { } // Make sure a valid compaction mode is being used - if !util.StringsContain(supportedCompactionModes, cfg.CompactionMode) { - return errInvalidCompactionMode + if !util.StringsContain(supportedCompactionStrategies, cfg.CompactionStrategy) { + return errInvalidCompactionStrategy } - if !cfg.ShardingEnabled && cfg.CompactionMode == util.CompactionModePartitioning { - return errInvalidCompactionModePartitioning + if !cfg.ShardingEnabled && cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + return errInvalidCompactionStrategyPartitioning } return nil diff --git a/pkg/util/shard.go b/pkg/util/shard.go index 1ffe234ee9..5d3de01cc4 100644 --- a/pkg/util/shard.go +++ b/pkg/util/shard.go @@ -12,8 +12,8 @@ const ( ShardingStrategyShuffle = "shuffle-sharding" // Compaction mode - CompactionModeDefault = "default" - CompactionModePartitioning = "partitioning" + CompactionStrategyDefault = "default" + CompactionStrategyPartitioning = "partitioning" ) var ( From bec59bc328de1963234323eb7955b67ba22612ae Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 8 Aug 2024 14:06:49 -0700 Subject: [PATCH 3/3] update doc Signed-off-by: Alex Le --- docs/blocks-storage/compactor.md | 2 +- docs/configuration/config-file-reference.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index ff66ac2c6d..030035f2ac 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -285,7 +285,7 @@ compactor: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] - # The compaction mode to use. Supported values are: default, partitioning. + # The compaction strategy to use. Supported values are: default, partitioning. # CLI flag: -compactor.compaction-mode [compaction_mode: | default = "default"] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 3d4ed9c08b..4880cf70b6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2223,7 +2223,7 @@ sharding_ring: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] -# The compaction mode to use. Supported values are: default, partitioning. +# The compaction strategy to use. Supported values are: default, partitioning. # CLI flag: -compactor.compaction-mode [compaction_mode: | default = "default"]