mirror of
https://github.com/gomods/athens
synced 2026-02-03 11:00:32 +00:00
pkg/stash: Add GCS implementation (#1124)
* pkg/stash: Add GCS implementation * fix docs
This commit is contained in:
committed by
Aaron Schlesinger
parent
a79a836204
commit
081ec9126e
@@ -91,6 +91,11 @@ func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper,
|
||||
return nil, fmt.Errorf("Redis config must be present")
|
||||
}
|
||||
return stash.WithRedisLock(c.SingleFlight.Redis.Endpoint, checker)
|
||||
case "gcp":
|
||||
if c.StorageType != "gcp" {
|
||||
return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
|
||||
}
|
||||
return stash.WithGCSLock, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized single flight type: %v", c.SingleFlightType)
|
||||
}
|
||||
|
||||
+5
-1
@@ -156,9 +156,13 @@ StatsExporter = "prometheus"
|
||||
# we want to make sure only the first request gets to store the module,
|
||||
# and the second request will wait for the first one to finish so that
|
||||
# it doesn't override the storage.
|
||||
# Options are ["memory", "etcd", "redis"]
|
||||
# Options are ["memory", "etcd", "redis", "gcp"]
|
||||
# The default option is "memory" which means that only one instance of Athens
|
||||
# should be used.
|
||||
# The "gcp" single flight will assume that you have a "gcp" StorageType
|
||||
# and therefore it will use its strong-consistency features to ensure
|
||||
# that only one module is ever written even when concurrent saves happen
|
||||
# at the same time.
|
||||
# Env override: ATHENS_SINGLE_FLIGHT_TYPE
|
||||
SingleFlightType = "memory"
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ require (
|
||||
github.com/gogo/protobuf v1.2.0 // indirect
|
||||
github.com/google/go-cmp v0.2.0
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/googleapis/gax-go v2.0.0+incompatible // indirect
|
||||
github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c // indirect
|
||||
github.com/gorilla/mux v1.6.2
|
||||
|
||||
@@ -66,6 +66,8 @@ github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPg
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/gax-go v2.0.0+incompatible h1:j0GKcs05QVmm7yesiZq2+9cxHkNK9YM6zKx4D2qucQU=
|
||||
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c h1:16eHWuMGvCjSfgRJKqIzapE78onvvTbdi1rMkU00lZw=
|
||||
|
||||
@@ -46,6 +46,14 @@ func (e Error) Error() string {
|
||||
return e.Err.Error()
|
||||
}
|
||||
|
||||
// Is is a shorthand for checking an error against a kind.
|
||||
func Is(err error, kind int) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return Kind(err) == kind
|
||||
}
|
||||
|
||||
// Op describes any independent function or
|
||||
// method in Athens. A series of operations
|
||||
// forms a more readable stack trace.
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
package stash
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/gomods/athens/pkg/errors"
|
||||
"github.com/gomods/athens/pkg/observ"
|
||||
)
|
||||
|
||||
// WithGCSLock returns a distributed singleflight
|
||||
// using a GCS backend. See the config.toml documentation for details.
|
||||
func WithGCSLock(s Stasher) Stasher {
|
||||
return &gcsLock{s}
|
||||
}
|
||||
|
||||
type gcsLock struct {
|
||||
stasher Stasher
|
||||
}
|
||||
|
||||
func (s *gcsLock) Stash(ctx context.Context, mod, ver string) (newVer string, err error) {
|
||||
const op errors.Op = "gcslock.Stash"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
newVer, err = s.stasher.Stash(ctx, mod, ver)
|
||||
if err != nil {
|
||||
// already been saved before, move on.
|
||||
if errors.Is(err, errors.KindAlreadyExists) {
|
||||
return ver, nil
|
||||
}
|
||||
return ver, errors.E(op, err)
|
||||
}
|
||||
return newVer, nil
|
||||
}
|
||||
@@ -0,0 +1,127 @@
|
||||
package stash
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gomods/athens/pkg/config"
|
||||
"github.com/gomods/athens/pkg/errors"
|
||||
"github.com/gomods/athens/pkg/storage"
|
||||
"github.com/gomods/athens/pkg/storage/gcp"
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// TestWithGCS requires a real GCP backend implementation
|
||||
// and it will ensure that saving to modules at the same time
|
||||
// is done synchronously so that only the first module gets saved.
|
||||
func TestWithGCS(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
|
||||
defer cancel()
|
||||
const (
|
||||
mod = "stashmod"
|
||||
ver = "v1.0.0"
|
||||
)
|
||||
strg := getStorage(t)
|
||||
strg.Delete(ctx, mod, ver)
|
||||
defer strg.Delete(ctx, mod, ver)
|
||||
|
||||
// sanity check
|
||||
_, err := strg.GoMod(ctx, mod, ver)
|
||||
if !errors.Is(err, errors.KindNotFound) {
|
||||
t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
|
||||
}
|
||||
|
||||
var eg errgroup.Group
|
||||
for i := 0; i < 5; i++ {
|
||||
content := uuid.New().String()
|
||||
ms := &mockGCPStasher{strg, content}
|
||||
s := WithGCSLock(ms)
|
||||
eg.Go(func() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
_, err := s.Stash(ctx, "stashmod", "v1.0.0")
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
err = eg.Wait()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
info, err := strg.Info(ctx, mod, ver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
modContent, err := strg.GoMod(ctx, mod, ver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
zip, err := strg.Zip(ctx, mod, ver)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer zip.Close()
|
||||
zipContent, err := ioutil.ReadAll(zip)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(info, modContent) {
|
||||
t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
|
||||
}
|
||||
if !bytes.Equal(info, zipContent) {
|
||||
t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
|
||||
}
|
||||
}
|
||||
|
||||
// mockGCPStasher is like mockStasher
|
||||
// but leverages in memory storage
|
||||
// so that redis can determine
|
||||
// whether to call the underlying stasher or not.
|
||||
type mockGCPStasher struct {
|
||||
strg storage.Backend
|
||||
content string
|
||||
}
|
||||
|
||||
func (ms *mockGCPStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
|
||||
err := ms.strg.Save(
|
||||
ctx,
|
||||
mod,
|
||||
ver,
|
||||
[]byte(ms.content),
|
||||
strings.NewReader(ms.content),
|
||||
[]byte(ms.content),
|
||||
)
|
||||
return "", err
|
||||
}
|
||||
|
||||
func getStorage(t *testing.T) *gcp.Storage {
|
||||
t.Helper()
|
||||
cfg := getTestConfig()
|
||||
if cfg == nil {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
s, err := gcp.New(context.Background(), cfg, config.GetTimeoutDuration(30))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func getTestConfig() *config.GCPConfig {
|
||||
creds := os.Getenv("GCS_SERVICE_ACCOUNT")
|
||||
if creds == "" {
|
||||
return nil
|
||||
}
|
||||
return &config.GCPConfig{
|
||||
Bucket: "athens_drone_stash_bucket",
|
||||
JSONKey: creds,
|
||||
}
|
||||
}
|
||||
+30
-22
@@ -4,12 +4,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/gomods/athens/pkg/config"
|
||||
"github.com/gomods/athens/pkg/errors"
|
||||
"github.com/gomods/athens/pkg/observ"
|
||||
|
||||
moduploader "github.com/gomods/athens/pkg/storage/module"
|
||||
googleapi "google.golang.org/api/googleapi"
|
||||
)
|
||||
|
||||
// Save uploads the module's .mod, .zip and .info files for a given version
|
||||
@@ -23,40 +23,48 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte,
|
||||
const op errors.Op = "gcp.Save"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
exists, err := s.Exists(ctx, module, version)
|
||||
gomodPath := config.PackageVersionedName(module, version, "mod")
|
||||
err := s.upload(ctx, gomodPath, bytes.NewReader(mod))
|
||||
if err != nil {
|
||||
return errors.E(op, err, errors.M(module), errors.V(version))
|
||||
return errors.E(op, err)
|
||||
}
|
||||
if exists {
|
||||
return errors.E(op, "already exists", errors.M(module), errors.V(version), errors.KindAlreadyExists)
|
||||
}
|
||||
|
||||
err = moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.upload, s.timeout)
|
||||
zipPath := config.PackageVersionedName(module, version, "zip")
|
||||
err = s.upload(ctx, zipPath, zip)
|
||||
if err != nil {
|
||||
return errors.E(op, err, errors.M(module), errors.V(version))
|
||||
return errors.E(op, err)
|
||||
}
|
||||
infoPath := config.PackageVersionedName(module, version, "info")
|
||||
err = s.upload(ctx, infoPath, bytes.NewReader(info))
|
||||
if err != nil {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
|
||||
// TODO: take out lease on the /list file and add the version to it
|
||||
//
|
||||
// Do that only after module source+metadata is uploaded
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error {
|
||||
func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error {
|
||||
const op errors.Op = "gcp.upload"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
wc := s.bucket.Object(path).NewWriter(ctx)
|
||||
defer func(wc io.WriteCloser) {
|
||||
if err := wc.Close(); err != nil {
|
||||
log.Printf("WARNING: failed to close storage object writer: %s", err)
|
||||
}
|
||||
}(wc)
|
||||
wc := s.bucket.Object(path).If(storage.Conditions{
|
||||
DoesNotExist: true,
|
||||
}).NewWriter(ctx)
|
||||
|
||||
// NOTE: content type is auto detected on GCP side and ACL defaults to public
|
||||
// Once we support private storage buckets this may need refactoring
|
||||
// unless there is a way to set the default perms in the project.
|
||||
if _, err := io.Copy(wc, stream); err != nil {
|
||||
wc.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
err := wc.Close()
|
||||
if err != nil {
|
||||
kind := errors.KindBadRequest
|
||||
apiErr, ok := err.(*googleapi.Error)
|
||||
if ok && apiErr.Code == 412 {
|
||||
kind = errors.KindAlreadyExists
|
||||
}
|
||||
return errors.E(op, err, kind)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user