diff --git a/pkg/storage/gcp/gcp.go b/pkg/storage/gcp/gcp.go index b502fe58..7a646980 100644 --- a/pkg/storage/gcp/gcp.go +++ b/pkg/storage/gcp/gcp.go @@ -15,8 +15,9 @@ import ( // Storage implements the (./pkg/storage).Backend interface. type Storage struct { - bucket *storage.BucketHandle - timeout time.Duration + bucket *storage.BucketHandle + timeout time.Duration + // Deprecated: left for config backwards compatibility. staleThreshold time.Duration } diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 2378395c..24a4ed79 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -13,10 +13,6 @@ import ( googleapi "google.golang.org/api/googleapi" ) -// Fallback for how long we consider an "in_progress" metadata key stale, -// due to failure to remove it. -const fallbackInProgressStaleThreshold = 2 * time.Minute - // Save uploads the module's .mod, .zip and .info files for a given version // It expects a context, which can be provided using context.Background // from the standard library until context has been threaded down the stack. @@ -28,24 +24,11 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, const op errors.Op = "gcp.save" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() - gomodPath := config.PackageVersionedName(module, version, "mod") - innerErr := s.save(ctx, module, version, mod, zip, info) - if errors.Is(innerErr, errors.KindAlreadyExists) { - // Cache hit. - return errors.E(op, innerErr) + err := s.save(ctx, module, version, mod, zip, info) + if err != nil { + return errors.E(op, err) } - // No cache hit. Remove the metadata lock if it is there. - inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath) - if outerErr != nil { - return errors.E(op, outerErr) - } - if inProgress { - outerErr = s.removeInProgressMetadata(ctx, gomodPath) - if outerErr != nil { - return errors.E(op, outerErr) - } - } - return innerErr + return err } // SetStaleThreshold sets the threshold of how long we consider @@ -58,108 +41,55 @@ func (s *Storage) save(ctx context.Context, module, version string, mod []byte, const op errors.Op = "gcp.save" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() + gomodPath := config.PackageVersionedName(module, version, "mod") - seenAlreadyExists := 0 - err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true) - // If it already exists, check the object metadata to see if the - // other two are still uploading in progress somewhere else. If they - // are, return a cache hit. If not, continue on to the other two, - // and only return a cache hit if all three exist. - if errors.Is(err, errors.KindAlreadyExists) { - inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath) - if progressErr != nil { - return errors.E(op, progressErr) - } - if inProgress { - // err is known to be errors.KindAlreadyExists at this point, so - // this is a cache hit return. - return errors.E(op, err) - } - seenAlreadyExists++ - } else if err != nil { - // Other errors + err := s.upload(ctx, gomodPath, bytes.NewReader(mod), false) + // KindAlreadyExists means the file is uploaded (somewhere else) successfully. + if err != nil && !errors.Is(err, errors.KindAlreadyExists) { return errors.E(op, err) } + zipPath := config.PackageVersionedName(module, version, "zip") - err = s.upload(ctx, zipPath, zip, false) - if errors.Is(err, errors.KindAlreadyExists) { - seenAlreadyExists++ - } else if err != nil { + err = s.upload(ctx, zipPath, zip, true) + if err != nil && !errors.Is(err, errors.KindAlreadyExists) { return errors.E(op, err) } + infoPath := config.PackageVersionedName(module, version, "info") err = s.upload(ctx, infoPath, bytes.NewReader(info), false) - // Have all three returned errors.KindAlreadyExists? - if errors.Is(err, errors.KindAlreadyExists) { - if seenAlreadyExists == 2 { - return errors.E(op, err) - } - } else if err != nil { + if err != nil && !errors.Is(err, errors.KindAlreadyExists) { return errors.E(op, err) } + return nil } -func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error { - const op errors.Op = "gcp.removeInProgressMetadata" - ctx, span := observ.StartSpan(ctx, op.String()) - defer span.End() - _, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{ - Metadata: map[string]string{}, - }) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) { - const op errors.Op = "gcp.checkUploadInProgress" - ctx, span := observ.StartSpan(ctx, op.String()) - defer span.End() - attrs, err := s.bucket.Object(gomodPath).Attrs(ctx) - if err != nil { - return false, errors.E(op, err) - } - // If we have a config-set lock threshold, i.e. we are using the GCP - // slightflight backend, use it. Otherwise, use the fallback, which - // is arguably irrelevant when not using GCP for singleflighting. - threshold := fallbackInProgressStaleThreshold - if s.staleThreshold > 0 { - threshold = s.staleThreshold - } - if attrs.Metadata != nil { - _, ok := attrs.Metadata["in_progress"] - if ok { - // In case the final call to remove the metadata fails for some reason, - // we have a threshold after which we consider this to be stale. - if time.Since(attrs.Created) > threshold { - return false, nil - } - return true, nil - } - } - return false, nil -} - -func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error { +func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, checkBefore bool) error { const op errors.Op = "gcp.upload" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() cancelCtx, cancel := context.WithCancel(ctx) defer cancel() + if checkBefore { + // Check whether the file already exists before uploading. + // Note that this is not for preventing the same file from being uploaded multiple times, + // but only a small optimization to avoid unnecessary uploads for large files (in particular .zip file). + _, err := s.bucket.Object(path).Attrs(cancelCtx) + if err == nil { + // The file already exists, no need to upload it again. + return nil + } else if !errors.IsErr(err, storage.ErrObjectNotExist) { + // Not expected error, return it. + return errors.E(op, err) + } + // Otherwise, the error is ErrObjectNotExist, so we should upload the file. + } + wc := s.bucket.Object(path).If(storage.Conditions{ DoesNotExist: true, }).NewWriter(cancelCtx) - // We set this metadata only for the first of the three files uploaded, - // for use as a singleflight lock. - if first { - wc.Metadata = make(map[string]string) - wc.Metadata["in_progress"] = "true" - } - // NOTE: content type is auto detected on GCP side and ACL defaults to public // Once we support private storage buckets this may need refactoring // unless there is a way to set the default perms in the project.