mirror of
https://github.com/gomods/athens
synced 2026-02-03 11:00:32 +00:00
read redis lock options from config to support custom TTL & timeout (#1791)
* read redis lock options from config to support custom TTL & timeout * fix test * fix typo * downgrade to bsm/redislock@v0.7.2 to prevent usage of beta go-redis version * revert test changes * return error for invalid lock config * update config parsing test * udpate docs to include redis lock config * fix test * set default max retries to 10 * reduce default redis lock timeout to 15s * update default TTL to 15mins Co-authored-by: Manu Gupta <manugupt1@gmail.com>
This commit is contained in:
@@ -167,11 +167,12 @@ func defaultConfig() *Config {
|
||||
IndexType: "none",
|
||||
SingleFlight: &SingleFlight{
|
||||
Etcd: &Etcd{"localhost:2379,localhost:22379,localhost:32379"},
|
||||
Redis: &Redis{"127.0.0.1:6379", ""},
|
||||
Redis: &Redis{"127.0.0.1:6379", "", DefaultRedisLockConfig()},
|
||||
RedisSentinel: &RedisSentinel{
|
||||
Endpoints: []string{"127.0.0.1:26379"},
|
||||
MasterName: "redis-1",
|
||||
SentinelPassword: "sekret",
|
||||
LockConfig: DefaultRedisLockConfig(),
|
||||
},
|
||||
},
|
||||
Index: &Index{
|
||||
|
||||
@@ -23,9 +23,9 @@ func testConfigFile(t *testing.T) (testConfigFile string) {
|
||||
return testConfigFile
|
||||
}
|
||||
|
||||
func compareConfigs(parsedConf *Config, expConf *Config, t *testing.T) {
|
||||
func compareConfigs(parsedConf *Config, expConf *Config, t *testing.T, ignoreTypes ...interface{}) {
|
||||
t.Helper()
|
||||
opts := cmpopts.IgnoreTypes(Storage{}, SingleFlight{}, Index{})
|
||||
opts := cmpopts.IgnoreTypes(append([]interface{}{Index{}}, ignoreTypes...)...)
|
||||
eq := cmp.Equal(parsedConf, expConf, opts)
|
||||
if !eq {
|
||||
diff := cmp.Diff(parsedConf, expConf, opts)
|
||||
@@ -108,7 +108,7 @@ func TestEnvOverrides(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Env override failed: %v", err)
|
||||
}
|
||||
compareConfigs(conf, expConf, t)
|
||||
compareConfigs(conf, expConf, t, Storage{}, SingleFlight{})
|
||||
}
|
||||
|
||||
func TestEnvOverridesPreservingPort(t *testing.T) {
|
||||
@@ -261,6 +261,27 @@ func TestParseExampleConfig(t *testing.T) {
|
||||
Token: "",
|
||||
Bucket: "MY_S3_BUCKET_NAME",
|
||||
},
|
||||
AzureBlob: &AzureBlobConfig{
|
||||
AccountName: "MY_AZURE_BLOB_ACCOUNT_NAME",
|
||||
AccountKey: "MY_AZURE_BLOB_ACCOUNT_KEY",
|
||||
ContainerName: "MY_AZURE_BLOB_CONTAINER_NAME",
|
||||
},
|
||||
External: &External{URL: ""},
|
||||
}
|
||||
|
||||
expSingleFlight := &SingleFlight{
|
||||
Redis: &Redis{
|
||||
Endpoint: "127.0.0.1:6379",
|
||||
Password: "",
|
||||
LockConfig: DefaultRedisLockConfig(),
|
||||
},
|
||||
RedisSentinel: &RedisSentinel{
|
||||
Endpoints: []string{"127.0.0.1:26379"},
|
||||
MasterName: "redis-1",
|
||||
SentinelPassword: "sekret",
|
||||
LockConfig: DefaultRedisLockConfig(),
|
||||
},
|
||||
Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
|
||||
}
|
||||
|
||||
expConf := &Config{
|
||||
@@ -287,7 +308,7 @@ func TestParseExampleConfig(t *testing.T) {
|
||||
StatsExporter: "prometheus",
|
||||
SingleFlightType: "memory",
|
||||
GoBinaryEnvVars: []string{"GOPROXY=direct"},
|
||||
SingleFlight: &SingleFlight{},
|
||||
SingleFlight: expSingleFlight,
|
||||
SumDBs: []string{"https://sum.golang.org"},
|
||||
NoSumPatterns: []string{},
|
||||
DownloadMode: "sync",
|
||||
@@ -370,6 +391,33 @@ func getEnvMap(config *Config) map[string]string {
|
||||
envVars["ATHENS_S3_BUCKET_NAME"] = storage.S3.Bucket
|
||||
}
|
||||
}
|
||||
|
||||
singleFlight := config.SingleFlight
|
||||
if singleFlight != nil {
|
||||
if singleFlight.Redis != nil {
|
||||
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "redis"
|
||||
envVars["ATHENS_REDIS_ENDPOINT"] = singleFlight.Redis.Endpoint
|
||||
envVars["ATHENS_REDIS_PASSWORD"] = singleFlight.Redis.Endpoint
|
||||
if singleFlight.Redis.LockConfig != nil {
|
||||
envVars["ATHENS_REDIS_LOCK_TTL"] = strconv.Itoa(singleFlight.Redis.LockConfig.TTL)
|
||||
envVars["ATHENS_REDIS_LOCK_TIMEOUT"] = strconv.Itoa(singleFlight.Redis.LockConfig.Timeout)
|
||||
envVars["ATHENS_REDIS_LOCK_MAX_RETRIES"] = strconv.Itoa(singleFlight.Redis.LockConfig.MaxRetries)
|
||||
}
|
||||
} else if singleFlight.RedisSentinel != nil {
|
||||
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "redis-sentinel"
|
||||
envVars["ATHENS_REDIS_SENTINEL_ENDPOINTS"] = strings.Join(singleFlight.RedisSentinel.Endpoints, ",")
|
||||
envVars["ATHENS_REDIS_SENTINEL_MASTER_NAME"] = singleFlight.RedisSentinel.MasterName
|
||||
envVars["ATHENS_REDIS_SENTINEL_PASSWORD"] = singleFlight.RedisSentinel.SentinelPassword
|
||||
if singleFlight.RedisSentinel.LockConfig != nil {
|
||||
envVars["ATHENS_REDIS_LOCK_TTL"] = strconv.Itoa(singleFlight.RedisSentinel.LockConfig.TTL)
|
||||
envVars["ATHENS_REDIS_LOCK_TIMEOUT"] = strconv.Itoa(singleFlight.RedisSentinel.LockConfig.Timeout)
|
||||
envVars["ATHENS_REDIS_LOCK_MAX_RETRIES"] = strconv.Itoa(singleFlight.RedisSentinel.LockConfig.MaxRetries)
|
||||
}
|
||||
} else if singleFlight.Etcd != nil {
|
||||
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
|
||||
envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
|
||||
}
|
||||
}
|
||||
return envVars
|
||||
}
|
||||
|
||||
|
||||
@@ -19,8 +19,9 @@ type Etcd struct {
|
||||
// Redis holds the client side configuration
|
||||
// to connect to redis as a SingleFlight implementation.
|
||||
type Redis struct {
|
||||
Endpoint string `envconfig:"ATHENS_REDIS_ENDPOINT"`
|
||||
Password string `envconfig:"ATHENS_REDIS_PASSWORD"`
|
||||
Endpoint string `envconfig:"ATHENS_REDIS_ENDPOINT"`
|
||||
Password string `envconfig:"ATHENS_REDIS_PASSWORD"`
|
||||
LockConfig *RedisLockConfig
|
||||
}
|
||||
|
||||
// RedisSentinel is the configuration for using redis with sentinel
|
||||
@@ -29,4 +30,19 @@ type RedisSentinel struct {
|
||||
Endpoints []string `envconfig:"ATHENS_REDIS_SENTINEL_ENDPOINTS"`
|
||||
MasterName string `envconfig:"ATHENS_REDIS_SENTINEL_MASTER_NAME"`
|
||||
SentinelPassword string `envconfig:"ATHENS_REDIS_SENTINEL_PASSWORD"`
|
||||
LockConfig *RedisLockConfig
|
||||
}
|
||||
|
||||
type RedisLockConfig struct {
|
||||
Timeout int `envconfig:"ATHENS_REDIS_LOCK_TIMEOUT"`
|
||||
TTL int `envconfig:"ATHENS_REDIS_LOCK_TTL"`
|
||||
MaxRetries int `envconfig:"ATHENS_REDIS_LOCK_MAX_RETRIES"`
|
||||
}
|
||||
|
||||
func DefaultRedisLockConfig() *RedisLockConfig {
|
||||
return &RedisLockConfig{
|
||||
TTL: 900,
|
||||
Timeout: 15,
|
||||
MaxRetries: 10,
|
||||
}
|
||||
}
|
||||
|
||||
+36
-10
@@ -2,10 +2,11 @@ package stash
|
||||
|
||||
import (
|
||||
"context"
|
||||
goerrors "errors"
|
||||
"time"
|
||||
|
||||
lock "github.com/bsm/redislock"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/bsm/redislock"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gomods/athens/pkg/config"
|
||||
"github.com/gomods/athens/pkg/errors"
|
||||
"github.com/gomods/athens/pkg/observ"
|
||||
@@ -13,28 +14,51 @@ import (
|
||||
)
|
||||
|
||||
// WithRedisLock returns a distributed singleflight
|
||||
// using an redis cluster. If it cannot connect, it will return an error.
|
||||
func WithRedisLock(endpoint string, password string, checker storage.Checker) (Wrapper, error) {
|
||||
// using a redis cluster. If it cannot connect, it will return an error.
|
||||
func WithRedisLock(endpoint string, password string, checker storage.Checker, lockConfig *config.RedisLockConfig) (Wrapper, error) {
|
||||
const op errors.Op = "stash.WithRedisLock"
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Network: "tcp",
|
||||
Addr: endpoint,
|
||||
Password: password,
|
||||
})
|
||||
_, err := client.Ping().Result()
|
||||
_, err := client.Ping(context.Background()).Result()
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
lockOptions, err := lockOptionsFromConfig(lockConfig)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
return func(s Stasher) Stasher {
|
||||
return &redisLock{client, s, checker}
|
||||
return &redisLock{client, s, checker, lockOptions}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func lockOptionsFromConfig(lockConfig *config.RedisLockConfig) (redisLockOptions, error) {
|
||||
if lockConfig.TTL <= 0 || lockConfig.Timeout <= 0 || lockConfig.MaxRetries <= 0 {
|
||||
return redisLockOptions{}, goerrors.New("invalid lock options")
|
||||
}
|
||||
return redisLockOptions{
|
||||
ttl: time.Duration(lockConfig.TTL) * time.Second,
|
||||
timeout: time.Duration(lockConfig.Timeout) * time.Second,
|
||||
maxRetries: lockConfig.MaxRetries,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type redisLockOptions struct {
|
||||
ttl time.Duration
|
||||
timeout time.Duration
|
||||
maxRetries int
|
||||
}
|
||||
|
||||
type redisLock struct {
|
||||
client *redis.Client
|
||||
stasher Stasher
|
||||
checker storage.Checker
|
||||
options redisLockOptions
|
||||
}
|
||||
|
||||
func (s *redisLock) Stash(ctx context.Context, mod, ver string) (newVer string, err error) {
|
||||
@@ -42,17 +66,19 @@ func (s *redisLock) Stash(ctx context.Context, mod, ver string) (newVer string,
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
mv := config.FmtModVer(mod, ver)
|
||||
lockCtx, cancel := context.WithTimeout(ctx, s.options.timeout)
|
||||
defer cancel()
|
||||
|
||||
// Obtain a new lock with default settings
|
||||
lock, err := lock.Obtain(s.client, mv, time.Minute*5, &lock.Options{
|
||||
RetryStrategy: lock.LimitRetry(lock.LinearBackoff(time.Second), 60*5),
|
||||
// Obtain a new lock using lock options
|
||||
lock, err := redislock.Obtain(lockCtx, s.client, mv, s.options.ttl, &redislock.Options{
|
||||
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(time.Second), s.options.maxRetries),
|
||||
})
|
||||
if err != nil {
|
||||
return ver, errors.E(op, err)
|
||||
}
|
||||
defer func() {
|
||||
const op errors.Op = "redis.Release"
|
||||
lockErr := lock.Release()
|
||||
lockErr := lock.Release(ctx)
|
||||
if err == nil && lockErr != nil {
|
||||
err = errors.E(op, lockErr)
|
||||
}
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
package stash
|
||||
|
||||
import (
|
||||
"github.com/go-redis/redis/v7"
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gomods/athens/pkg/config"
|
||||
"github.com/gomods/athens/pkg/errors"
|
||||
"github.com/gomods/athens/pkg/storage"
|
||||
)
|
||||
|
||||
// WithRedisSentinelLock returns a distributed singleflight
|
||||
// with a redis cluster that utilizes sentinel for quorum and failover
|
||||
func WithRedisSentinelLock(endpoints []string, master, password string, checker storage.Checker) (Wrapper, error) {
|
||||
func WithRedisSentinelLock(endpoints []string, master, password string, checker storage.Checker, lockConfig *config.RedisLockConfig) (Wrapper, error) {
|
||||
const op errors.Op = "stash.WithRedisSentinelLock"
|
||||
// The redis client constructor does not return an error when no endpoints
|
||||
// are provided, so we check for ourselves.
|
||||
@@ -20,11 +22,17 @@ func WithRedisSentinelLock(endpoints []string, master, password string, checker
|
||||
SentinelAddrs: endpoints,
|
||||
SentinelPassword: password,
|
||||
})
|
||||
_, err := client.Ping().Result()
|
||||
_, err := client.Ping(context.Background()).Result()
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
lockOptions, err := lockOptionsFromConfig(lockConfig)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
return func(s Stasher) Stasher {
|
||||
return &redisLock{client, s, checker}
|
||||
return &redisLock{client, s, checker, lockOptions}
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gomods/athens/pkg/config"
|
||||
"github.com/gomods/athens/pkg/storage"
|
||||
"github.com/gomods/athens/pkg/storage/mem"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -27,7 +28,7 @@ func TestWithRedisSentinelLock(t *testing.T) {
|
||||
}
|
||||
ms := &mockRedisStasher{strg: strg}
|
||||
|
||||
wrapper, err := WithRedisSentinelLock([]string{endpoint}, masterName, password, storage.WithChecker(strg))
|
||||
wrapper, err := WithRedisSentinelLock([]string{endpoint}, masterName, password, storage.WithChecker(strg), config.DefaultRedisLockConfig())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gomods/athens/pkg/config"
|
||||
"github.com/gomods/athens/pkg/storage"
|
||||
"github.com/gomods/athens/pkg/storage/mem"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -28,7 +29,7 @@ func TestWithRedisLock(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ms := &mockRedisStasher{strg: strg}
|
||||
wrapper, err := WithRedisLock(endpoint, password, storage.WithChecker(strg))
|
||||
wrapper, err := WithRedisLock(endpoint, password, storage.WithChecker(strg), config.DefaultRedisLockConfig())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -63,7 +64,7 @@ func TestWithRedisLockWithPassword(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ms := &mockRedisStasher{strg: strg}
|
||||
wrapper, err := WithRedisLock(endpoint, password, storage.WithChecker(strg))
|
||||
wrapper, err := WithRedisLock(endpoint, password, storage.WithChecker(strg), config.DefaultRedisLockConfig())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -97,7 +98,7 @@ func TestWithRedisLockWithWrongPassword(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = WithRedisLock(endpoint, password, storage.WithChecker(strg))
|
||||
_, err = WithRedisLock(endpoint, password, storage.WithChecker(strg), config.DefaultRedisLockConfig())
|
||||
if err == nil {
|
||||
t.Fatal("Expected Connection Error")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user