pkg/stash: Add Azure Blob implementation (#1162)

* add azblob sf stasher

* use http status const

* rm line

* cleanup, comments

* add test

* fix test

* skip test if no account key is provided

* fix stash

* introduce stash timeout
marpio
2019-04-02 06:53:59 +02:00
committed by Manu Gupta
parent c31610c1ad
commit 9974c16093
4 changed files with 270 additions and 2 deletions
+5
@@ -96,6 +96,11 @@ func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper,
 			return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
 		}
 		return stash.WithGCSLock, nil
+	case "azureblob":
+		if c.StorageType != "azureblob" {
+			return nil, fmt.Errorf("azureblob SingleFlight only works with an azureblob storage type and not: %v", c.StorageType)
+		}
+		return stash.WithAzureBlobLock(c.Storage.AzureBlob, c.TimeoutDuration(), checker)
 	default:
 		return nil, fmt.Errorf("unrecognized single flight type: %v", c.SingleFlightType)
 	}
+8 -2
@@ -59,7 +59,7 @@ FilterFile = ""
 Timeout = 300
 # StorageType sets the type of storage backend the proxy will use.
-# Possible values are memory, disk, mongo, gcp, minio, s3
+# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob
 # Defaults to memory
 # Env override: ATHENS_STORAGE_TYPE
 StorageType = "memory"
@@ -169,13 +169,19 @@ StatsExporter = "prometheus"
 # we want to make sure only the first request gets to store the module,
 # and the second request will wait for the first one to finish so that
 # it doesn't override the storage.
-# Options are ["memory", "etcd", "redis", "gcp"]
+# Options are ["memory", "etcd", "redis", "gcp", "azureblob"]
 # The default option is "memory" which means that only one instance of Athens
 # should be used.
 # The "gcp" single flight will assume that you have a "gcp" StorageType
 # and therefore it will use its strong-consistency features to ensure
 # that only one module is ever written even when concurrent saves happen
 # at the same time.
+# The "azureblob" single flight will assume that you have an "azureblob" StorageType
+# and therefore it will use its blob-lease features to ensure
+# that only one module is ever written even when concurrent saves happen
+# at the same time.
 # Env override: ATHENS_SINGLE_FLIGHT_TYPE
 SingleFlightType = "memory"
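
For reference, the pairing these comments require looks roughly like this in config.toml. This is a sketch, not part of the commit: the [Storage.AzureBlob] table and its key names are assumed to mirror the config.AzureBlobConfig fields (AccountName, AccountKey, ContainerName) used by the new stasher, and the account values are placeholders.

StorageType = "azureblob"
SingleFlightType = "azureblob"

[Storage]
    [Storage.AzureBlob]
        # Placeholder values; supply your own storage account.
        AccountName = "myaccount"
        AccountKey = "base64key=="
        ContainerName = "gomods"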
+162
@@ -0,0 +1,162 @@
package stash

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/gomods/athens/pkg/config"
	"github.com/gomods/athens/pkg/errors"
	"github.com/gomods/athens/pkg/observ"
	"github.com/gomods/athens/pkg/storage"
	"github.com/google/uuid"
)
// WithAzureBlobLock returns a distributed singleflight
// using an Azure Blob Storage backend. See the config.toml documentation for details.
func WithAzureBlobLock(conf *config.AzureBlobConfig, timeout time.Duration, checker storage.Checker) (Wrapper, error) {
	const op errors.Op = "stash.WithAzureBlobLock"
	accountURL, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", conf.AccountName))
	if err != nil {
		return nil, errors.E(op, err)
	}
	cred, err := azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey)
	if err != nil {
		return nil, errors.E(op, err)
	}
	pipe := azblob.NewPipeline(cred, azblob.PipelineOptions{})
	serviceURL := azblob.NewServiceURL(*accountURL, pipe)
	containerURL := serviceURL.NewContainerURL(conf.ContainerName)
	return func(s Stasher) Stasher {
		return &azblobLock{containerURL, s, checker, timeout}
	}, nil
}
// azblobLock implements the distributed singleflight
// on top of Azure blob leases.
type azblobLock struct {
	containerURL azblob.ContainerURL
	stasher      Stasher
	checker      storage.Checker
	timeout      time.Duration
}

// stashRes carries the result of the wrapped stasher
// across the goroutine boundary in Stash.
type stashRes struct {
	v   string
	err error
}
func (s *azblobLock) Stash(ctx context.Context, mod, ver string) (newVer string, err error) {
	const op errors.Op = "azblobLock.Stash"
	ctx, span := observ.StartSpan(ctx, op.String())
	defer span.End()
	// bound the whole stash operation by the configured timeout
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()
	leaseBlobName := "lease/" + config.FmtModVer(mod, ver)
	leaseBlobURL := s.containerURL.NewBlockBlobURL(leaseBlobName)
	leaseID, err := s.acquireLease(ctx, leaseBlobURL)
	if err != nil {
		return ver, errors.E(op, err)
	}
	defer func() {
		const op errors.Op = "azblobLock.Unlock"
		relErr := s.releaseLease(ctx, leaseBlobURL, leaseID)
		if err == nil && relErr != nil {
			err = errors.E(op, relErr)
		}
	}()
	// another instance may have stored the module while we waited for the lease
	ok, err := s.checker.Exists(ctx, mod, ver)
	if err != nil {
		return ver, errors.E(op, err)
	}
	if ok {
		return ver, nil
	}
	// buffered so the goroutine below can always send its result and exit,
	// even if we return early on ctx.Done
	sChan := make(chan stashRes, 1)
	go func() {
		v, err := s.stasher.Stash(ctx, mod, ver)
		sChan <- stashRes{v, err}
	}()
	for {
		select {
		case sr := <-sChan:
			if sr.err != nil {
				err = errors.E(op, sr.err)
				return ver, err
			}
			newVer = sr.v
			return newVer, nil
		case <-time.After(10 * time.Second):
			// keep the lease alive while the underlying stasher is still running
			err := s.renewLease(ctx, leaseBlobURL, leaseID)
			if err != nil {
				return ver, errors.E(op, err)
			}
		case <-ctx.Done():
			return ver, errors.E(op, ctx.Err())
		}
	}
}
func (s *azblobLock) releaseLease(ctx context.Context, blobURL azblob.BlockBlobURL, leaseID string) error {
	const op errors.Op = "azblobLock.releaseLease"
	ctx, span := observ.StartSpan(ctx, op.String())
	defer span.End()
	_, err := blobURL.ReleaseLease(ctx, leaseID, azblob.ModifiedAccessConditions{})
	return err
}

func (s *azblobLock) renewLease(ctx context.Context, blobURL azblob.BlockBlobURL, leaseID string) error {
	const op errors.Op = "azblobLock.renewLease"
	ctx, span := observ.StartSpan(ctx, op.String())
	defer span.End()
	_, err := blobURL.RenewLease(ctx, leaseID, azblob.ModifiedAccessConditions{})
	return err
}
func (s *azblobLock) acquireLease(ctx context.Context, blobURL azblob.BlockBlobURL) (string, error) {
	const op errors.Op = "azblobLock.acquireLease"
	ctx, span := observ.StartSpan(ctx, op.String())
	defer span.End()
	tctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	// first we need to create a blob which can then be leased
	_, err := blobURL.Upload(tctx, bytes.NewReader([]byte{1}), azblob.BlobHTTPHeaders{}, nil, azblob.BlobAccessConditions{})
	if err != nil {
		// if the blob is already leased we will get http.StatusPreconditionFailed while writing to it
		stgErr, ok := err.(azblob.StorageError)
		if !ok || stgErr.Response().StatusCode != http.StatusPreconditionFailed {
			return "", errors.E(op, err)
		}
	}
	leaseID, err := uuid.NewRandom()
	if err != nil {
		return "", errors.E(op, err)
	}
	for {
		// acquire the lease for 15 seconds (the minimum Azure allows)
		res, err := blobURL.AcquireLease(tctx, leaseID.String(), 15, azblob.ModifiedAccessConditions{})
		if err != nil {
			// if the blob is already leased we will get http.StatusConflict - wait and try again
			if stgErr, ok := err.(azblob.StorageError); ok && stgErr.Response().StatusCode == http.StatusConflict {
				select {
				case <-time.After(1 * time.Second):
					continue
				case <-tctx.Done():
					return "", tctx.Err()
				}
			}
			return "", errors.E(op, err)
		}
		return res.LeaseID(), nil
	}
}
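
For context, here is a minimal sketch (not part of the commit) of how the wrapper composes with a Stasher, modeled on the test file below. The noopStasher, module path, and account values are invented for illustration; with placeholder credentials the lease calls would fail against real Azure, so this only shows the wiring, and a real deployment would pass the azureblob storage backend as the checker rather than in-memory storage.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/gomods/athens/pkg/config"
	"github.com/gomods/athens/pkg/stash"
	"github.com/gomods/athens/pkg/storage/mem"
)

// noopStasher is a stand-in for the real download/stash chain.
type noopStasher struct{}

func (noopStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
	return ver, nil // pretend the module was fetched and saved
}

func main() {
	// Placeholder credentials; a real setup reads these from config.toml
	// or its environment overrides.
	conf := &config.AzureBlobConfig{
		AccountName:   "myaccount",
		AccountKey:    "base64key==",
		ContainerName: "gomods",
	}
	checker, err := mem.NewStorage() // any storage.Checker works; azureblob in production
	if err != nil {
		log.Fatal(err)
	}
	wrapper, err := stash.WithAzureBlobLock(conf, 10*time.Minute, checker)
	if err != nil {
		log.Fatal(err)
	}
	s := wrapper(noopStasher{})

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	ver, err := s.Stash(ctx, "github.com/example/mod", "v1.0.0")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("stashed", ver)
}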
+95
@@ -0,0 +1,95 @@
package stash

import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/gomods/athens/pkg/config"
	"github.com/gomods/athens/pkg/storage"
	"github.com/gomods/athens/pkg/storage/mem"
	"golang.org/x/sync/errgroup"
)
// TestWithAzureBlob requires a real Azure Blob Storage account
// and ensures that concurrent saves of the same module are
// serialized so that only the first request gets to store it.
func TestWithAzureBlob(t *testing.T) {
	cfg := getAzureTestConfig()
	if cfg == nil {
		t.SkipNow()
	}
	strg, err := mem.NewStorage()
	if err != nil {
		t.Fatal(err)
	}
	ms := &mockAzureBlobStasher{strg: strg}
	wpr, err := WithAzureBlobLock(cfg, time.Second*10, strg)
	if err != nil {
		t.Fatal(err)
	}
	s := wpr(ms)
	var eg errgroup.Group
	for i := 0; i < 5; i++ {
		eg.Go(func() error {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
			defer cancel()
			_, err := s.Stash(ctx, "mod", "ver")
			return err
		})
	}
	err = eg.Wait()
	if err != nil {
		t.Fatal(err)
	}
}
// mockAzureBlobStasher is like mockStasher
// but leverages in-memory storage
// so that the azure blob lock can determine
// whether to call the underlying stasher or not.
type mockAzureBlobStasher struct {
	strg storage.Backend
	mu   sync.Mutex
	num  int
}

func (ms *mockAzureBlobStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
	time.Sleep(time.Millisecond * 100) // allow time for subsequent requests to come in.
	ms.mu.Lock()
	defer ms.mu.Unlock()
	if ms.num == 0 {
		err := ms.strg.Save(
			ctx,
			mod,
			ver,
			[]byte("mod file"),
			strings.NewReader("zip file"),
			[]byte("info file"),
		)
		if err != nil {
			return "", err
		}
		ms.num++
		return "", nil
	}
	return "", fmt.Errorf("second time error")
}
func getAzureTestConfig() *config.AzureBlobConfig {
	key := os.Getenv("ATHENS_AZURE_ACCOUNT_KEY")
	if key == "" {
		return nil
	}
	return &config.AzureBlobConfig{
		AccountName:   "athens_drone_azure_account",
		AccountKey:    key,
		ContainerName: "athens_drone_azure_container",
	}
}
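
Note that the test only runs when a real account key is supplied: with ATHENS_AZURE_ACCOUNT_KEY exported, something like go test -run TestWithAzureBlob ./pkg/stash/ exercises the lock against live Azure storage; otherwise getAzureTestConfig returns nil and the test skips itself.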