gcp/saver: Only return errors.KindAlreadyExists if all three exist (#1957)

* gcp/saver: Only return errors.KindAlreadyExists if all three exist

In #1124, a GCP lock type was added as a singleflight backend. As part of this work, the GCP backend's Save() was made serial, likely because moduploader.Upload requires a call to Exists() before it, rendering the GCP lock less useful, by doubling the calls to GCS.

However, by doing this, the existence check was now only checking the existence of the mod file, and not the info or zip. This meant that if during a Save, the zip or info uploads failed, then on subsequent requests, when using the GCP singleflight backend, Athens would assume everything had been stashed and saved properly, and then fail to serve up the info or zip that had failed to upload, meaning the cache was in an unhealable broken state, requiring manual intervention.

To fix this, without breaking the singleflight behavior, introduce a metadata key that is set on the mod file during its initial upload, indicating that a Stash is still in progress on subsequent files, which gets removed once all three files are uploaded successfully, and which can be checked if it is determined that the mod file already exists. That way we can return an errors.KindAlreadyExists if a Stash is in progress, but also properly return it when a Stash is *not* currently in progress if and only if all three files exist on GCS, which prevents the cache from becoming permanently poisoned.

One note is that it is possible the GCS call to remove the metadata key fails, which would mean it is left on the mod object forever. To avoid this, consider it stale after 2 minutes.

---------

Signed-off-by: Derek Buitenhuis <derek.buitenhuis@gmail.com>
Co-authored-by: Matt <matt.ouille@protonmail.com>
This commit is contained in:
Derek Buitenhuis
2024-06-02 20:32:54 +01:00
committed by GitHub
parent c1891f148e
commit 0ef761cc8b
10 changed files with 255 additions and 17 deletions
+3 -3
View File
@@ -101,7 +101,7 @@ func addProxyRoutes(
lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs) lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs)
checker := storage.WithChecker(s) checker := storage.WithChecker(s)
withSingleFlight, err := getSingleFlight(l, c, checker) withSingleFlight, err := getSingleFlight(l, c, s, checker)
if err != nil { if err != nil {
return err return err
} }
@@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a
l.logger.WithContext(ctx).Printf(format, v...) l.logger.WithContext(ctx).Printf(format, v...)
} }
func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) { func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType { switch c.SingleFlightType {
case "", "memory": case "", "memory":
return stash.WithSingleflight, nil return stash.WithSingleflight, nil
@@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (
if c.StorageType != "gcp" { if c.StorageType != "gcp" {
return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType) return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
} }
return stash.WithGCSLock, nil return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s)
case "azureblob": case "azureblob":
if c.StorageType != "azureblob" { if c.StorageType != "azureblob" {
return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType) return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType)
+4
View File
@@ -377,6 +377,10 @@ ShutdownTimeout = 60
# Max retries while acquiring the lock. Defaults to 10. # Max retries while acquiring the lock. Defaults to 10.
# Env override: ATHENS_REDIS_LOCK_MAX_RETRIES # Env override: ATHENS_REDIS_LOCK_MAX_RETRIES
MaxRetries = 10 MaxRetries = 10
[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
[Storage] [Storage]
# Only storage backends that are specified in Proxy.StorageType are required here # Only storage backends that are specified in Proxy.StorageType are required here
[Storage.CDN] [Storage.CDN]
+11
View File
@@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red
SentinelPassword = "sekret" SentinelPassword = "sekret"
Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis. Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis.
### Using GCP as a singleflight mechanism
The GCP singleflight mechanism does not require configuration, and works out of the box. It has a
single option with which it can be customized:
[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
+1
View File
@@ -181,6 +181,7 @@ func defaultConfig() *Config {
SentinelPassword: "sekret", SentinelPassword: "sekret",
LockConfig: DefaultRedisLockConfig(), LockConfig: DefaultRedisLockConfig(),
}, },
GCP: DefaultGCPConfig(),
}, },
Index: &Index{ Index: &Index{
MySQL: &MySQL{ MySQL: &MySQL{
+3
View File
@@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) {
LockConfig: DefaultRedisLockConfig(), LockConfig: DefaultRedisLockConfig(),
}, },
Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"}, Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
GCP: DefaultGCPConfig(),
} }
expConf := &Config{ expConf := &Config{
@@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string {
} else if singleFlight.Etcd != nil { } else if singleFlight.Etcd != nil {
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd" envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
} else if singleFlight.GCP != nil {
envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold)
} }
} }
return envVars return envVars
+13
View File
@@ -7,6 +7,7 @@ type SingleFlight struct {
Etcd *Etcd Etcd *Etcd
Redis *Redis Redis *Redis
RedisSentinel *RedisSentinel RedisSentinel *RedisSentinel
GCP *GCP
} }
// Etcd holds client side configuration // Etcd holds client side configuration
@@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig {
MaxRetries: 10, MaxRetries: 10,
} }
} }
// GCP is the configuration for GCP locking.
type GCP struct {
StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"`
}
// DefaultGCPConfig returns the default GCP locking configuration.
func DefaultGCPConfig() *GCP {
return &GCP{
StaleThreshold: 120,
}
}
+19 -2
View File
@@ -2,15 +2,32 @@ package stash
import ( import (
"context" "context"
"fmt"
"time"
"github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ" "github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/gcp"
) )
// WithGCSLock returns a distributed singleflight // WithGCSLock returns a distributed singleflight
// using a GCS backend. See the config.toml documentation for details. // using a GCS backend. See the config.toml documentation for details.
func WithGCSLock(s Stasher) Stasher { func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) {
return &gcsLock{s} if staleThreshold <= 0 {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold"))
}
// Since we *must* be using a GCP storage backend, we can abuse this
// fact to mutate it, so that we can get our threshold into Save().
// Your instincts are correct, this is kind of gross.
gs, ok := s.(*gcp.Storage)
if !ok {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage"))
}
gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second)
return func(s Stasher) Stasher {
return &gcsLock{s}
}, nil
} }
type gcsLock struct { type gcsLock struct {
+78 -1
View File
@@ -3,6 +3,7 @@ package stash
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"io" "io"
"os" "os"
"strings" "strings"
@@ -17,6 +18,12 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type failReader int
func (f *failReader) Read([]byte) (int, error) {
return 0, fmt.Errorf("failure")
}
// TestWithGCS requires a real GCP backend implementation // TestWithGCS requires a real GCP backend implementation
// and it will ensure that saving to modules at the same time // and it will ensure that saving to modules at the same time
// is done synchronously so that only the first module gets saved. // is done synchronously so that only the first module gets saved.
@@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
content := uuid.New().String() content := uuid.New().String()
ms := &mockGCPStasher{strg, content} ms := &mockGCPStasher{strg, content}
s := WithGCSLock(ms) gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
eg.Go(func() error { eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
@@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) {
} }
} }
// TestWithGCSPartialFailure requires a real GCP backend implementation
// and ensures that if one of the non-singleflight-lock files fails to
// upload, that the cache does not remain poisoned.
func TestWithGCSPartialFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
const (
mod = "stashmod"
ver = "v1.0.0"
)
strg := getStorage(t)
strg.Delete(ctx, mod, ver)
defer strg.Delete(ctx, mod, ver)
// sanity check
_, err := strg.GoMod(ctx, mod, ver)
if !errors.Is(err, errors.KindNotFound) {
t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
}
content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
fr := new(failReader)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
// We simulate a failure by manually passing an io.Reader that will fail.
err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
if err == nil {
// We *want* to fail.
t.Fatal(err)
}
// Now try a Stash. This should upload the missing files.
_, err = s.Stash(ctx, "stashmod", "v1.0.0")
if err != nil {
t.Fatal(err)
}
info, err := strg.Info(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
modContent, err := strg.GoMod(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
zip, err := strg.Zip(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
defer zip.Close()
zipContent, err := io.ReadAll(zip)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(info, modContent) {
t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
}
if !bytes.Equal(info, zipContent) {
t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
}
}
// mockGCPStasher is like mockStasher // mockGCPStasher is like mockStasher
// but leverages in memory storage // but leverages in memory storage
// so that redis can determine // so that redis can determine
+3 -2
View File
@@ -15,8 +15,9 @@ import (
// Storage implements the (./pkg/storage).Backend interface. // Storage implements the (./pkg/storage).Backend interface.
type Storage struct { type Storage struct {
bucket *storage.BucketHandle bucket *storage.BucketHandle
timeout time.Duration timeout time.Duration
staleThreshold time.Duration
} }
// New returns a new Storage instance backed by a Google Cloud Storage bucket. // New returns a new Storage instance backed by a Google Cloud Storage bucket.
+120 -9
View File
@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"io" "io"
"time"
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
"github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/config"
@@ -12,6 +13,10 @@ import (
googleapi "google.golang.org/api/googleapi" googleapi "google.golang.org/api/googleapi"
) )
// Fallback for how long we consider an "in_progress" metadata key stale,
// due to failure to remove it.
const fallbackInProgressStaleThreshold = 2 * time.Minute
// Save uploads the module's .mod, .zip and .info files for a given version // Save uploads the module's .mod, .zip and .info files for a given version
// It expects a context, which can be provided using context.Background // It expects a context, which can be provided using context.Background
// from the standard library until context has been threaded down the stack. // from the standard library until context has been threaded down the stack.
@@ -20,40 +25,146 @@ import (
// Uploaded files are publicly accessible in the storage bucket as per // Uploaded files are publicly accessible in the storage bucket as per
// an ACL rule. // an ACL rule.
func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
const op errors.Op = "gcp.Save" const op errors.Op = "gcp.save"
ctx, span := observ.StartSpan(ctx, op.String()) ctx, span := observ.StartSpan(ctx, op.String())
defer span.End() defer span.End()
gomodPath := config.PackageVersionedName(module, version, "mod") gomodPath := config.PackageVersionedName(module, version, "mod")
err := s.upload(ctx, gomodPath, bytes.NewReader(mod)) innerErr := s.save(ctx, module, version, mod, zip, info)
if err != nil { if errors.Is(innerErr, errors.KindAlreadyExists) {
// Cache hit.
return errors.E(op, innerErr)
}
// No cache hit. Remove the metadata lock if it is there.
inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath)
if outerErr != nil {
return errors.E(op, outerErr)
}
if inProgress {
outerErr = s.removeInProgressMetadata(ctx, gomodPath)
if outerErr != nil {
return errors.E(op, outerErr)
}
}
return innerErr
}
// SetStaleThreshold sets the threshold of how long we consider
// a lock metadata stale after.
func (s *Storage) SetStaleThreshold(threshold time.Duration) {
s.staleThreshold = threshold
}
func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
const op errors.Op = "gcp.save"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
gomodPath := config.PackageVersionedName(module, version, "mod")
seenAlreadyExists := 0
err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true)
// If it already exists, check the object metadata to see if the
// other two are still uploading in progress somewhere else. If they
// are, return a cache hit. If not, continue on to the other two,
// and only return a cache hit if all three exist.
if errors.Is(err, errors.KindAlreadyExists) {
inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath)
if progressErr != nil {
return errors.E(op, progressErr)
}
if inProgress {
// err is known to be errors.KindAlreadyExists at this point, so
// this is a cache hit return.
return errors.E(op, err)
}
seenAlreadyExists++
} else if err != nil {
// Other errors
return errors.E(op, err) return errors.E(op, err)
} }
zipPath := config.PackageVersionedName(module, version, "zip") zipPath := config.PackageVersionedName(module, version, "zip")
err = s.upload(ctx, zipPath, zip) err = s.upload(ctx, zipPath, zip, false)
if err != nil { if errors.Is(err, errors.KindAlreadyExists) {
seenAlreadyExists++
} else if err != nil {
return errors.E(op, err) return errors.E(op, err)
} }
infoPath := config.PackageVersionedName(module, version, "info") infoPath := config.PackageVersionedName(module, version, "info")
err = s.upload(ctx, infoPath, bytes.NewReader(info)) err = s.upload(ctx, infoPath, bytes.NewReader(info), false)
// Have all three returned errors.KindAlreadyExists?
if errors.Is(err, errors.KindAlreadyExists) {
if seenAlreadyExists == 2 {
return errors.E(op, err)
}
} else if err != nil {
return errors.E(op, err)
}
return nil
}
func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error {
const op errors.Op = "gcp.removeInProgressMetadata"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
_, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{
Metadata: map[string]string{},
})
if err != nil { if err != nil {
return errors.E(op, err) return errors.E(op, err)
} }
return nil return nil
} }
func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error { func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) {
const op errors.Op = "gcp.checkUploadInProgress"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
attrs, err := s.bucket.Object(gomodPath).Attrs(ctx)
if err != nil {
return false, errors.E(op, err)
}
// If we have a config-set lock threshold, i.e. we are using the GCP
// singleflight backend, use it. Otherwise, use the fallback, which
// is arguably irrelevant when not using GCP for singleflighting.
threshold := fallbackInProgressStaleThreshold
if s.staleThreshold > 0 {
threshold = s.staleThreshold
}
if attrs.Metadata != nil {
_, ok := attrs.Metadata["in_progress"]
if ok {
// In case the final call to remove the metadata fails for some reason,
// we have a threshold after which we consider this to be stale.
if time.Since(attrs.Created) > threshold {
return false, nil
}
return true, nil
}
}
return false, nil
}
func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error {
const op errors.Op = "gcp.upload" const op errors.Op = "gcp.upload"
ctx, span := observ.StartSpan(ctx, op.String()) ctx, span := observ.StartSpan(ctx, op.String())
defer span.End() defer span.End()
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
wc := s.bucket.Object(path).If(storage.Conditions{ wc := s.bucket.Object(path).If(storage.Conditions{
DoesNotExist: true, DoesNotExist: true,
}).NewWriter(ctx) }).NewWriter(cancelCtx)
// We set this metadata only for the first of the three files uploaded,
// for use as a singleflight lock.
if first {
wc.ObjectAttrs.Metadata = make(map[string]string)
wc.ObjectAttrs.Metadata["in_progress"] = "true"
}
// NOTE: content type is auto detected on GCP side and ACL defaults to public // NOTE: content type is auto detected on GCP side and ACL defaults to public
// Once we support private storage buckets this may need refactoring // Once we support private storage buckets this may need refactoring
// unless there is a way to set the default perms in the project. // unless there is a way to set the default perms in the project.
if _, err := io.Copy(wc, stream); err != nil { if _, err := io.Copy(wc, stream); err != nil {
_ = wc.Close() // Purposely do not close it to avoid creating a partial file.
return err return err
} }