Mirror of https://github.com/gomods/athens
Implement storage.Backend for S3 storage (#750)
* Implement storage.Backend for S3 storage
* Adapt S3 storage test after refactor; fix skipped minio tests
* Add defer to Close calls in s3 getter methods
* CR fixes
* Fix doc alignment
* Tweak err checks
* Fix parse test for S3 configs and clean up error messages in storage.s3
committed by Marwan Sulaiman
parent f5259a388d
commit 480d8c8e8c
@@ -12,6 +12,7 @@ import (
 	"github.com/gomods/athens/pkg/storage/mem"
 	"github.com/gomods/athens/pkg/storage/minio"
 	"github.com/gomods/athens/pkg/storage/mongo"
+	"github.com/gomods/athens/pkg/storage/s3"
 	"github.com/spf13/afero"
 )
 
@@ -50,6 +51,11 @@ func GetStorage(storageType string, storageConfig *config.StorageConfig) (storag
 			return nil, errors.E(op, "Invalid CDN Storage Configuration")
 		}
 		return gcp.New(context.Background(), storageConfig.GCP, storageConfig.CDN)
+	case "s3":
+		if storageConfig.S3 == nil {
+			return nil, errors.E(op, "Invalid S3 Storage Configuration")
+		}
+		return s3.New(storageConfig.S3, storageConfig.CDN)
 	default:
 		return nil, fmt.Errorf("storage type %s is unknown", storageType)
 	}

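Wiring-wise, the new case means a caller that asks for the "s3" storage type gets back a storage.Backend built by s3.New. A minimal sketch of that path follows; the credential values are made up and nothing below is taken from the commit (passing a nil CDN config simply falls back to the bucket URL):

package main

import (
	"log"

	"github.com/gomods/athens/pkg/config"
	"github.com/gomods/athens/pkg/storage/s3"
)

func main() {
	// Illustrative values only; real deployments read these from config.dev.toml
	// or the AWS_* environment variables shown later in this diff.
	s3Conf := &config.S3Config{
		Region:      "us-west-2",
		Key:         "EXAMPLE_KEY_ID",
		Secret:      "EXAMPLE_SECRET",
		Bucket:      "gomods",
		TimeoutConf: config.TimeoutConf{Timeout: 300},
	}

	// Same call GetStorage makes for the "s3" case; a nil CDN config is allowed.
	backend, err := s3.New(s3Conf, nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("module base URL:", backend.BaseURL())
}
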
@@ -71,7 +71,7 @@ EnableCSRFProtection = false
 
 [Proxy]
 # StorageType sets the type of storage backend the proxy will use.
-# Possible values are memory, disk, mongo, gcp, minio
+# Possible values are memory, disk, mongo, gcp, minio, s3
 # Defaults to memory
 # Env override: ATHENS_STORAGE_TYPE
 StorageType = "memory"

@@ -248,3 +248,29 @@ EnableCSRFProtection = false
 # Should be used for testing or development only
 # Env override: ATHENS_MONGO_INSECURE
 Insecure = false
+
+[Storage.S3]
+# Region for S3 storage
+# Env override: AWS_REGION
+Region = "MY_AWS_REGION"
+
+# Access Key for S3 storage
+# Env override: AWS_ACCESS_KEY_ID
+Key = "MY_AWS_ACCESS_KEY_ID"
+
+# Secret Key for S3 storage
+# Env override: AWS_SECRET_ACCESS_KEY
+Secret = "MY_AWS_SECRET_ACCESS_KEY"
+
+# Session Token for S3 storage
+# Env override: AWS_SESSION_TOKEN
+Token = ""
+
+# Timeout for network calls made to S3 in seconds
+# Defaults to Global Timeout
+Timeout = 300
+
+# S3 Bucket to use for storage
+# Defaults to gomods
+# Env override: ATHENS_S3_BUCKET_NAME
+Bucket = "MY_S3_BUCKET_NAME"

@@ -50,6 +50,10 @@ func compareStorageConfigs(parsedStorage *StorageConfig, expStorage *StorageConf
 	if !eq {
 		t.Errorf("Parsed Example Storage configuration did not match expected values. Expected: %+v. Actual: %+v", expStorage.GCP, parsedStorage.GCP)
 	}
+	eq = cmp.Equal(parsedStorage.S3, expStorage.S3)
+	if !eq {
+		t.Errorf("Parsed Example Storage configuration did not match expected values. Expected: %+v. Actual: %+v", expStorage.S3, parsedStorage.S3)
+	}
 }
 
 func TestEnvOverrides(t *testing.T) {

@@ -150,6 +154,16 @@ func TestStorageEnvOverrides(t *testing.T) {
 				Timeout: globalTimeout,
 			},
 		},
+		S3: &S3Config{
+			Region: "s3Region",
+			Key:    "s3Key",
+			Secret: "s3Secret",
+			Token:  "s3Token",
+			Bucket: "s3Bucket",
+			TimeoutConf: TimeoutConf{
+				Timeout: globalTimeout,
+			},
+		},
 	}
 	envVars := getEnvMap(&Config{Storage: expStorage})
 	envVarBackup := map[string]string{}

@@ -186,6 +200,7 @@ func TestParseExampleConfig(t *testing.T) {
 				EnableSSL: false,
 			},
 			Mongo: &MongoConfig{},
+			S3:    &S3Config{},
 		},
 	}
 	// unset all environment variables

@@ -250,6 +265,16 @@
 			},
 			InsecureConn: false,
 		},
+		S3: &S3Config{
+			Region: "MY_AWS_REGION",
+			Key:    "MY_AWS_ACCESS_KEY_ID",
+			Secret: "MY_AWS_SECRET_ACCESS_KEY",
+			Token:  "",
+			Bucket: "MY_S3_BUCKET_NAME",
+			TimeoutConf: TimeoutConf{
+				Timeout: globalTimeout,
+			},
+		},
 	}
 
 	expConf := &Config{

@@ -348,6 +373,13 @@ func getEnvMap(config *Config) map[string]string {
 			envVars["ATHENS_MONGO_CERT_PATH"] = storage.Mongo.CertPath
 			envVars["ATHENS_MONGO_INSECURE"] = strconv.FormatBool(storage.Mongo.InsecureConn)
 		}
+		if storage.S3 != nil {
+			envVars["AWS_REGION"] = storage.S3.Region
+			envVars["AWS_ACCESS_KEY_ID"] = storage.S3.Key
+			envVars["AWS_SECRET_ACCESS_KEY"] = storage.S3.Secret
+			envVars["AWS_SESSION_TOKEN"] = storage.S3.Token
+			envVars["ATHENS_S3_BUCKET_NAME"] = storage.S3.Bucket
+		}
 	}
 	return envVars
 }

@@ -0,0 +1,11 @@
+package config
+
+// S3Config specifies the properties required to use S3 as the storage backend
+type S3Config struct {
+	TimeoutConf
+	Region string `validate:"required" envconfig:"AWS_REGION"`
+	Key    string `validate:"required" envconfig:"AWS_ACCESS_KEY_ID"`
+	Secret string `validate:"required" envconfig:"AWS_SECRET_ACCESS_KEY"`
+	Token  string `envconfig:"AWS_SESSION_TOKEN"`
+	Bucket string `validate:"required" envconfig:"ATHENS_S3_BUCKET_NAME"`
+}

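The struct tags tell the configuration story: envconfig reads the values straight from the AWS_* and ATHENS_S3_BUCKET_NAME variables, and the validate:"required" tags are what deleteInvalidStorageConfigs (further down in this diff) checks before keeping the S3 config. Below is a rough sketch of that load-and-validate flow; it assumes the kelseyhightower/envconfig and go-playground validator.v9 libraries and the "athens" prefix, none of which this commit states explicitly:

package main

import (
	"fmt"
	"log"

	"github.com/gomods/athens/pkg/config"
	"github.com/kelseyhightower/envconfig"
	"gopkg.in/go-playground/validator.v9"
)

func main() {
	var s3Conf config.S3Config

	// The explicit envconfig tags mean AWS_REGION, AWS_ACCESS_KEY_ID, etc.
	// are read regardless of the prefix passed here.
	if err := envconfig.Process("athens", &s3Conf); err != nil {
		log.Fatal(err)
	}

	// Mirrors deleteInvalidStorageConfigs: a config missing any
	// `validate:"required"` field is treated as "S3 not configured".
	if err := validator.New().Struct(s3Conf); err != nil {
		fmt.Println("S3 storage not configured:", err)
		return
	}
	fmt.Println("using S3 bucket:", s3Conf.Bucket)
}
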
@@ -9,6 +9,7 @@ type StorageConfig struct {
 	GCP   *GCPConfig
 	Minio *MinioConfig
 	Mongo *MongoConfig
+	S3    *S3Config
 }
 
 func setStorageTimeouts(s *StorageConfig, defaultTimeout int) {

@@ -27,6 +28,9 @@ func setStorageTimeouts(s *StorageConfig, defaultTimeout int) {
 	if s.Mongo != nil && s.Mongo.Timeout == 0 {
 		s.Mongo.Timeout = defaultTimeout
 	}
+	if s.S3 != nil && s.S3.Timeout == 0 {
+		s.S3.Timeout = defaultTimeout
+	}
 }
 
 // envconfig initializes *all* struct pointers, even if there are no corresponding defaults or env variables

@@ -63,4 +67,10 @@ func deleteInvalidStorageConfigs(s *StorageConfig) {
 			s.Mongo = nil
 		}
 	}
+
+	if s.S3 != nil {
+		if err := validate.Struct(s.S3); err != nil {
+			s.S3 = nil
+		}
+	}
 }

@@ -1,84 +0,0 @@
-package s3
-
-import (
-	"fmt"
-	"path/filepath"
-	"testing"
-
-	"github.com/gomods/athens/pkg/config"
-	"github.com/stretchr/testify/suite"
-)
-
-var (
-	testConfigFile = filepath.Join("..", "..", "..", "config.dev.toml")
-)
-
-type S3Tests struct {
-	suite.Suite
-	uploader *s3UploaderMock
-	storage  *Storage
-}
-
-func Test_ActionSuite(t *testing.T) {
-	uploaderMock := newUploaderMock()
-	conf, err := config.GetConf(testConfigFile)
-	if err != nil {
-		t.Fatalf("Unable to parse config file: %s", err.Error())
-	}
-	if conf.Storage == nil || conf.Storage.CDN == nil {
-		t.Fatalf("Invalid CDN Config provided")
-	}
-	storage, err := NewWithUploader("test", uploaderMock, conf.Storage.CDN)
-	if err != nil {
-		t.Error(err)
-	}
-
-	suite.Run(t, &S3Tests{uploader: uploaderMock, storage: storage})
-}
-
-// Verify returns error if S3 state differs from expected one
-func Verify(um *s3UploaderMock, value map[string][]byte) error {
-	um.lock.Lock()
-	defer um.lock.Unlock()
-
-	expectedLength := len(value)
-	actualLength := len(um.db)
-	if expectedLength != actualLength {
-		return fmt.Errorf("Length does not match. Expected: %d. Actual: %d", expectedLength, actualLength)
-	}
-
-	for k, v := range value {
-		actual, ok := um.db[k]
-		if !ok {
-			return fmt.Errorf("Missing element %s", k)
-		}
-
-		if !sliceEqualCheck(v, actual) {
-			return fmt.Errorf("Value for key %s does not match. Expected: %v, Actual: %v", k, v, actual)
-		}
-	}
-
-	return nil
-}
-
-func sliceEqualCheck(a, b []byte) bool {
-	if a == nil && b == nil {
-		return true
-	}
-
-	if a == nil || b == nil {
-		return false
-	}
-
-	if len(a) != len(b) {
-		return false
-	}
-
-	for i := range a {
-		if a[i] != b[i] {
-			return false
-		}
-	}
-
-	return true
-}

@@ -0,0 +1,40 @@
+package s3
+
+import (
+	"context"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/gomods/athens/pkg/config"
+	"github.com/gomods/athens/pkg/errors"
+	"github.com/gomods/athens/pkg/observ"
+)
+
+const (
+	s3ErrorCodeNotFound = "NotFound"
+)
+
+// Exists implements the (./pkg/storage).Checker interface
+// returning true if the module at version exists in storage
+func (s *Storage) Exists(ctx context.Context, module, version string) (bool, error) {
+	const op errors.Op = "s3.Exists"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+
+	pkgName := config.PackageVersionedName(module, version, "mod")
+	hoParams := &s3.HeadObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(pkgName),
+	}
+
+	if _, err := s.s3API.HeadObjectWithContext(ctx, hoParams); err != nil {
+		if err.(awserr.Error).Code() == s3ErrorCodeNotFound {
+			return false, nil
+		}
+
+		return false, errors.E(op, err, errors.M(module))
+	}
+
+	return true, nil
+}

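One detail worth flagging: err.(awserr.Error) is an unchecked type assertion, so an error that is not an awserr.Error (a cancelled context, for instance) would panic rather than be returned. A more defensive variant could route the check through a small helper like the sketch below; this is an editorial suggestion, not what the commit ships, and it reuses the s3ErrorCodeNotFound constant and awserr import from this file:

// isNotFoundErr reports whether err is an AWS "NotFound" error, without
// assuming the concrete error type. Exists could then use
// `if isNotFoundErr(err) { return false, nil }` in place of the assertion.
func isNotFoundErr(err error) bool {
	aerr, ok := err.(awserr.Error)
	return ok && aerr.Code() == s3ErrorCodeNotFound
}
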
@@ -0,0 +1,45 @@
+package s3
+
+import (
+	"context"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/gomods/athens/pkg/errors"
+	"github.com/gomods/athens/pkg/observ"
+	modupl "github.com/gomods/athens/pkg/storage/module"
+)
+
+// Delete implements the (./pkg/storage).Deleter interface and
+// removes a version of a module from storage. Returning ErrNotFound
+// if the version does not exist.
+func (s *Storage) Delete(ctx context.Context, module, version string) error {
+	const op errors.Op = "s3.Delete"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	exists, err := s.Exists(ctx, module, version)
+	if err != nil {
+		return errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	if !exists {
+		return errors.E(op, errors.M(module), errors.V(version), errors.KindNotFound)
+	}
+
+	return modupl.Delete(ctx, module, version, s.remove, s.s3Conf.TimeoutDuration())
+}
+
+func (s *Storage) remove(ctx context.Context, path string) error {
+	const op errors.Op = "s3.Delete"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	delParams := &s3.DeleteObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(path),
+	}
+
+	if _, err := s.s3API.DeleteObjectWithContext(ctx, delParams); err != nil {
+		return errors.E(op, err)
+	}
+
+	return nil
+}

@@ -0,0 +1,24 @@
+/*
+Package s3 provides a storage driver to upload module files to an
+Amazon S3 storage bucket.
+
+Configuration
+
+Environment variables:
+	AWS_REGION // region for this storage, e.g. 'us-west-2'
+	AWS_ACCESS_KEY_ID
+	AWS_SECRET_ACCESS_KEY
+	AWS_SESSION_TOKEN // [optional]
+	ATHENS_S3_BUCKET_NAME
+
+For information on how to get your key ID and access key, see the official AWS docs: https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/setting-up.html
+
+Example:
+
+	Bash:
+	export AWS_REGION="us-west-2"
+	Fish:
+	set -x AWS_REGION us-west-2
+
+*/
+package s3

@@ -0,0 +1,105 @@
+package s3
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/gomods/athens/pkg/config"
+	"github.com/gomods/athens/pkg/errors"
+	"github.com/gomods/athens/pkg/observ"
+)
+
+// Info implements the (./pkg/storage).Getter interface
+func (s *Storage) Info(ctx context.Context, module, version string) ([]byte, error) {
+	const op errors.Op = "s3.Info"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	exists, err := s.Exists(ctx, module, version)
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	if !exists {
+		return nil, errors.E(op, errors.M(module), errors.V(version), errors.KindNotFound)
+	}
+
+	infoReader, err := s.open(ctx, config.PackageVersionedName(module, version, "info"))
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	defer infoReader.Close()
+
+	infoBytes, err := ioutil.ReadAll(infoReader)
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	return infoBytes, nil
+}
+
+// GoMod implements the (./pkg/storage).Getter interface
+func (s *Storage) GoMod(ctx context.Context, module, version string) ([]byte, error) {
+	const op errors.Op = "s3.GoMod"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	exists, err := s.Exists(ctx, module, version)
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	if !exists {
+		return nil, errors.E(op, errors.M(module), errors.V(version), errors.KindNotFound)
+	}
+
+	modReader, err := s.open(ctx, config.PackageVersionedName(module, version, "mod"))
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	defer modReader.Close()
+
+	modBytes, err := ioutil.ReadAll(modReader)
+	if err != nil {
+		return nil, errors.E(op, fmt.Errorf("could not get new reader for mod file: %s", err), errors.M(module), errors.V(version))
+	}
+
+	return modBytes, nil
+}
+
+// Zip implements the (./pkg/storage).Getter interface
+func (s *Storage) Zip(ctx context.Context, module, version string) (io.ReadCloser, error) {
+	const op errors.Op = "s3.Zip"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	exists, err := s.Exists(ctx, module, version)
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	if !exists {
+		return nil, errors.E(op, errors.M(module), errors.V(version), errors.KindNotFound)
+	}
+
+	zipReader, err := s.open(ctx, config.PackageVersionedName(module, version, "zip"))
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module), errors.V(version))
+	}
+
+	return zipReader, nil
+}
+
+func (s *Storage) open(ctx context.Context, path string) (io.ReadCloser, error) {
+	const op errors.Op = "s3.open"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	getParams := &s3.GetObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(path),
+	}
+
+	goo, err := s.s3API.GetObjectWithContext(ctx, getParams)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	return goo.Body, nil
+}

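Note that, unlike Info and GoMod (which read, close, and return bytes), Zip hands the raw S3 object body back to the caller, so closing it is the caller's responsibility. A hedged usage sketch follows; the helper name, module, version, and destination path are made up, and it assumes the usual context/io/os imports wherever it lives:

// copyModuleZip is a sketch only (not part of the commit) showing that the
// io.ReadCloser returned by Zip must be closed by the caller.
func copyModuleZip(ctx context.Context, backend *Storage, dst string) error {
	zr, err := backend.Zip(ctx, "github.com/pkg/errors", "v0.8.0")
	if err != nil {
		return err
	}
	defer zr.Close() // Zip returns the raw S3 object body

	f, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = io.Copy(f, zr)
	return err
}
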
@@ -0,0 +1,50 @@
+package s3
+
+import (
+	"context"
+	"strings"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/gomods/athens/pkg/errors"
+	"github.com/gomods/athens/pkg/observ"
+)
+
+// List implements the (./pkg/storage).Lister interface
+// It returns a list of versions, if any, for a given module
+func (s *Storage) List(ctx context.Context, module string) ([]string, error) {
+	const op errors.Op = "s3.List"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+
+	lsParams := &s3.ListObjectsInput{
+		Bucket: aws.String(s.bucket),
+		Prefix: aws.String(module),
+	}
+
+	loo, err := s.s3API.ListObjectsWithContext(ctx, lsParams)
+	if err != nil {
+		return nil, errors.E(op, err, errors.M(module))
+	}
+
+	return extractVersions(loo.Contents), nil
+}
+
+func extractVersions(objects []*s3.Object) []string {
+	var versions []string
+
+	for _, o := range objects {
+		if strings.HasSuffix(*o.Key, ".info") {
+			segments := strings.Split(*o.Key, "/")
+
+			if len(segments) <= 0 {
+				continue
+			}
+			// version should be last segment w/ .info suffix
+			last := segments[len(segments)-1]
+			version := strings.TrimSuffix(last, ".info")
+			versions = append(versions, version)
+		}
+	}
+	return versions
+}

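To make the key parsing concrete: only .info objects contribute versions, and the version is whatever precedes that suffix in the key's last path segment. The keys below assume the <module>/@v/<version>.<ext> layout produced by config.PackageVersionedName (not shown in this diff), so treat them as illustrative; the snippet would have to live inside package s3, e.g. in a test with the fmt import, since extractVersions is unexported:

// Hypothetical listing for one module with two published versions.
keys := []string{
	"github.com/pkg/errors/@v/v0.8.0.info",
	"github.com/pkg/errors/@v/v0.8.0.mod",
	"github.com/pkg/errors/@v/v0.8.0.zip",
	"github.com/pkg/errors/@v/v0.9.0.info",
}
objects := make([]*s3.Object, 0, len(keys))
for _, k := range keys {
	objects = append(objects, &s3.Object{Key: aws.String(k)})
}
fmt.Println(extractVersions(objects)) // prints [v0.8.0 v0.9.0]
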
@@ -1,42 +0,0 @@
-package s3
-
-import (
-	"io/ioutil"
-	"sync"
-
-	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/service/s3/s3manager"
-)
-
-type s3UploaderMock struct {
-	db   map[string][]byte
-	lock sync.Mutex
-}
-
-func newUploaderMock() *s3UploaderMock {
-	u := &s3UploaderMock{}
-	u.db = make(map[string][]byte)
-	return u
-}
-
-func (u *s3UploaderMock) Upload(input *s3manager.UploadInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
-	content, err := ioutil.ReadAll(input.Body)
-	if err != nil {
-		return nil, err
-	}
-	u.lock.Lock()
-	u.db[*input.Key] = content
-	u.lock.Unlock()
-	return &s3manager.UploadOutput{}, nil
-}
-
-func (u *s3UploaderMock) UploadWithContext(ctx aws.Context, input *s3manager.UploadInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
-	content, err := ioutil.ReadAll(input.Body)
-	if err != nil {
-		return nil, err
-	}
-	u.lock.Lock()
-	u.db[*input.Key] = content
-	u.lock.Unlock()
-	return &s3manager.UploadOutput{}, nil
-}

@@ -0,0 +1,79 @@
+package s3
+
+import (
+	"fmt"
+	"net/url"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
+	"github.com/gomods/athens/pkg/config"
+	"github.com/gomods/athens/pkg/errors"
+)
+
+// Storage implements (./pkg/storage).Backend and
+// also provides a function to fetch the location of a module.
+// Storage uses the Amazon AWS Go SDK, which expects these env variables:
+// - AWS_REGION - region for this storage, e.g. 'us-west-2'
+// - AWS_ACCESS_KEY_ID -
+// - AWS_SECRET_ACCESS_KEY -
+// - AWS_SESSION_TOKEN - [optional]
+// For information on how to get your key ID and access key, see the official AWS docs: https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/setting-up.html
+type Storage struct {
+	bucket   string
+	baseURI  *url.URL
+	uploader s3manageriface.UploaderAPI
+	s3API    s3iface.S3API
+	s3Conf   *config.S3Config
+	cdnConf  *config.CDNConfig
+}
+
+// New creates a new AWS S3 CDN saver
+func New(s3Conf *config.S3Config, cdnConf *config.CDNConfig, options ...func(*aws.Config)) (*Storage, error) {
+	const op errors.Op = "s3.New"
+	u, err := url.Parse(fmt.Sprintf("https://%s.s3.amazonaws.com", s3Conf.Bucket))
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+
+	awsConfig := &aws.Config{
+		Credentials: credentials.NewStaticCredentials(s3Conf.Key, s3Conf.Secret, s3Conf.Token),
+		Region:      aws.String(s3Conf.Region),
+	}
+
+	for _, o := range options {
+		o(awsConfig)
+	}
+
+	// create a session
+	sess, err := session.NewSession(awsConfig)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
+	uploader := s3manager.NewUploader(sess)
+
+	return &Storage{
+		bucket:   s3Conf.Bucket,
+		uploader: uploader,
+		s3API:    uploader.S3,
+		baseURI:  u,
+		cdnConf:  cdnConf,
+		s3Conf:   s3Conf,
+	}, nil
+}
+
+// BaseURL returns the base URL that stores all modules. It can be used
+// in the "meta" tag redirect response to vgo.
+//
+// For example:
+//
+// <meta name="go-import" content="gomods.com/athens mod BaseURL()">
+func (s Storage) BaseURL() *url.URL {
+	if s.cdnConf == nil {
+		return s.baseURI
+	}
+	return s.cdnConf.CDNEndpointWithDefault(s.baseURI)
+}

@@ -0,0 +1,69 @@
+package s3
+
+import (
+	"context"
+	"testing"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/gomods/athens/pkg/config"
+	"github.com/gomods/athens/pkg/storage/compliance"
+)
+
+func TestBackend(t *testing.T) {
+	backend := getStorage(t)
+	compliance.RunTests(t, backend, backend.clear)
+}
+
+func BenchmarkBackend(b *testing.B) {
+	backend := getStorage(b)
+	compliance.RunBenchmarks(b, backend, backend.clear)
+}
+
+func (s *Storage) clear() error {
+	ctx := context.TODO()
+	objects, err := s.s3API.ListObjectsWithContext(ctx, &s3.ListObjectsInput{Bucket: aws.String(s.bucket)})
+	if err != nil {
+		return err
+	}
+
+	for _, o := range objects.Contents {
+		delParams := &s3.DeleteObjectInput{
+			Bucket: aws.String(s.bucket),
+			Key:    o.Key,
+		}
+
+		_, err := s.s3API.DeleteObjectWithContext(ctx, delParams)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func getStorage(t testing.TB) *Storage {
+	options := func(conf *aws.Config) {
+		conf.Endpoint = aws.String("127.0.0.1:9001")
+		conf.DisableSSL = aws.Bool(true)
+		conf.S3ForcePathStyle = aws.Bool(true)
+	}
+	backend, err := New(
+		&config.S3Config{
+			Key:    "minio",
+			Secret: "minio123",
+			Bucket: "gomods",
+			Region: "us-west-1",
+			TimeoutConf: config.TimeoutConf{
+				Timeout: 300,
+			},
+		},
+		nil,
+		options,
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return backend
+}

@@ -1,35 +0,0 @@
-package s3
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-
-	"github.com/gomods/athens/pkg/config"
-)
-
-var (
-	mod  = []byte("123")
-	zip  = []byte("456")
-	info = []byte("789")
-)
-
-func (d *S3Tests) TestSave() {
-	r := d.Require()
-
-	versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"}
-	expectedValues := make(map[string][]byte)
-	for i, version := range versions {
-		module := fmt.Sprintf("module-%d", i)
-		vmod := append(mod, []byte(version)...)
-		vinfo := append(info, []byte(version)...)
-		vzip := append(zip, []byte(version)...)
-
-		r.NoError(d.storage.Save(context.Background(), module, version, vmod, bytes.NewReader(vzip), vinfo))
-		expectedValues[config.PackageVersionedName(module, version, "info")] = vinfo
-		expectedValues[config.PackageVersionedName(module, version, "mod")] = vmod
-		expectedValues[config.PackageVersionedName(module, version, "zip")] = vzip
-	}
-
-	r.NoError(Verify(d.uploader, expectedValues))
-}

@@ -0,0 +1,46 @@
+package s3
+
+import (
+	"bytes"
+	"context"
+	"io"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/gomods/athens/pkg/errors"
+	"github.com/gomods/athens/pkg/observ"
+	moduploader "github.com/gomods/athens/pkg/storage/module"
+)
+
+// Save implements the (github.com/gomods/athens/pkg/storage).Saver interface.
+func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
+	const op errors.Op = "s3.Save"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	err := moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.upload, s.s3Conf.TimeoutDuration())
+	// TODO: take out lease on the /list file and add the version to it
+	//
+	// Do that only after module source+metadata is uploaded
+	if err != nil {
+		return errors.E(op, err, errors.M(module), errors.V(version))
+	}
+	return nil
+}
+
+func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error {
+	const op errors.Op = "s3.upload"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	upParams := &s3manager.UploadInput{
+		Bucket:      aws.String(s.bucket),
+		Key:         aws.String(path),
+		Body:        stream,
+		ContentType: aws.String(contentType),
+	}
+
+	if _, err := s.uploader.UploadWithContext(ctx, upParams); err != nil {
+		return errors.E(op, err)
+	}
+
+	return nil
+}

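Putting the Saver and Checker halves together, a caller's round trip through the new backend looks roughly like the sketch below; the module path, version, and payloads are made up, and it assumes the bytes/context/fmt imports wherever it is placed:

// roundTrip is a sketch only, not taken from the commit.
func roundTrip(ctx context.Context, backend *Storage) error {
	mod := []byte("module example.com/m\n")
	info := []byte(`{"Version":"v1.0.0"}`)
	zip := bytes.NewReader([]byte("fake zip payload"))

	// Save uploads the .info, .mod and .zip objects through the s3manager uploader.
	if err := backend.Save(ctx, "example.com/m", "v1.0.0", mod, zip, info); err != nil {
		return err
	}

	// Exists issues a HeadObject on the .mod key, so it should now report true.
	ok, err := backend.Exists(ctx, "example.com/m", "v1.0.0")
	if err != nil {
		return err
	}
	fmt.Println("stored:", ok)
	return nil
}
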
@@ -1,113 +0,0 @@
-package s3
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-	"io"
-	"net/url"
-
-	"github.com/aws/aws-sdk-go/aws/session"
-	"github.com/aws/aws-sdk-go/service/s3/s3manager"
-	"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
-	"github.com/gomods/athens/pkg/config"
-	"github.com/gomods/athens/pkg/errors"
-	"github.com/gomods/athens/pkg/observ"
-	moduploader "github.com/gomods/athens/pkg/storage/module"
-)
-
-// Storage implements (github.com/gomods/athens/pkg/storage).Saver and
-// also provides a function to fetch the location of a module
-// Storage uses amazon aws go SDK which expects these env variables
-// - AWS_REGION - region for this storage, e.g 'us-west-2'
-// - AWS_ACCESS_KEY_ID -
-// - AWS_SECRET_ACCESS_KEY -
-// - AWS_SESSION_TOKEN - [optional]
-// For information how to get your keyId and access key turn to official aws docs: https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/setting-up.html
-type Storage struct {
-	bucket   string
-	baseURI  *url.URL
-	uploader s3manageriface.UploaderAPI
-	cdnConf  *config.CDNConfig
-}
-
-// New creates a new AWS S3 CDN saver
-func New(bucketName string, cdnConf *config.CDNConfig) (*Storage, error) {
-	const op errors.Op = "s3.New"
-	u, err := url.Parse(fmt.Sprintf("http://%s.s3.amazonaws.com", bucketName))
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	// create a session
-	sess, err := session.NewSession()
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-	uploader := s3manager.NewUploader(sess)
-
-	return &Storage{
-		bucket:   bucketName,
-		uploader: uploader,
-		baseURI:  u,
-		cdnConf:  cdnConf,
-	}, nil
-}
-
-// NewWithUploader creates a new AWS S3 CDN saver with provided uploader
-func NewWithUploader(bucketName string, uploader s3manageriface.UploaderAPI, cdnConf *config.CDNConfig) (*Storage, error) {
-	const op errors.Op = "s3.NewWithUploader"
-	u, err := url.Parse(fmt.Sprintf("http://%s.s3.amazonaws.com", bucketName))
-	if err != nil {
-		return nil, errors.E(op, err)
-	}
-
-	return &Storage{
-		bucket:   bucketName,
-		uploader: uploader,
-		baseURI:  u,
-		cdnConf:  cdnConf,
-	}, nil
-}
-
-// BaseURL returns the base URL that stores all modules. It can be used
-// in the "meta" tag redirect response to vgo.
-//
-// For example:
-//
-// <meta name="go-import" content="gomods.com/athens mod BaseURL()">
-func (s Storage) BaseURL() *url.URL {
-	return s.cdnConf.CDNEndpointWithDefault(s.baseURI)
-}
-
-// Save implements the (github.com/gomods/athens/pkg/storage).Saver interface.
-func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
-	const op errors.Op = "s3.Save"
-	ctx, span := observ.StartSpan(ctx, op.String())
-	defer span.End()
-	err := moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.upload, s.cdnConf.TimeoutDuration())
-	// TODO: take out lease on the /list file and add the version to it
-	//
-	// Do that only after module source+metadata is uploaded
-	if err != nil {
-		return errors.E(op, err, errors.M(module), errors.V(version))
-	}
-	return nil
-}
-
-func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error {
-	const op errors.Op = "s3.upload"
-	ctx, span := observ.StartSpan(ctx, op.String())
-	defer span.End()
-	upParams := &s3manager.UploadInput{
-		Bucket:      &s.bucket,
-		Key:         &path,
-		Body:        stream,
-		ContentType: &contentType,
-	}
-	_, err := s.uploader.UploadWithContext(ctx, upParams)
-	if err != nil {
-		return errors.E(op, err)
-	}
-	return nil
-}