From 9974c16093b48f25ca2b2af38f995e6cc3c72337 Mon Sep 17 00:00:00 2001
From: marpio
Date: Tue, 2 Apr 2019 06:53:59 +0200
Subject: [PATCH] pkg/stash: Add Azure Blob implementation (#1162)

* add azblob sf stasher
* use http status const
* rm line
* cleanup, comments
* add test
* fix test
* skip test if no account key provided
* fix stash
* introduce stash timeout
---
 cmd/proxy/actions/app_proxy.go   |   5 +
 config.dev.toml                  |  10 +-
 pkg/stash/with_azureblob.go      | 162 +++++++++++++++++++++++++++++++
 pkg/stash/with_azureblob_test.go |  95 ++++++++++++++++++
 4 files changed, 270 insertions(+), 2 deletions(-)
 create mode 100644 pkg/stash/with_azureblob.go
 create mode 100644 pkg/stash/with_azureblob_test.go

diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go
index 55940101..4fa4b0a5 100644
--- a/cmd/proxy/actions/app_proxy.go
+++ b/cmd/proxy/actions/app_proxy.go
@@ -96,6 +96,11 @@ func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper,
 			return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
 		}
 		return stash.WithGCSLock, nil
+	case "azureblob":
+		if c.StorageType != "azureblob" {
+			return nil, fmt.Errorf("azureblob SingleFlight only works with an azureblob storage type and not: %v", c.StorageType)
+		}
+		return stash.WithAzureBlobLock(c.Storage.AzureBlob, c.TimeoutDuration(), checker)
 	default:
 		return nil, fmt.Errorf("unrecognized single flight type: %v", c.SingleFlightType)
 	}
diff --git a/config.dev.toml b/config.dev.toml
index ecc3b850..cfc5ba5a 100755
--- a/config.dev.toml
+++ b/config.dev.toml
@@ -59,7 +59,7 @@ FilterFile = ""
 Timeout = 300
 
 # StorageType sets the type of storage backend the proxy will use.
-# Possible values are memory, disk, mongo, gcp, minio, s3
+# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob
 # Defaults to memory
 # Env override: ATHENS_STORAGE_TYPE
 StorageType = "memory"
@@ -169,13 +169,19 @@ StatsExporter = "prometheus"
 # we want to make sure only the first request gets to store the module,
 # and the second request will wait for the first one to finish so that
 # it doesn't override the storage.
-# Options are ["memory", "etcd", "redis", "gcp"]
+
+# Options are ["memory", "etcd", "redis", "gcp", "azureblob"]
+
 # The default option is "memory" which means that only one instance of Athens
 # should be used.
 # The "gcp" single flight will assume that you have a "gcp" StorageType
 # and therefore it will use its strong-consistency features to ensure
 # that only one module is ever written even when concurrent saves happen
 # at the same time.
+# The "azureblob" single flight will assume that you have an "azureblob" StorageType
+# and therefore it will use its strong-consistency features to ensure
+# that only one module is ever written even when concurrent saves happen
+# at the same time.
 
 # Env override: ATHENS_SINGLE_FLIGHT_TYPE
 SingleFlightType = "memory"
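For orientation before the implementation below: `getSingleFlight` maps `SingleFlightType = "azureblob"` to the `stash.Wrapper` returned by `WithAzureBlobLock`, which then decorates whatever stasher the proxy uses. A minimal sketch of that wiring outside the proxy, reusing the in-memory backend from the new test as the `storage.Checker`; `noopStasher` and the credential values are illustrative placeholders, not part of this patch:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/gomods/athens/pkg/config"
	"github.com/gomods/athens/pkg/stash"
	"github.com/gomods/athens/pkg/storage/mem"
)

// noopStasher satisfies stash.Stasher; the proxy would pass the stasher
// that actually downloads and saves modules.
type noopStasher struct{}

func (noopStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
	return ver, nil
}

func main() {
	// Placeholder credentials; in the proxy these come from config.dev.toml
	// or the ATHENS_* environment overrides.
	cfg := &config.AzureBlobConfig{
		AccountName:   "mystorageaccount",
		AccountKey:    "bXktYWNjb3VudC1rZXk=", // base64, as Azure shared keys are
		ContainerName: "athens-locks",
	}

	// The in-memory backend doubles as the storage.Checker, as in the test.
	strg, err := mem.NewStorage()
	if err != nil {
		log.Fatal(err)
	}

	// 300s matches the Timeout setting in config.dev.toml.
	wrapper, err := stash.WithAzureBlobLock(cfg, 300*time.Second, strg)
	if err != nil {
		log.Fatal(err)
	}
	s := wrapper(noopStasher{})

	// Against a real account, concurrent Stash calls for the same mod@ver
	// contend on a blob lease; with these placeholder credentials the call
	// simply fails at the network layer.
	if _, err := s.Stash(context.Background(), "github.com/pkg/errors", "v0.8.1"); err != nil {
		log.Fatal(err)
	}
}
```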
diff --git a/pkg/stash/with_azureblob.go b/pkg/stash/with_azureblob.go
new file mode 100644
index 00000000..fd4bd0a8
--- /dev/null
+++ b/pkg/stash/with_azureblob.go
@@ -0,0 +1,162 @@
+package stash
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net/http"
+	"net/url"
+	"time"
+
+	"github.com/Azure/azure-storage-blob-go/azblob"
+	"github.com/gomods/athens/pkg/config"
+	"github.com/gomods/athens/pkg/errors"
+	"github.com/gomods/athens/pkg/observ"
+	"github.com/gomods/athens/pkg/storage"
+	"github.com/google/uuid"
+)
+
+// WithAzureBlobLock returns a distributed singleflight
+// using an Azure Blob Storage backend. See the config.toml documentation for details.
+func WithAzureBlobLock(conf *config.AzureBlobConfig, timeout time.Duration, checker storage.Checker) (Wrapper, error) {
+	const op errors.Op = "stash.WithAzureBlobLock"
+
+	accountURL, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", conf.AccountName))
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+	cred, err := azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+	pipe := azblob.NewPipeline(cred, azblob.PipelineOptions{})
+	serviceURL := azblob.NewServiceURL(*accountURL, pipe)
+
+	containerURL := serviceURL.NewContainerURL(conf.ContainerName)
+
+	return func(s Stasher) Stasher {
+		return &azblobLock{containerURL, s, checker}
+	}, nil
+}
+
+type azblobLock struct {
+	containerURL azblob.ContainerURL
+	stasher      Stasher
+	checker      storage.Checker
+}
+
+type stashRes struct {
+	v   string
+	err error
+}
+
+func (s *azblobLock) Stash(ctx context.Context, mod, ver string) (newVer string, err error) {
+	const op errors.Op = "azblobLock.Stash"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+
+	ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
+	defer cancel()
+
+	leaseBlobName := "lease/" + config.FmtModVer(mod, ver)
+	leaseBlobURL := s.containerURL.NewBlockBlobURL(leaseBlobName)
+
+	leaseID, err := s.acquireLease(ctx, leaseBlobURL)
+	if err != nil {
+		return ver, errors.E(op, err)
+	}
+	defer func() {
+		const op errors.Op = "azblobLock.Unlock"
+		relErr := s.releaseLease(ctx, leaseBlobURL, leaseID)
+		if err == nil && relErr != nil {
+			err = errors.E(op, relErr)
+		}
+	}()
+	ok, err := s.checker.Exists(ctx, mod, ver)
+	if err != nil {
+		return ver, errors.E(op, err)
+	}
+	if ok {
+		return ver, nil
+	}
+	sChan := make(chan stashRes, 1) // buffered so the goroutine below can't leak if we return early
+	go func() {
+		v, err := s.stasher.Stash(ctx, mod, ver)
+		sChan <- stashRes{v, err}
+	}()
+
+	for {
+		select {
+		case sr := <-sChan:
+			if sr.err != nil {
+				err = errors.E(op, sr.err)
+				return ver, err
+			}
+			newVer = sr.v
+			return newVer, nil
+		case <-time.After(10 * time.Second): // renew well before the 15s lease expires
+			err := s.renewLease(ctx, leaseBlobURL, leaseID)
+			if err != nil {
+				return ver, errors.E(op, err)
+			}
+		case <-ctx.Done():
+			return ver, errors.E(op, ctx.Err())
+		}
+	}
+}
+
+func (s *azblobLock) releaseLease(ctx context.Context, blobURL azblob.BlockBlobURL, leaseID string) error {
+	const op errors.Op = "azblobLock.releaseLease"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	_, err := blobURL.ReleaseLease(ctx, leaseID, azblob.ModifiedAccessConditions{})
+	return err
+}
+
+func (s *azblobLock) renewLease(ctx context.Context, blobURL azblob.BlockBlobURL, leaseID string) error {
+	const op errors.Op = "azblobLock.renewLease"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	_, err := blobURL.RenewLease(ctx, leaseID, azblob.ModifiedAccessConditions{})
+	return err
+}
+
+func (s *azblobLock) acquireLease(ctx context.Context, blobURL azblob.BlockBlobURL) (string, error) {
+	const op errors.Op = "azblobLock.acquireLease"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+	defer cancel()
+
+	// first we need to create a blob which can then be leased
+	_, err := blobURL.Upload(tctx, bytes.NewReader([]byte{1}), azblob.BlobHTTPHeaders{}, nil, azblob.BlobAccessConditions{})
+	if err != nil {
+		// if the blob is already leased we will get http.StatusPreconditionFailed while writing to that blob
+		stgErr, ok := err.(azblob.StorageError)
+		if !ok || stgErr.Response().StatusCode != http.StatusPreconditionFailed {
+			return "", errors.E(op, err)
+		}
+	}
+
+	leaseID, err := uuid.NewRandom()
+	if err != nil {
+		return "", errors.E(op, err)
+	}
+	for {
+		// acquire the lease for 15 sec (the minimum Azure allows)
+		res, err := blobURL.AcquireLease(tctx, leaseID.String(), 15, azblob.ModifiedAccessConditions{})
+		if err != nil {
+			// if the blob is already leased we will get http.StatusConflict - wait and try again
+			if stgErr, ok := err.(azblob.StorageError); ok && stgErr.Response().StatusCode == http.StatusConflict {
+				select {
+				case <-time.After(1 * time.Second):
+					continue
+				case <-tctx.Done():
+					return "", tctx.Err()
+				}
+			}
+			return "", errors.E(op, err)
+		}
+		return res.LeaseID(), nil
+	}
+}
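The heart of the file above is the select loop in `Stash`: the lease is acquired for 15 seconds (Azure's minimum), so it has to be renewed on an interval comfortably inside that TTL for as long as the wrapped stasher is still running. Below is a self-contained sketch of that keep-alive shape, with the azblob calls stubbed out as function values; `holdLease`, `renew`, and `work` are illustrative names, not part of the patch:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// holdLease mirrors the shape of the select loop in azblobLock.Stash:
// run the real work in a goroutine, renew the lease on an interval well
// inside its 15s TTL, and bail out if renewal fails or the context
// expires. renew and work stand in for the azblob calls.
func holdLease(ctx context.Context, renew func(context.Context) error,
	work func(context.Context) (string, error)) (string, error) {

	type result struct {
		v   string
		err error
	}
	// Buffered so the worker can always complete its send,
	// even if this function has already returned.
	resCh := make(chan result, 1)
	go func() {
		v, err := work(ctx)
		resCh <- result{v, err}
	}()

	for {
		select {
		case r := <-resCh:
			return r.v, r.err
		case <-time.After(10 * time.Second): // renew well before the 15s lease expires
			if err := renew(ctx); err != nil {
				return "", fmt.Errorf("lease renewal failed: %w", err)
			}
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	renew := func(context.Context) error { return nil } // pretend renewal always succeeds
	work := func(context.Context) (string, error) {
		time.Sleep(25 * time.Second) // long enough to force two renewals
		return "v1.0.0", nil
	}
	fmt.Println(holdLease(ctx, renew, work))
}
```

As in the patch, the result channel is buffered so the worker goroutine can complete its send even after an early return on a failed renewal.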
diff --git a/pkg/stash/with_azureblob_test.go b/pkg/stash/with_azureblob_test.go
new file mode 100644
index 00000000..fabf38a1
--- /dev/null
+++ b/pkg/stash/with_azureblob_test.go
@@ -0,0 +1,95 @@
+package stash
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/gomods/athens/pkg/config"
+	"github.com/gomods/athens/pkg/storage"
+	"github.com/gomods/athens/pkg/storage/mem"
+	"golang.org/x/sync/errgroup"
+)
+
+// TestWithAzureBlob requires a real Azure Blob Storage backend and
+// ensures that concurrent saves of the same module are serialized,
+// so that only the first request actually stores the module.
+func TestWithAzureBlob(t *testing.T) {
+	cfg := getAzureTestConfig()
+	if cfg == nil {
+		t.SkipNow()
+	}
+	strg, err := mem.NewStorage()
+	if err != nil {
+		t.Fatal(err)
+	}
+	ms := &mockAzureBlobStasher{strg: strg}
+	wpr, err := WithAzureBlobLock(cfg, time.Second*10, strg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	s := wpr(ms)
+
+	var eg errgroup.Group
+	for i := 0; i < 5; i++ {
+		eg.Go(func() error {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+			_, err := s.Stash(ctx, "mod", "ver")
+			return err
+		})
+	}
+
+	err = eg.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// mockAzureBlobStasher is like mockStasher
+// but leverages in-memory storage so that
+// the Azure Blob lock can determine whether
+// to call the underlying stasher or not.
+type mockAzureBlobStasher struct {
+	strg storage.Backend
+	mu   sync.Mutex
+	num  int
+}
+
+func (ms *mockAzureBlobStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
+	time.Sleep(time.Millisecond * 100) // allow subsequent requests to come in.
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
+	if ms.num == 0 {
+		err := ms.strg.Save(
+			ctx,
+			mod,
+			ver,
+			[]byte("mod file"),
+			strings.NewReader("zip file"),
+			[]byte("info file"),
+		)
+		if err != nil {
+			return "", err
+		}
+		ms.num++
+		return "", nil
+	}
+	return "", fmt.Errorf("second time error")
+}
+
+func getAzureTestConfig() *config.AzureBlobConfig {
+	key := os.Getenv("ATHENS_AZURE_ACCOUNT_KEY")
+	if key == "" {
+		return nil
+	}
+	return &config.AzureBlobConfig{
+		AccountName:   "athens_drone_azure_account",
+		AccountKey:    key,
+		ContainerName: "athens_drone_azure_container",
+	}
+}
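A subtlety worth noting in `acquireLease` above: "someone else holds the lock" is detected purely by HTTP status code (412 Precondition Failed when uploading the marker blob, 409 Conflict when acquiring the lease), and any other error is fatal. That type assertion appears twice and could be factored into a small helper; a sketch, where `isStorageStatus` is a hypothetical name and only the `azblob.StorageError` methods already used in the patch are assumed:

```go
package stash

import (
	"net/http"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

// isStorageStatus reports whether err is an azblob.StorageError whose
// HTTP response carries the given status code. Hypothetical helper; the
// patch inlines this assertion at both call sites in acquireLease.
func isStorageStatus(err error, code int) bool {
	stgErr, ok := err.(azblob.StorageError)
	return ok && stgErr.Response().StatusCode == code
}

var _ = http.StatusConflict // the call sites compare against 412 and 409
```

The upload check would then read `if err != nil && !isStorageStatus(err, http.StatusPreconditionFailed)`, and the acquire loop would retry only when `isStorageStatus(err, http.StatusConflict)`. To exercise the new test against a real account, export `ATHENS_AZURE_ACCOUNT_KEY`; without it, `getAzureTestConfig` returns nil and the test skips itself.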