Add Context to Saver Interface (#179)

* Adding context to storage.Saver parameters

* Adding `buffalo.Context` as the first param in storage.Saver

Fixes #177

* Parallelizing Upload to Azure Blob Storage

Fixes #175

* Fixing import clauses

* go fmt ./...

* Populating default context

* CI: Echo files flagged by the gofmt check to stderr

Doing so allows users to see which files still need to be formatted.

* Upgrading Go, then go fmt ./...

* Addressing Feedback

* Responding to more review feedback

* Adding a default context to worker saves

* Removing tracing and buffalo.Context usage from the Storage interface

github.com/bketelsen/buffet relies on a `buffalo.Context`; however, a `storage.Saver` is
often invoked from a buffalo worker, which has no buffalo.Context associated
with it. That left two options: create a buffalo context for each worker's
Job, or remove the buffalo.Context dependency from the Saver interface. This
change takes the latter route, accepting a plain `context.Context` instead
(see the sketch at the end of this list).

* Fixing missed changes

* Review feedback
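
Below is a minimal sketch of why a plain `context.Context` parameter serves both call sites. It is illustrative only (the helper names and module path are made up, not part of this commit): buffalo.Context embeds context.Context, so HTTP handlers can pass `c` straight through, while background workers fall back to `context.Background()`.

```go
package example

import (
	"context"

	"github.com/gobuffalo/buffalo"
	"github.com/gomods/athens/pkg/storage"
)

// saveFromHandler runs inside an HTTP request; the buffalo.Context it receives
// already satisfies context.Context, so it is handed to the Saver as-is.
func saveFromHandler(c buffalo.Context, s storage.Saver, mod, zip, info []byte) error {
	return s.Save(c, "github.com/example/mod", "v1.0.0", mod, zip, info)
}

// saveFromWorker runs inside a background job, which has no buffalo.Context,
// so it supplies a default context instead.
func saveFromWorker(s storage.Saver, mod, zip, info []byte) error {
	return s.Save(context.Background(), "github.com/example/mod", "v1.0.0", mod, zip, info)
}
```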
Martin Strobel
2018-06-21 10:47:31 -07:00
committed by Aaron Schlesinger
parent da5b269c2d
commit d0a18e7dd4
23 changed files with 97 additions and 41 deletions
+1 -1
@@ -15,7 +15,7 @@ env:
- ATHENS_MONGO_STORAGE_URL=mongodb://127.0.0.1:27017
- CODE_COV=1
script:
- test -z $(gofmt -s -l $GO_FILES) # Fail if a .go file hasn't been formatted with gofmt
- test -z $(gofmt -s -l $GO_FILES | tee /dev/stderr) # Fail if a .go file hasn't been formatted with gofmt
- golint -set_exit_status $(go list ./...) # Linter
- go test -race -coverprofile cover.out -covermode atomic ./... # Run all the tests with the race detector and code coverage enabled
before_script:
+4 -4
@@ -1,14 +1,14 @@
package actions
import (
"context"
"io/ioutil"
"log"
"time"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/cdn"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/storage"
)
// mergeDB merges diff into the module database.
@@ -22,7 +22,7 @@ import (
// - Delete operation adds tombstone to module metadata k/v store
//
// Both could be fixed by putting each 'for' loop into a (global) critical section
func mergeDB(originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend) error {
func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend) error {
for _, added := range diff.Added {
if _, err := eLog.ReadSingle(added.Module, added.Version); err != nil {
// the module/version already exists, is deprecated, or is
@@ -46,7 +46,7 @@ func mergeDB(originURL string, diff dbDiff, eLog eventlog.Eventlog, storage stor
continue
}
if err := storage.Save(added.Module, added.Version, data.Mod, zipBytes, data.Info); err != nil {
if err := storage.Save(ctx, added.Module, added.Version, data.Mod, zipBytes, data.Info); err != nil {
log.Printf("error saving new module %s/%s to CDN (%s)", added.Module, added.Version, err)
continue
}
+2 -1
@@ -1,6 +1,7 @@
package actions
import (
"context"
"errors"
"io/ioutil"
"path/filepath"
@@ -51,7 +52,7 @@ func GetPackageDownloaderJob(s storage.Backend, e eventlog.Eventlog, w worker.Wo
}
// save it
if err := s.Save(module, version, modBytes, zipBytes, infoBytes); err != nil {
if err := s.Save(context.Background(), module, version, modBytes, zipBytes, infoBytes); err != nil {
return err
}
+3 -2
@@ -1,6 +1,7 @@
package actions
import (
"context"
"errors"
"fmt"
"io/ioutil"
@@ -18,7 +19,7 @@ const (
OlympusGlobalEndpointOverrideKey = "OLYMPUS_GLOBAL_ENDPOINT"
)
// GetProcessCacheMissJob porcesses queue of cache misses and downloads sources from active Olympus
// GetProcessCacheMissJob processes queue of cache misses and downloads sources from active Olympus
func GetProcessCacheMissJob(s storage.Backend, w worker.Worker) worker.Handler {
return func(args worker.Args) (err error) {
module, version, err := parseArgs(args)
@@ -43,7 +44,7 @@ func GetProcessCacheMissJob(s storage.Backend, w worker.Worker) worker.Handler {
return err
}
if err = s.Save(module, version, v.Mod, zip, v.Info); err != nil {
if err = s.Save(context.Background(), module, version, v.Mod, zip, v.Info); err != nil {
process(module, version, args, w)
}
+1 -1
@@ -61,7 +61,7 @@ func fetchHandler(store storage.Saver) func(c buffalo.Context) error {
return fmt.Errorf("coudln't find .info file (%s)", err)
}
saveErr := store.Save(moduleName, version, modBytes, zipBytes, infoBytes)
saveErr := store.Save(c, moduleName, version, modBytes, zipBytes, infoBytes)
if storage.IsVersionAlreadyExistsErr(saveErr) {
return c.Error(http.StatusConflict, saveErr)
} else if err != nil {
+1 -1
@@ -24,7 +24,7 @@ func uploadHandler(store storage.Saver) func(c buffalo.Context) error {
if c.Bind(payload); err != nil {
return errors.WithStack(err)
}
saveErr := store.Save(mod, version, payload.Module, payload.Zip, payload.Info)
saveErr := store.Save(c, mod, version, payload.Module, payload.Zip, payload.Info)
if storage.IsVersionAlreadyExistsErr(saveErr) {
return c.Error(http.StatusConflict, saveErr)
} else if err != nil {
+32 -13
@@ -3,7 +3,9 @@ package azurecdn
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"github.com/Azure/azure-storage-blob-go/2017-07-29/azblob"
@@ -37,9 +39,7 @@ func (s Storage) BaseURL() *url.URL {
}
// Save implements the (github.com/gomods/athens/pkg/storage).Saver interface.
func (s *Storage) Save(module, version string, mod, zip, info []byte) error {
ctx := context.Background()
func (s *Storage) Save(ctx context.Context, module, version string, mod, zip, info []byte) error {
pipe := azblob.NewPipeline(s.cred, azblob.PipelineOptions{})
serviceURL := azblob.NewServiceURL(*s.accountURL, pipe)
// rules on container names:
@@ -59,18 +59,37 @@ func (s *Storage) Save(module, version string, mod, zip, info []byte) error {
}
emptyMeta := map[string]string{}
emptyBlobAccessCond := azblob.BlobAccessConditions{}
// TODO: do these in parallel
if _, err := infoBlobURL.Upload(ctx, bytes.NewReader(info), httpHeaders("application/json"), emptyMeta, emptyBlobAccessCond); err != nil {
// TODO: log
return err
const numUpload = 3
uploadErrs := make(chan error, numUpload)
upload := func(url azblob.BlockBlobURL, content io.ReadSeeker, contentType string) {
_, err := url.Upload(ctx, content, httpHeaders(contentType), emptyMeta, emptyBlobAccessCond)
uploadErrs <- err
}
if _, err := modBlobURL.Upload(ctx, bytes.NewReader(info), httpHeaders("text/plain"), emptyMeta, emptyBlobAccessCond); err != nil {
// TODO: log
return err
go upload(infoBlobURL, bytes.NewReader(info), "application/json")
go upload(modBlobURL, bytes.NewReader(mod), "text/plain")
go upload(zipBlobURL, bytes.NewReader(zip), "application/octet-stream")
encountered := make([]error, 0, numUpload)
for i := 0; i < numUpload; i++ {
select {
case err := <-uploadErrs:
if err != nil {
encountered = append(encountered, err)
}
case <-ctx.Done():
return ctx.Err()
}
}
if _, err := zipBlobURL.Upload(ctx, bytes.NewReader(zip), httpHeaders("application/octet-stream"), emptyMeta, emptyBlobAccessCond); err != nil {
// TODO: log
return err
if len(encountered) > 0 {
message := bytes.NewBufferString("encountered multiple errors during save:\n")
for _, err := range encountered {
fmt.Fprintln(message, err.Error())
}
return errors.New(message.String())
}
// TODO: take out lease on the /list file and add the version to it
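
The fan-out above follows a common pattern: one goroutine per blob, an error channel buffered to the number of uploads, and a select against ctx.Done() so a cancelled caller is not left waiting. Here is a standalone sketch of the same structure; the function and variable names are illustrative, not code from this commit.

```go
package example

import (
	"context"
	"errors"
	"strings"
)

// uploadAll runs every upload concurrently and aggregates any failures.
func uploadAll(ctx context.Context, uploads []func(context.Context) error) error {
	// Buffered to len(uploads) so goroutines can always send, even if the
	// caller has already returned because ctx was cancelled.
	errs := make(chan error, len(uploads))
	for _, up := range uploads {
		go func(up func(context.Context) error) { errs <- up(ctx) }(up)
	}

	var failures []string
	for range uploads {
		select {
		case err := <-errs:
			if err != nil {
				failures = append(failures, err.Error())
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	if len(failures) > 0 {
		return errors.New("encountered multiple errors during save:\n" + strings.Join(failures, "\n"))
	}
	return nil
}
```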
+4 -2
@@ -1,5 +1,7 @@
package storage
import "context"
// BackendConnector is a regular storage backend with Connect functionality
type BackendConnector interface {
Backend
@@ -29,8 +31,8 @@ func (n noOpConnectedBackend) Get(module, vsn string) (*Version, error) {
func (n noOpConnectedBackend) List(module string) ([]string, error) {
return n.backend.List(module)
}
func (n noOpConnectedBackend) Save(module, version string, mod, zip, info []byte) error {
return n.backend.Save(module, version, mod, zip, info)
func (n noOpConnectedBackend) Save(ctx context.Context, module, version string, mod, zip, info []byte) error {
return n.backend.Save(ctx, module, version, mod, zip, info)
}
func (n noOpConnectedBackend) Delete(module, version string) error {
return n.backend.Delete(module, version)
+2 -1
@@ -1,6 +1,7 @@
package fs
import (
"context"
"io/ioutil"
"path/filepath"
)
@@ -16,7 +17,7 @@ func (d *FsTests) TestLocationFuncs() {
func (d *FsTests) TestGetSaveListRoundTrip() {
r := d.Require()
r.NoError(d.storage.Save(module, version, mod, zip, info))
r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info))
listedVersions, err := d.storage.List(module)
r.NoError(err)
r.Equal(1, len(listedVersions))
+5 -1
@@ -1,10 +1,14 @@
package fs
import (
"context"
)
func (d *FsTests) TestList() {
r := d.Require()
versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"}
for _, version := range versions {
r.NoError(d.storage.Save(module, version, mod, zip, info))
r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info))
}
retVersions, err := d.storage.List(module)
r.NoError(err)
+2 -1
@@ -1,13 +1,14 @@
package fs
import (
"context"
"os"
"path/filepath"
"github.com/spf13/afero"
)
func (s *storageImpl) Save(module, vsn string, mod, zip, info []byte) error {
func (s *storageImpl) Save(_ context.Context, module, vsn string, mod, zip, info []byte) error {
dir := s.versionLocation(module, vsn)
// TODO: 777 is not the best filemode, use something better
+2 -1
@@ -1,6 +1,7 @@
package mem
import (
"context"
"io/ioutil"
"testing"
@@ -40,7 +41,7 @@ func (d *MemTests) TestGetSaveListRoundTrip() {
d.Require().NoError(err)
// save and list modules
r.NoError(storage.Save(module, version, mod, zip, info))
r.NoError(storage.Save(context.Background(), module, version, mod, zip, info))
listedVersions, err := storage.List(module)
r.NoError(err)
r.Equal(1, len(listedVersions))
+5 -1
@@ -1,10 +1,14 @@
package minio
import (
"context"
)
func (d *MinioTests) TestList() {
r := d.Require()
versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"}
for _, version := range versions {
r.NoError(d.storage.Save(module, version, mod, zip, info))
r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info))
}
retVersions, err := d.storage.List(module)
r.NoError(err)
+2 -1
@@ -1,12 +1,13 @@
package minio
import (
"context"
"io/ioutil"
)
func (d *MinioTests) TestGetSaveListRoundTrip() {
r := d.Require()
r.NoError(d.storage.Save(module, version, mod, zip, info))
r.NoError(d.storage.Save(context.Background(), module, version, mod, zip, info))
listedVersions, err := d.storage.List(module)
r.NoError(err)
r.Equal(1, len(listedVersions))
+2 -1
@@ -2,11 +2,12 @@ package minio
import (
"bytes"
"context"
minio "github.com/minio/minio-go"
)
func (s *storageImpl) Save(module, vsn string, mod, zip, info []byte) error {
func (s *storageImpl) Save(_ context.Context, module, vsn string, mod, zip, info []byte) error {
dir := s.versionLocation(module, vsn)
modFileName := dir + "/" + "go.mod"
zipFileName := dir + "/" + "source.zip"
+5 -1
@@ -1,10 +1,14 @@
package mongo
import (
"context"
)
func (m *MongoTests) TestList() {
r := m.Require()
versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"}
for _, version := range versions {
m.storage.Save(module, version, mod, zip, info)
m.storage.Save(context.Background(), module, version, mod, zip, info)
}
retVersions, err := m.storage.List(module)
r.NoError(err)
+2 -1
@@ -1,12 +1,13 @@
package mongo
import (
"context"
"io/ioutil"
)
func (m *MongoTests) TestGetSaveListRoundTrip() {
r := m.Require()
m.storage.Save(module, version, mod, zip, info)
m.storage.Save(context.Background(), module, version, mod, zip, info)
listedVersions, err := m.storage.List(module)
r.NoError(err)
r.Equal(1, len(listedVersions))
+6 -2
@@ -1,9 +1,13 @@
package mongo
import "github.com/gomods/athens/pkg/storage"
import (
"context"
"github.com/gomods/athens/pkg/storage"
)
// Save stores a module in mongo storage.
func (s *ModuleStore) Save(module, version string, mod, zip, info []byte) error {
func (s *ModuleStore) Save(_ context.Context, module, version string, mod, zip, info []byte) error {
m := &storage.Module{
Module: module,
Version: version,
+5 -1
@@ -1,8 +1,12 @@
package olympus
import (
"context"
)
// Save stores a module in olympus.
// This actually does not store anything just reports cache miss
func (s *ModuleStore) Save(module, version string, _, _ []byte) error {
func (s *ModuleStore) Save(_ context.Context, module, version string, _, _, _ []byte) error {
// dummy implementation so Olympus Store can be used everywhere as Backend iface
return nil
}
+3 -1
@@ -1,10 +1,12 @@
package rdbms
import "context"
func (rd *RDBMSTestSuite) TestList() {
r := rd.Require()
versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"}
for _, version := range versions {
rd.storage.Save(module, version, mod, zip, info)
rd.storage.Save(context.Background(), module, version, mod, zip, info)
}
retVersions, err := rd.storage.List(module)
r.NoError(err)
+2 -1
@@ -1,12 +1,13 @@
package rdbms
import (
"context"
"io/ioutil"
)
func (rd *RDBMSTestSuite) TestGetSaveListRoundTrip() {
r := rd.Require()
err := rd.storage.Save(module, version, mod, zip, info)
err := rd.storage.Save(context.Background(), module, version, mod, zip, info)
r.NoError(err)
listedVersions, err := rd.storage.List(module)
r.NoError(err)
+3 -1
@@ -1,11 +1,13 @@
package rdbms
import (
"context"
"github.com/gomods/athens/pkg/storage/rdbms/models"
)
// Save stores a module in rdbms storage.
func (r *ModuleStore) Save(module, version string, mod, zip, info []byte) error {
func (r *ModuleStore) Save(_ context.Context, module, version string, mod, zip, info []byte) error {
m := &models.Module{
Module: module,
Version: version,
+3 -1
@@ -1,6 +1,8 @@
package storage
import "context"
// Saver saves module metadata and its source to underlying storage
type Saver interface {
Save(module, version string, mod, zip, info []byte) error
Save(ctx context.Context, module, version string, mod, zip, info []byte) error
}
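
With the context threaded through the interface, callers can bound or cancel a Save. A hypothetical usage sketch (not code from this commit):

```go
package example

import (
	"context"
	"time"

	"github.com/gomods/athens/pkg/storage"
)

// saveWithTimeout gives a Saver 30 seconds before the derived context is
// cancelled; the deadline is illustrative.
func saveWithTimeout(s storage.Saver, module, version string, mod, zip, info []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return s.Save(ctx, module, version, mod, zip, info)
}
```

Backends that currently ignore the parameter (the `_ context.Context` implementations above) can start honoring cancellation later without another signature change.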