diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go index 7aefb504..55940101 100644 --- a/cmd/proxy/actions/app_proxy.go +++ b/cmd/proxy/actions/app_proxy.go @@ -91,6 +91,11 @@ func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper, return nil, fmt.Errorf("Redis config must be present") } return stash.WithRedisLock(c.SingleFlight.Redis.Endpoint, checker) + case "gcp": + if c.StorageType != "gcp" { + return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType) + } + return stash.WithGCSLock, nil default: return nil, fmt.Errorf("unrecognized single flight type: %v", c.SingleFlightType) } diff --git a/config.dev.toml b/config.dev.toml index 06030d2b..b48663b4 100755 --- a/config.dev.toml +++ b/config.dev.toml @@ -156,9 +156,13 @@ StatsExporter = "prometheus" # we want to make sure only the first request gets to store the module, # and the second request will wait for the first one to finish so that # it doesn't override the storage. -# Options are ["memory", "etcd", "redis"] +# Options are ["memory", "etcd", "redis", "gcp"] # The default option is "memory" which means that only one instance of Athens # should be used. +# The "gcp" single flight will assume that you have a "gcp" StorageType +# and therefore it will use its strong-consistency features to ensure +# that only one module is ever written even when concurrent saves happen +# at the same time. # Env override: ATHENS_SINGLE_FLIGHT_TYPE SingleFlightType = "memory" diff --git a/go.mod b/go.mod index a58e07f0..9f41fd97 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/gogo/protobuf v1.2.0 // indirect github.com/google/go-cmp v0.2.0 github.com/google/martian v2.1.0+incompatible // indirect + github.com/google/uuid v1.1.1 github.com/googleapis/gax-go v2.0.0+incompatible // indirect github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c // indirect github.com/gorilla/mux v1.6.2 diff --git a/go.sum b/go.sum index 91fe3e12..46f709c0 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPg github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible h1:j0GKcs05QVmm7yesiZq2+9cxHkNK9YM6zKx4D2qucQU= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c h1:16eHWuMGvCjSfgRJKqIzapE78onvvTbdi1rMkU00lZw= diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 8feb7e8b..f47e2e5a 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -46,6 +46,14 @@ func (e Error) Error() string { return e.Err.Error() } +// Is is a shorthand for checking an error against a kind. +func Is(err error, kind int) bool { + if err == nil { + return false + } + return Kind(err) == kind +} + // Op describes any independent function or // method in Athens. A series of operations // forms a more readable stack trace. diff --git a/pkg/stash/with_gcs.go b/pkg/stash/with_gcs.go new file mode 100644 index 00000000..3e2386e5 --- /dev/null +++ b/pkg/stash/with_gcs.go @@ -0,0 +1,33 @@ +package stash + +import ( + "context" + + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/observ" +) + +// WithGCSLock returns a distributed singleflight +// using a GCS backend. See the config.toml documentation for details. +func WithGCSLock(s Stasher) Stasher { + return &gcsLock{s} +} + +type gcsLock struct { + stasher Stasher +} + +func (s *gcsLock) Stash(ctx context.Context, mod, ver string) (newVer string, err error) { + const op errors.Op = "gcslock.Stash" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + newVer, err = s.stasher.Stash(ctx, mod, ver) + if err != nil { + // already been saved before, move on. + if errors.Is(err, errors.KindAlreadyExists) { + return ver, nil + } + return ver, errors.E(op, err) + } + return newVer, nil +} diff --git a/pkg/stash/with_gcs_test.go b/pkg/stash/with_gcs_test.go new file mode 100644 index 00000000..89b97da1 --- /dev/null +++ b/pkg/stash/with_gcs_test.go @@ -0,0 +1,127 @@ +package stash + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "strings" + "testing" + "time" + + "github.com/gomods/athens/pkg/config" + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/storage" + "github.com/gomods/athens/pkg/storage/gcp" + "github.com/google/uuid" + "golang.org/x/sync/errgroup" +) + +// TestWithGCS requires a real GCP backend implementation +// and it will ensure that saving to modules at the same time +// is done synchronously so that only the first module gets saved. +func TestWithGCS(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + const ( + mod = "stashmod" + ver = "v1.0.0" + ) + strg := getStorage(t) + strg.Delete(ctx, mod, ver) + defer strg.Delete(ctx, mod, ver) + + // sanity check + _, err := strg.GoMod(ctx, mod, ver) + if !errors.Is(err, errors.KindNotFound) { + t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err) + } + + var eg errgroup.Group + for i := 0; i < 5; i++ { + content := uuid.New().String() + ms := &mockGCPStasher{strg, content} + s := WithGCSLock(ms) + eg.Go(func() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + _, err := s.Stash(ctx, "stashmod", "v1.0.0") + return err + }) + } + + err = eg.Wait() + if err != nil { + t.Fatal(err) + } + info, err := strg.Info(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + modContent, err := strg.GoMod(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + zip, err := strg.Zip(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + defer zip.Close() + zipContent, err := ioutil.ReadAll(zip) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(info, modContent) { + t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent)) + } + if !bytes.Equal(info, zipContent) { + t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent)) + } +} + +// mockGCPStasher is like mockStasher +// but leverages in memory storage +// so that redis can determine +// whether to call the underlying stasher or not. +type mockGCPStasher struct { + strg storage.Backend + content string +} + +func (ms *mockGCPStasher) Stash(ctx context.Context, mod, ver string) (string, error) { + err := ms.strg.Save( + ctx, + mod, + ver, + []byte(ms.content), + strings.NewReader(ms.content), + []byte(ms.content), + ) + return "", err +} + +func getStorage(t *testing.T) *gcp.Storage { + t.Helper() + cfg := getTestConfig() + if cfg == nil { + t.SkipNow() + } + + s, err := gcp.New(context.Background(), cfg, config.GetTimeoutDuration(30)) + if err != nil { + t.Fatal(err) + } + + return s +} + +func getTestConfig() *config.GCPConfig { + creds := os.Getenv("GCS_SERVICE_ACCOUNT") + if creds == "" { + return nil + } + return &config.GCPConfig{ + Bucket: "athens_drone_stash_bucket", + JSONKey: creds, + } +} diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 4182cd01..d66a559a 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -4,12 +4,12 @@ import ( "bytes" "context" "io" - "log" + "cloud.google.com/go/storage" + "github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/observ" - - moduploader "github.com/gomods/athens/pkg/storage/module" + googleapi "google.golang.org/api/googleapi" ) // Save uploads the module's .mod, .zip and .info files for a given version @@ -23,40 +23,48 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, const op errors.Op = "gcp.Save" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() - exists, err := s.Exists(ctx, module, version) + gomodPath := config.PackageVersionedName(module, version, "mod") + err := s.upload(ctx, gomodPath, bytes.NewReader(mod)) if err != nil { - return errors.E(op, err, errors.M(module), errors.V(version)) + return errors.E(op, err) } - if exists { - return errors.E(op, "already exists", errors.M(module), errors.V(version), errors.KindAlreadyExists) - } - - err = moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.upload, s.timeout) + zipPath := config.PackageVersionedName(module, version, "zip") + err = s.upload(ctx, zipPath, zip) if err != nil { - return errors.E(op, err, errors.M(module), errors.V(version)) + return errors.E(op, err) + } + infoPath := config.PackageVersionedName(module, version, "info") + err = s.upload(ctx, infoPath, bytes.NewReader(info)) + if err != nil { + return errors.E(op, err) } - - // TODO: take out lease on the /list file and add the version to it - // - // Do that only after module source+metadata is uploaded return nil } -func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error { +func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error { const op errors.Op = "gcp.upload" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() - wc := s.bucket.Object(path).NewWriter(ctx) - defer func(wc io.WriteCloser) { - if err := wc.Close(); err != nil { - log.Printf("WARNING: failed to close storage object writer: %s", err) - } - }(wc) + wc := s.bucket.Object(path).If(storage.Conditions{ + DoesNotExist: true, + }).NewWriter(ctx) + // NOTE: content type is auto detected on GCP side and ACL defaults to public // Once we support private storage buckets this may need refactoring // unless there is a way to set the default perms in the project. if _, err := io.Copy(wc, stream); err != nil { + wc.Close() return err } + + err := wc.Close() + if err != nil { + kind := errors.KindBadRequest + apiErr, ok := err.(*googleapi.Error) + if ok && apiErr.Code == 412 { + kind = errors.KindAlreadyExists + } + return errors.E(op, err, kind) + } return nil }