mirror of
https://github.com/gomods/athens
synced 2026-02-03 08:40:31 +00:00
Fix GCP Storage Backend Locking Issue (#2051)
This commit is contained in:
@@ -15,8 +15,9 @@ import (
|
||||
|
||||
// Storage implements the (./pkg/storage).Backend interface.
|
||||
type Storage struct {
|
||||
bucket *storage.BucketHandle
|
||||
timeout time.Duration
|
||||
bucket *storage.BucketHandle
|
||||
timeout time.Duration
|
||||
// Deprecated: left for config backwards compatibility.
|
||||
staleThreshold time.Duration
|
||||
}
|
||||
|
||||
|
||||
+30
-100
@@ -13,10 +13,6 @@ import (
|
||||
googleapi "google.golang.org/api/googleapi"
|
||||
)
|
||||
|
||||
// Fallback for how long we consider an "in_progress" metadata key stale,
|
||||
// due to failure to remove it.
|
||||
const fallbackInProgressStaleThreshold = 2 * time.Minute
|
||||
|
||||
// Save uploads the module's .mod, .zip and .info files for a given version
|
||||
// It expects a context, which can be provided using context.Background
|
||||
// from the standard library until context has been threaded down the stack.
|
||||
@@ -28,24 +24,11 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte,
|
||||
const op errors.Op = "gcp.save"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
gomodPath := config.PackageVersionedName(module, version, "mod")
|
||||
innerErr := s.save(ctx, module, version, mod, zip, info)
|
||||
if errors.Is(innerErr, errors.KindAlreadyExists) {
|
||||
// Cache hit.
|
||||
return errors.E(op, innerErr)
|
||||
err := s.save(ctx, module, version, mod, zip, info)
|
||||
if err != nil {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
// No cache hit. Remove the metadata lock if it is there.
|
||||
inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath)
|
||||
if outerErr != nil {
|
||||
return errors.E(op, outerErr)
|
||||
}
|
||||
if inProgress {
|
||||
outerErr = s.removeInProgressMetadata(ctx, gomodPath)
|
||||
if outerErr != nil {
|
||||
return errors.E(op, outerErr)
|
||||
}
|
||||
}
|
||||
return innerErr
|
||||
return err
|
||||
}
|
||||
|
||||
// SetStaleThreshold sets the threshold of how long we consider
|
||||
@@ -58,108 +41,55 @@ func (s *Storage) save(ctx context.Context, module, version string, mod []byte,
|
||||
const op errors.Op = "gcp.save"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
|
||||
gomodPath := config.PackageVersionedName(module, version, "mod")
|
||||
seenAlreadyExists := 0
|
||||
err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true)
|
||||
// If it already exists, check the object metadata to see if the
|
||||
// other two are still uploading in progress somewhere else. If they
|
||||
// are, return a cache hit. If not, continue on to the other two,
|
||||
// and only return a cache hit if all three exist.
|
||||
if errors.Is(err, errors.KindAlreadyExists) {
|
||||
inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath)
|
||||
if progressErr != nil {
|
||||
return errors.E(op, progressErr)
|
||||
}
|
||||
if inProgress {
|
||||
// err is known to be errors.KindAlreadyExists at this point, so
|
||||
// this is a cache hit return.
|
||||
return errors.E(op, err)
|
||||
}
|
||||
seenAlreadyExists++
|
||||
} else if err != nil {
|
||||
// Other errors
|
||||
err := s.upload(ctx, gomodPath, bytes.NewReader(mod), false)
|
||||
// KindAlreadyExists means the file is uploaded (somewhere else) successfully.
|
||||
if err != nil && !errors.Is(err, errors.KindAlreadyExists) {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
|
||||
zipPath := config.PackageVersionedName(module, version, "zip")
|
||||
err = s.upload(ctx, zipPath, zip, false)
|
||||
if errors.Is(err, errors.KindAlreadyExists) {
|
||||
seenAlreadyExists++
|
||||
} else if err != nil {
|
||||
err = s.upload(ctx, zipPath, zip, true)
|
||||
if err != nil && !errors.Is(err, errors.KindAlreadyExists) {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
|
||||
infoPath := config.PackageVersionedName(module, version, "info")
|
||||
err = s.upload(ctx, infoPath, bytes.NewReader(info), false)
|
||||
// Have all three returned errors.KindAlreadyExists?
|
||||
if errors.Is(err, errors.KindAlreadyExists) {
|
||||
if seenAlreadyExists == 2 {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
if err != nil && !errors.Is(err, errors.KindAlreadyExists) {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error {
|
||||
const op errors.Op = "gcp.removeInProgressMetadata"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
_, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{
|
||||
Metadata: map[string]string{},
|
||||
})
|
||||
if err != nil {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) {
|
||||
const op errors.Op = "gcp.checkUploadInProgress"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
attrs, err := s.bucket.Object(gomodPath).Attrs(ctx)
|
||||
if err != nil {
|
||||
return false, errors.E(op, err)
|
||||
}
|
||||
// If we have a config-set lock threshold, i.e. we are using the GCP
|
||||
// slightflight backend, use it. Otherwise, use the fallback, which
|
||||
// is arguably irrelevant when not using GCP for singleflighting.
|
||||
threshold := fallbackInProgressStaleThreshold
|
||||
if s.staleThreshold > 0 {
|
||||
threshold = s.staleThreshold
|
||||
}
|
||||
if attrs.Metadata != nil {
|
||||
_, ok := attrs.Metadata["in_progress"]
|
||||
if ok {
|
||||
// In case the final call to remove the metadata fails for some reason,
|
||||
// we have a threshold after which we consider this to be stale.
|
||||
if time.Since(attrs.Created) > threshold {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error {
|
||||
func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, checkBefore bool) error {
|
||||
const op errors.Op = "gcp.upload"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if checkBefore {
|
||||
// Check whether the file already exists before uploading.
|
||||
// Note that this is not for preventing the same file from being uploaded multiple times,
|
||||
// but only a small optimization to avoid unnecessary uploads for large files (in particular .zip file).
|
||||
_, err := s.bucket.Object(path).Attrs(cancelCtx)
|
||||
if err == nil {
|
||||
// The file already exists, no need to upload it again.
|
||||
return nil
|
||||
} else if !errors.IsErr(err, storage.ErrObjectNotExist) {
|
||||
// Not expected error, return it.
|
||||
return errors.E(op, err)
|
||||
}
|
||||
// Otherwise, the error is ErrObjectNotExist, so we should upload the file.
|
||||
}
|
||||
|
||||
wc := s.bucket.Object(path).If(storage.Conditions{
|
||||
DoesNotExist: true,
|
||||
}).NewWriter(cancelCtx)
|
||||
|
||||
// We set this metadata only for the first of the three files uploaded,
|
||||
// for use as a singleflight lock.
|
||||
if first {
|
||||
wc.Metadata = make(map[string]string)
|
||||
wc.Metadata["in_progress"] = "true"
|
||||
}
|
||||
|
||||
// NOTE: content type is auto detected on GCP side and ACL defaults to public
|
||||
// Once we support private storage buckets this may need refactoring
|
||||
// unless there is a way to set the default perms in the project.
|
||||
|
||||
Reference in New Issue
Block a user