diff --git a/.travis.yml b/.travis.yml index 5948d7e1..09f8060d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ env: - ATHENS_MONGO_STORAGE_URL=mongodb://127.0.0.1:27017 - CODE_COV=1 script: - - test -z $(gofmt -s -l $GO_FILES) # Fail if a .go file hasn't been formatted with gofmt + - test -z $(gofmt -s -l $GO_FILES | tee /dev/stderr) # Fail if a .go file hasn't been formatted with gofmt - golint -set_exit_status $(go list ./...) # Linter - go test -race -coverprofile cover.out -covermode atomic ./... # Run all the tests with the race detector and code coverage enabled before_script: diff --git a/cmd/olympus/actions/merge_db.go b/cmd/olympus/actions/merge_db.go index c330f65f..873f54dc 100644 --- a/cmd/olympus/actions/merge_db.go +++ b/cmd/olympus/actions/merge_db.go @@ -1,14 +1,14 @@ package actions import ( + "context" "io/ioutil" "log" "time" - "github.com/gomods/athens/pkg/storage" - "github.com/gomods/athens/pkg/cdn" "github.com/gomods/athens/pkg/eventlog" + "github.com/gomods/athens/pkg/storage" ) // mergeDB merges diff into the module database. @@ -22,7 +22,7 @@ import ( // - Delete operation adds tombstone to module metadata k/v store // // Both could be fixed by putting each 'for' loop into a (global) critical section -func mergeDB(originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend) error { +func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend) error { for _, added := range diff.Added { if _, err := eLog.ReadSingle(added.Module, added.Version); err != nil { // the module/version already exists, is deprecated, or is @@ -46,7 +46,7 @@ func mergeDB(originURL string, diff dbDiff, eLog eventlog.Eventlog, storage stor continue } - if err := storage.Save(added.Module, added.Version, data.Mod, zipBytes, data.Info); err != nil { + if err := storage.Save(ctx, added.Module, added.Version, data.Mod, zipBytes, data.Info); err != nil { log.Printf("error saving new module %s/%s to CDN (%s)", added.Module, added.Version, err) continue } diff --git a/cmd/olympus/actions/package_downloader.go b/cmd/olympus/actions/package_downloader.go index fa3053c0..f68e480b 100644 --- a/cmd/olympus/actions/package_downloader.go +++ b/cmd/olympus/actions/package_downloader.go @@ -1,6 +1,7 @@ package actions import ( + "context" "errors" "io/ioutil" "path/filepath" @@ -51,7 +52,7 @@ func GetPackageDownloaderJob(s storage.Backend, e eventlog.Eventlog, w worker.Wo } // save it - if err := s.Save(module, version, modBytes, zipBytes, infoBytes); err != nil { + if err := s.Save(context.Background(), module, version, modBytes, zipBytes, infoBytes); err != nil { return err } diff --git a/cmd/proxy/actions/cache_miss_fetcher.go b/cmd/proxy/actions/cache_miss_fetcher.go index 9fce6303..9746383b 100644 --- a/cmd/proxy/actions/cache_miss_fetcher.go +++ b/cmd/proxy/actions/cache_miss_fetcher.go @@ -1,6 +1,7 @@ package actions import ( + "context" "errors" "fmt" "io/ioutil" @@ -18,7 +19,7 @@ const ( OlympusGlobalEndpointOverrideKey = "OLYMPUS_GLOBAL_ENDPOINT" ) -// GetProcessCacheMissJob porcesses queue of cache misses and downloads sources from active Olympus +// GetProcessCacheMissJob processes queue of cache misses and downloads sources from active Olympus func GetProcessCacheMissJob(s storage.Backend, w worker.Worker) worker.Handler { return func(args worker.Args) (err error) { module, version, err := parseArgs(args) @@ -43,7 +44,7 @@ func GetProcessCacheMissJob(s storage.Backend, w worker.Worker) worker.Handler { return err } - if err = s.Save(module, version, v.Mod, zip, v.Info); err != nil { + if err = s.Save(context.Background(), module, version, v.Mod, zip, v.Info); err != nil { process(module, version, args, w) } diff --git a/cmd/proxy/actions/fetch.go b/cmd/proxy/actions/fetch.go index 77117871..2d7871e6 100644 --- a/cmd/proxy/actions/fetch.go +++ b/cmd/proxy/actions/fetch.go @@ -61,7 +61,7 @@ func fetchHandler(store storage.Saver) func(c buffalo.Context) error { return fmt.Errorf("coudln't find .info file (%s)", err) } - saveErr := store.Save(moduleName, version, modBytes, zipBytes, infoBytes) + saveErr := store.Save(c, moduleName, version, modBytes, zipBytes, infoBytes) if storage.IsVersionAlreadyExistsErr(saveErr) { return c.Error(http.StatusConflict, saveErr) } else if err != nil { diff --git a/cmd/proxy/actions/upload.go b/cmd/proxy/actions/upload.go index 1b0401e2..b57577bd 100644 --- a/cmd/proxy/actions/upload.go +++ b/cmd/proxy/actions/upload.go @@ -24,7 +24,7 @@ func uploadHandler(store storage.Saver) func(c buffalo.Context) error { if c.Bind(payload); err != nil { return errors.WithStack(err) } - saveErr := store.Save(mod, version, payload.Module, payload.Zip, payload.Info) + saveErr := store.Save(c, mod, version, payload.Module, payload.Zip, payload.Info) if storage.IsVersionAlreadyExistsErr(saveErr) { return c.Error(http.StatusConflict, saveErr) } else if err != nil { diff --git a/pkg/storage/azurecdn/storage.go b/pkg/storage/azurecdn/storage.go index b895917b..862ba718 100644 --- a/pkg/storage/azurecdn/storage.go +++ b/pkg/storage/azurecdn/storage.go @@ -3,7 +3,9 @@ package azurecdn import ( "bytes" "context" + "errors" "fmt" + "io" "net/url" "github.com/Azure/azure-storage-blob-go/2017-07-29/azblob" @@ -37,9 +39,7 @@ func (s Storage) BaseURL() *url.URL { } // Save implements the (github.com/gomods/athens/pkg/storage).Saver interface. -func (s *Storage) Save(module, version string, mod, zip, info []byte) error { - ctx := context.Background() - +func (s *Storage) Save(ctx context.Context, module, version string, mod, zip, info []byte) error { pipe := azblob.NewPipeline(s.cred, azblob.PipelineOptions{}) serviceURL := azblob.NewServiceURL(*s.accountURL, pipe) // rules on container names: @@ -59,18 +59,37 @@ func (s *Storage) Save(module, version string, mod, zip, info []byte) error { } emptyMeta := map[string]string{} emptyBlobAccessCond := azblob.BlobAccessConditions{} - // TODO: do these in parallel - if _, err := infoBlobURL.Upload(ctx, bytes.NewReader(info), httpHeaders("application/json"), emptyMeta, emptyBlobAccessCond); err != nil { - // TODO: log - return err + + const numUpload = 3 + uploadErrs := make(chan error, numUpload) + + upload := func(url azblob.BlockBlobURL, content io.ReadSeeker, contentType string) { + _, err := url.Upload(ctx, content, httpHeaders(contentType), emptyMeta, emptyBlobAccessCond) + uploadErrs <- err } - if _, err := modBlobURL.Upload(ctx, bytes.NewReader(info), httpHeaders("text/plain"), emptyMeta, emptyBlobAccessCond); err != nil { - // TODO: log - return err + + go upload(infoBlobURL, bytes.NewReader(info), "application/json") + go upload(modBlobURL, bytes.NewReader(mod), "text/plain") + go upload(zipBlobURL, bytes.NewReader(zip), "application/octet-stream") + + encountered := make([]error, 0, numUpload) + for i := 0; i < numUpload; i++ { + select { + case err := <-uploadErrs: + if err != nil { + encountered = append(encountered, err) + } + case <-ctx.Done(): + return ctx.Err() + } } - if _, err := zipBlobURL.Upload(ctx, bytes.NewReader(zip), httpHeaders("application/octet-stream"), emptyMeta, emptyBlobAccessCond); err != nil { - // TODO: log - return err + + if len(encountered) > 0 { + message := bytes.NewBufferString("encountered multiple errors during save:\n") + for _, err := range encountered { + fmt.Fprintln(message, err.Error()) + } + return errors.New(message.String()) } // TODO: take out lease on the /list file and add the version to it diff --git a/pkg/storage/backendconnector.go b/pkg/storage/backendconnector.go index adc1d4c8..5afa557a 100644 --- a/pkg/storage/backendconnector.go +++ b/pkg/storage/backendconnector.go @@ -1,5 +1,7 @@ package storage +import "context" + // BackendConnector is a regular storage backend with Connect functionality type BackendConnector interface { Backend @@ -29,8 +31,8 @@ func (n noOpConnectedBackend) Get(module, vsn string) (*Version, error) { func (n noOpConnectedBackend) List(module string) ([]string, error) { return n.backend.List(module) } -func (n noOpConnectedBackend) Save(module, version string, mod, zip, info []byte) error { - return n.backend.Save(module, version, mod, zip, info) +func (n noOpConnectedBackend) Save(ctx context.Context, module, version string, mod, zip, info []byte) error { + return n.backend.Save(ctx, module, version, mod, zip, info) } func (n noOpConnectedBackend) Delete(module, version string) error { return n.backend.Delete(module, version) diff --git a/pkg/storage/fs/fs_test.go b/pkg/storage/fs/fs_test.go index 6ffdb1f5..a5c947a1 100644 --- a/pkg/storage/fs/fs_test.go +++ b/pkg/storage/fs/fs_test.go @@ -1,6 +1,7 @@ package fs import ( + "context" "io/ioutil" "path/filepath" ) @@ -16,7 +17,7 @@ func (d *FsTests) TestLocationFuncs() { func (d *FsTests) TestGetSaveListRoundTrip() { r := d.Require() - r.NoError(d.storage.Save(module, version, mod, zip, info)) + r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info)) listedVersions, err := d.storage.List(module) r.NoError(err) r.Equal(1, len(listedVersions)) diff --git a/pkg/storage/fs/lister_test.go b/pkg/storage/fs/lister_test.go index 2bc87993..2c359e16 100644 --- a/pkg/storage/fs/lister_test.go +++ b/pkg/storage/fs/lister_test.go @@ -1,10 +1,14 @@ package fs +import ( + "context" +) + func (d *FsTests) TestList() { r := d.Require() versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} for _, version := range versions { - r.NoError(d.storage.Save(module, version, mod, zip, info)) + r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info)) } retVersions, err := d.storage.List(module) r.NoError(err) diff --git a/pkg/storage/fs/saver.go b/pkg/storage/fs/saver.go index 076d0880..f6da11ec 100644 --- a/pkg/storage/fs/saver.go +++ b/pkg/storage/fs/saver.go @@ -1,13 +1,14 @@ package fs import ( + "context" "os" "path/filepath" "github.com/spf13/afero" ) -func (s *storageImpl) Save(module, vsn string, mod, zip, info []byte) error { +func (s *storageImpl) Save(_ context.Context, module, vsn string, mod, zip, info []byte) error { dir := s.versionLocation(module, vsn) // TODO: 777 is not the best filemode, use something better diff --git a/pkg/storage/mem/mem_test.go b/pkg/storage/mem/mem_test.go index 65a0de47..b05aa7dc 100644 --- a/pkg/storage/mem/mem_test.go +++ b/pkg/storage/mem/mem_test.go @@ -1,6 +1,7 @@ package mem import ( + "context" "io/ioutil" "testing" @@ -40,7 +41,7 @@ func (d *MemTests) TestGetSaveListRoundTrip() { d.Require().NoError(err) // save and list modules - r.NoError(storage.Save(module, version, mod, zip, info)) + r.NoError(storage.Save(context.Background(), module, version, mod, zip, info)) listedVersions, err := storage.List(module) r.NoError(err) r.Equal(1, len(listedVersions)) diff --git a/pkg/storage/minio/lister_test.go b/pkg/storage/minio/lister_test.go index 76726bbe..03d2c0b5 100644 --- a/pkg/storage/minio/lister_test.go +++ b/pkg/storage/minio/lister_test.go @@ -1,10 +1,14 @@ package minio +import ( + "context" +) + func (d *MinioTests) TestList() { r := d.Require() versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} for _, version := range versions { - r.NoError(d.storage.Save(module, version, mod, zip, info)) + r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info)) } retVersions, err := d.storage.List(module) r.NoError(err) diff --git a/pkg/storage/minio/minio_test.go b/pkg/storage/minio/minio_test.go index 862658ab..7baebab5 100644 --- a/pkg/storage/minio/minio_test.go +++ b/pkg/storage/minio/minio_test.go @@ -1,12 +1,13 @@ package minio import ( + "context" "io/ioutil" ) func (d *MinioTests) TestGetSaveListRoundTrip() { r := d.Require() - r.NoError(d.storage.Save(module, version, mod, zip, info)) + r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info)) listedVersions, err := d.storage.List(module) r.NoError(err) r.Equal(1, len(listedVersions)) diff --git a/pkg/storage/minio/saver.go b/pkg/storage/minio/saver.go index 81a4935b..e2223dd3 100644 --- a/pkg/storage/minio/saver.go +++ b/pkg/storage/minio/saver.go @@ -2,11 +2,12 @@ package minio import ( "bytes" + "context" minio "github.com/minio/minio-go" ) -func (s *storageImpl) Save(module, vsn string, mod, zip, info []byte) error { +func (s *storageImpl) Save(_ context.Context, module, vsn string, mod, zip, info []byte) error { dir := s.versionLocation(module, vsn) modFileName := dir + "/" + "go.mod" zipFileName := dir + "/" + "source.zip" diff --git a/pkg/storage/mongo/lister_test.go b/pkg/storage/mongo/lister_test.go index 1ff4a5a5..bdae8a0e 100644 --- a/pkg/storage/mongo/lister_test.go +++ b/pkg/storage/mongo/lister_test.go @@ -1,10 +1,14 @@ package mongo +import ( + "context" +) + func (m *MongoTests) TestList() { r := m.Require() versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} for _, version := range versions { - m.storage.Save(module, version, mod, zip, info) + m.storage.Save(context.Background(), module, version, mod, zip, info) } retVersions, err := m.storage.List(module) r.NoError(err) diff --git a/pkg/storage/mongo/mongo_test.go b/pkg/storage/mongo/mongo_test.go index d7c96563..88f14346 100644 --- a/pkg/storage/mongo/mongo_test.go +++ b/pkg/storage/mongo/mongo_test.go @@ -1,12 +1,13 @@ package mongo import ( + "context" "io/ioutil" ) func (m *MongoTests) TestGetSaveListRoundTrip() { r := m.Require() - m.storage.Save(module, version, mod, zip, info) + m.storage.Save(context.Background(), module, version, mod, zip, info) listedVersions, err := m.storage.List(module) r.NoError(err) r.Equal(1, len(listedVersions)) diff --git a/pkg/storage/mongo/saver.go b/pkg/storage/mongo/saver.go index e25a582c..a57fb73e 100644 --- a/pkg/storage/mongo/saver.go +++ b/pkg/storage/mongo/saver.go @@ -1,9 +1,13 @@ package mongo -import "github.com/gomods/athens/pkg/storage" +import ( + "context" + + "github.com/gomods/athens/pkg/storage" +) // Save stores a module in mongo storage. -func (s *ModuleStore) Save(module, version string, mod, zip, info []byte) error { +func (s *ModuleStore) Save(_ context.Context, module, version string, mod, zip, info []byte) error { m := &storage.Module{ Module: module, Version: version, diff --git a/pkg/storage/olympus/saver.go b/pkg/storage/olympus/saver.go index ac74c66d..4239bc7e 100644 --- a/pkg/storage/olympus/saver.go +++ b/pkg/storage/olympus/saver.go @@ -1,8 +1,12 @@ package olympus +import ( + "context" +) + // Save stores a module in olympus. // This actually does not store anything just reports cache miss -func (s *ModuleStore) Save(module, version string, _, _ []byte) error { +func (s *ModuleStore) Save(_ context.Context, module, version string, _, _, _ []byte) error { // dummy implementation so Olympus Store can be used everywhere as Backend iface return nil } diff --git a/pkg/storage/rdbms/lister_test.go b/pkg/storage/rdbms/lister_test.go index da64f22f..e41763de 100644 --- a/pkg/storage/rdbms/lister_test.go +++ b/pkg/storage/rdbms/lister_test.go @@ -1,10 +1,12 @@ package rdbms +import "context" + func (rd *RDBMSTestSuite) TestList() { r := rd.Require() versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} for _, version := range versions { - rd.storage.Save(module, version, mod, zip, info) + rd.storage.Save(context.Background(), module, version, mod, zip, info) } retVersions, err := rd.storage.List(module) r.NoError(err) diff --git a/pkg/storage/rdbms/rdbms_test.go b/pkg/storage/rdbms/rdbms_test.go index e45a5411..90f42522 100644 --- a/pkg/storage/rdbms/rdbms_test.go +++ b/pkg/storage/rdbms/rdbms_test.go @@ -1,12 +1,13 @@ package rdbms import ( + "context" "io/ioutil" ) func (rd *RDBMSTestSuite) TestGetSaveListRoundTrip() { r := rd.Require() - err := rd.storage.Save(module, version, mod, zip, info) + err := rd.storage.Save(context.Background(), module, version, mod, zip, info) r.NoError(err) listedVersions, err := rd.storage.List(module) r.NoError(err) diff --git a/pkg/storage/rdbms/saver.go b/pkg/storage/rdbms/saver.go index 0c91c58e..e9f79c2e 100644 --- a/pkg/storage/rdbms/saver.go +++ b/pkg/storage/rdbms/saver.go @@ -1,11 +1,13 @@ package rdbms import ( + "context" + "github.com/gomods/athens/pkg/storage/rdbms/models" ) // Save stores a module in rdbms storage. -func (r *ModuleStore) Save(module, version string, mod, zip, info []byte) error { +func (r *ModuleStore) Save(_ context.Context, module, version string, mod, zip, info []byte) error { m := &models.Module{ Module: module, Version: version, diff --git a/pkg/storage/saver.go b/pkg/storage/saver.go index 7f431bab..a720319f 100644 --- a/pkg/storage/saver.go +++ b/pkg/storage/saver.go @@ -1,6 +1,8 @@ package storage +import "context" + // Saver saves module metadata and its source to underlying storage type Saver interface { - Save(module, version string, mod, zip, info []byte) error + Save(ctx context.Context, module, version string, mod, zip, info []byte) error }