Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Change Redis.go to support explicitely connecting to a Redis Cluster with a single address for GCP compatibility #450

Merged
merged 5 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/googleforgames/open-saves/internal/app/collector"
"github.com/googleforgames/open-saves/internal/pkg/cache/redis"
"github.com/googleforgames/open-saves/internal/pkg/cmd"
log "github.com/sirupsen/logrus"
)
Expand All @@ -29,13 +30,15 @@ func main() {
defaultBucket := cmd.GetEnvVarString("OPEN_SAVES_BUCKET", "gs://triton-dev-store")
defaultProject := cmd.GetEnvVarString("OPEN_SAVES_PROJECT", "triton-for-games-dev")
defaultCache := cmd.GetEnvVarString("OPEN_SAVES_CACHE", "localhost:6379")
defaultRedisMode := cmd.GetEnvVarString("OPEN_SAVES_REDIS_MODE", redis.RedisModeSingle)
defaultExpiration := cmd.GetEnvVarDuration("OPEN_SAVES_GARBAGE_EXPIRATION", 24*time.Hour)

var (
cloud = flag.String("cloud", defaultCloud, "The public cloud provider you wish to run Open Saves on")
bucket = flag.String("bucket", defaultBucket, "The bucket which will hold Open Saves blobs")
project = flag.String("project", defaultProject, "The GCP project ID to use for Datastore")
cache = flag.String("cache", defaultCache, "The address of the cache store instance")
redisMode = flag.String("redis-mode", defaultRedisMode, "The mode the Redis cache is configured, single or cluster")
expiration = flag.Duration("garbage-expiration", defaultExpiration, "Collector deletes entries older than this time.Duration value (e.g. \"24h\")")
)

Expand All @@ -52,13 +55,15 @@ func main() {
if *cache == "" {
log.Fatal("missing -cache argument for cache store")
}
// RedisMode is considered optional, so we don't need to validate it here.

cfg := &collector.Config{
Cloud: *cloud,
Bucket: *bucket,
Project: *project,
Cache: *cache,
Before: time.Now().Add(-*expiration),
Cloud: *cloud,
Bucket: *bucket,
Project: *project,
Cache: *cache,
RedisMode: *redisMode,
Before: time.Now().Add(-*expiration),
}

ctx := context.Background()
Expand Down
5 changes: 5 additions & 0 deletions configs/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ log_level: "info"
shutdown_grace_period: "5s"
cache_default_ttl: "5m"

# When working with a single Redis instance, set redis_mode: "single" and supply a single address.
# When working with Redis Cluster, set redis_mode: "cluster"
# - if it is a standard Redis Cluster, supply a list of IPs separated by (,)
# - if it is a GCP MemoryStore Redis Cluster, supply a single address - the discovery address.
redis_address: "localhost:6379"
redis_mode: "single"
redis_min_idle_conns: 500
redis_pool_size: 10000
redis_idle_timeout: 0
Expand Down
17 changes: 11 additions & 6 deletions internal/app/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (

// Config defines common fields needed to start the garbage collector.
type Config struct {
Cloud string
Bucket string
Cache string
Project string
Before time.Time
Cloud string
Bucket string
Cache string
RedisMode string
Project string
Before time.Time
}

// Collector is a garbage collector of unused resources in Datastore.
Expand All @@ -65,7 +66,11 @@ func newCollector(ctx context.Context, cfg *Config) (*Collector, error) {
log.Fatalf("Failed to create a MetaDB instance: %v", err)
return nil, err
}
cache := cache.New(redis.NewRedis(cfg.Cache), &config.CacheConfig{})
redisCfg := config.RedisConfig{
Address: cfg.Cache,
RedisMode: cfg.RedisMode,
}
cache := cache.New(redis.NewRedisWithConfig(&redisCfg), &config.CacheConfig{})
c := &Collector{
blob: gcs,
metaDB: metadb,
Expand Down
13 changes: 8 additions & 5 deletions internal/app/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/datastore"
"github.com/google/uuid"
"github.com/googleforgames/open-saves/internal/pkg/cache/redis"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref/chunkref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/record"
Expand All @@ -36,6 +37,7 @@ const (
testProject = "triton-for-games-dev"
testBucket = "gs://triton-integration"
testCacheAddr = "localhost:6379"
testRedisMode = redis.RedisModeSingle
blobKind = "blob"
chunkKind = "chunk"
testTimeThreshold = -1 * time.Hour
Expand All @@ -44,11 +46,12 @@ const (
func newTestCollector(ctx context.Context, t *testing.T) *Collector {
t.Helper()
cfg := &Config{
Cloud: "gcp",
Bucket: testBucket,
Project: testProject,
Cache: testCacheAddr,
Before: time.Now().Add(testTimeThreshold),
Cloud: "gcp",
Bucket: testBucket,
Project: testProject,
Cache: testCacheAddr,
RedisMode: testRedisMode,
Before: time.Now().Add(testTimeThreshold),
}
c, err := newCollector(ctx, cfg)
if err != nil {
Expand Down
50 changes: 40 additions & 10 deletions internal/pkg/cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ import (
"time"
)

// Knonw Redis connection modes.
const (
RedisModeSingle = "single"
RedisModeCluster = "cluster"
)

const (
redisClusterSeparator = ","
)

// Redis is an implementation of the cache.Cache interface.
type Redis struct {
c redis.UniversalClient
Expand All @@ -33,23 +43,43 @@ type Redis struct {
func NewRedis(address string) *Redis {
cfg := &config.RedisConfig{
Address: address,
RedisMode: RedisModeSingle,
}

return NewRedisWithConfig(cfg)
}

// NewRedisWithConfig creates a new Redis instance with configurable options.
func NewRedisWithConfig(cfg *config.RedisConfig) *Redis {
o := &redis.UniversalOptions{
Addrs: parseRedisAddress(cfg.Address),
MinIdleConns: cfg.MinIdleConns,
PoolSize: cfg.PoolSize,
ConnMaxIdleTime: cfg.IdleTimeout,
ConnMaxLifetime: cfg.MaxConnAge,
var c redis.UniversalClient

if cfg.RedisMode == RedisModeCluster {
o := &redis.ClusterOptions{
// When working with a standard Redis Cluster, it is expected the address being a list of addresses separated by commas (,)
// When working with CGP MemoryStore Redis Cluster, it is expected the address being a single address - the discovery address.
Addrs: parseRedisAddress(cfg.Address),
MinIdleConns: cfg.MinIdleConns,
PoolSize: cfg.PoolSize,
ConnMaxIdleTime: cfg.IdleTimeout,
ConnMaxLifetime: cfg.MaxConnAge,
}

c = redis.NewClusterClient(o)
} else {
// By default, if no RedisMode is supplied, Single mode will be selected.
// This is to be retro compatible with previous versions of OpenSaves.
// In this case the address is expected to be a single address.
o := &redis.Options{
Addr: cfg.Address,
MinIdleConns: cfg.MinIdleConns,
PoolSize: cfg.PoolSize,
ConnMaxIdleTime: cfg.IdleTimeout,
ConnMaxLifetime: cfg.MaxConnAge,
}

c = redis.NewClient(o)
}

c := redis.NewUniversalClient(o)

err := redisotel.InstrumentMetrics(c)
if err != nil {
log.Errorf("got error adding metric instrumentation to redis client: %v", err)
Expand All @@ -65,11 +95,11 @@ func NewRedisWithConfig(cfg *config.RedisConfig) *Redis {
}
}

// Parse the input Redis address by splitting the list of addresses separated by commas (,)
// parseRedisAddress Parse the input Redis address by splitting the list of addresses separated by commas (,)
func parseRedisAddress(address string) []string {
addresses := []string{}

for _, foundAddr := range strings.Split(address, ",") {
for _, foundAddr := range strings.Split(address, redisClusterSeparator) {
addresses = append(addresses, strings.TrimSpace(foundAddr))
}

Expand Down
52 changes: 52 additions & 0 deletions internal/pkg/cache/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package redis
import (
"context"
"github.com/alicebob/miniredis/v2"
"github.com/googleforgames/open-saves/internal/pkg/config"
redis "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -68,6 +69,57 @@ func TestRedis_All(t *testing.T) {
assert.Empty(t, keys)
}

func TestRedis_ClusterAll(t *testing.T) {
ctx := context.Background()

// Use miniredis for tests.
s := miniredis.RunT(t)
// Configure Redis to use the cluster client.
cfg := config.RedisConfig {
Address: s.Addr(),
RedisMode: RedisModeCluster,
}
r := NewRedisWithConfig(&cfg)
require.NotNil(t, r)
// Assert that we are getting a ClusterClient.
assert.IsType(t, &redis.ClusterClient{}, r.c)

assert.NoError(t, r.FlushAll(ctx))

keys, err := r.ListKeys(ctx)
assert.NoError(t, err)
assert.Empty(t, keys)

_, err = r.Get(ctx, "unknown")
assert.Error(t, err)

by := []byte("byte")
assert.NoError(t, r.Set(ctx, "hello", by, 0))

val, err := r.Get(ctx, "hello")
assert.NoError(t, err)
assert.Equal(t, by, val)

// test with TTL. The resolution is one millisecond.
assert.NoError(t, r.Set(ctx, "withTTL", by, 1*time.Millisecond))
s.FastForward(2 * time.Millisecond)
val, err = r.Get(ctx, "withTTL")
assert.ErrorIs(t, redis.Nil, err)
assert.Nil(t, val)

keys, err = r.ListKeys(ctx)
assert.NoError(t, err)
assert.Equal(t, []string{"hello"}, keys)

assert.NoError(t, r.Delete(ctx, "hello"))

assert.NoError(t, r.FlushAll(ctx))

keys, err = r.ListKeys(ctx)
assert.NoError(t, err)
assert.Empty(t, keys)
}

// Test parsing of Redis addresses works as expected.
func TestRedisParseRedisAddress(t *testing.T) {
type ParseRedisAddressFixture struct {
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func Load(path string) (*ServiceConfig, error) {
// Redis configuration
redisConfig := RedisConfig{
Address: viper.GetString(RedisAddress),
RedisMode: viper.GetString(RedisMode),
MaxRetries: viper.GetInt(RedisMaxRetries),
MinRetyBackoff: viper.GetDuration(RedisMinRetryBackoff),
MaxRetryBackoff: viper.GetDuration(RedisMaxRetryBackoff),
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
CacheDefaultTTL = "cache_default_ttl"

RedisAddress = "redis_address"
RedisMode = "redis_mode"
RedisMinIdleConns = "redis_min_idle_conns"
RedisPoolSize = "redis_pool_size"
RedisIdleTimeout = "redis_idle_timeout"
Expand Down Expand Up @@ -85,6 +86,7 @@ type CacheConfig struct {
// RedisConfig as defined in https://pkg.go.dev/github.com/redis/go-redis/v9#Options
type RedisConfig struct {
Address string
RedisMode string

MaxRetries int
MinRetyBackoff time.Duration
Expand Down