pkg/storage: add External implementation (#1587)

* pkg/storage: add External implementation

* fix conflicts

* use newly instantiated client
This commit is contained in:
Marwan Sulaiman
2020-03-27 13:35:52 -04:00
committed by GitHub
parent a36be996b6
commit 3c4db4ce86
12 changed files with 421 additions and 19 deletions
+7
View File
@@ -9,6 +9,7 @@ import (
"github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/storage" "github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/azureblob" "github.com/gomods/athens/pkg/storage/azureblob"
"github.com/gomods/athens/pkg/storage/external"
"github.com/gomods/athens/pkg/storage/fs" "github.com/gomods/athens/pkg/storage/fs"
"github.com/gomods/athens/pkg/storage/gcp" "github.com/gomods/athens/pkg/storage/gcp"
"github.com/gomods/athens/pkg/storage/mem" "github.com/gomods/athens/pkg/storage/mem"
@@ -60,6 +61,12 @@ func GetStorage(storageType string, storageConfig *config.StorageConfig, timeout
return nil, errors.E(op, "Invalid AzureBlob Storage Configuration") return nil, errors.E(op, "Invalid AzureBlob Storage Configuration")
} }
return azureblob.New(storageConfig.AzureBlob, timeout) return azureblob.New(storageConfig.AzureBlob, timeout)
case "external":
if storageConfig.External == nil {
return nil, errors.E(op, "Invalid External Storage Configuration")
}
// TODO(marwan-at-work): add client tracing
return external.NewClient(storageConfig.External.URL, nil), nil
default: default:
return nil, fmt.Errorf("storage type %s is unknown", storageType) return nil, fmt.Errorf("storage type %s is unknown", storageType)
} }
+9 -1
View File
@@ -113,7 +113,7 @@ RobotsFile = "robots.txt"
Timeout = 300 Timeout = 300
# StorageType sets the type of storage backend the proxy will use. # StorageType sets the type of storage backend the proxy will use.
# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob # Possible values are memory, disk, mongo, gcp, minio, s3, azureblob, external
# Defaults to memory # Defaults to memory
# Env override: ATHENS_STORAGE_TYPE # Env override: ATHENS_STORAGE_TYPE
StorageType = "memory" StorageType = "memory"
@@ -467,3 +467,11 @@ SingleFlightType = "memory"
# Name of container in the blob storage # Name of container in the blob storage
# Env override: ATHENS_AZURE_CONTAINER_NAME # Env override: ATHENS_AZURE_CONTAINER_NAME
ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME" ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME"
[Storage.External]
# URL is the external storage URL that Athens
# will use to interact with the backend storage layer.
# See https://docs.gomods.io/configuration/storage for implementation
# details.
# Env override: ATHENS_EXTERNAL_STORAGE_URL
URL = ""
+40
View File
@@ -27,6 +27,8 @@ The Athens proxy supports many storage types:
- [Configuration:](#configuration-7) - [Configuration:](#configuration-7)
- [Azure Blob Storage](#azure-blob-storage) - [Azure Blob Storage](#azure-blob-storage)
- [Configuration:](#configuration-8) - [Configuration:](#configuration-8)
- [External Storage](#external-storage)
- [Configuration:](#configuration-9)
All of them can be configured using `config.toml` file. You need to set a valid driver in `StorageType` value or you can set it in environment variable `ATHENS_STORAGE_TYPE` on your server. All of them can be configured using `config.toml` file. You need to set a valid driver in `StorageType` value or you can set it in environment variable `ATHENS_STORAGE_TYPE` on your server.
Also for most of the drivers you need to provide additional configuration data which will be described below. Also for most of the drivers you need to provide additional configuration data which will be described below.
@@ -336,6 +338,44 @@ It assumes that you already have the following:
# Env override: ATHENS_AZURE_CONTAINER_NAME # Env override: ATHENS_AZURE_CONTAINER_NAME
ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME" ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME"
## External Storage
External storage lets Athens connect to your own implementation of a storage backend.
All you have to do is implement the (storage.Backend)[https://github.com/gomods/athens/blob/master/pkg/storage/backend.go#L4] interface and run it behind an http server.
Once you implement the backend server, you must then configure Athens to use that storage backend as such:
##### Configuration:
# Env override: ATHENS_STORAGE_TYPE
StorageType = "external"
[Storage]
[Storage.External]
# Env override: ATHENS_EXTERNAL_STORAGE_URL
URL = "http://localhost:9090"
Athens provides a convenience wrapper that lets you implement a storage backend with ease. See the following example:
```golang
package main
import (
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/external"
)
// TODO: implement storage.Backend
type myCustomStorage struct {
storage.Backend
}
func main() {
handler := external.NewServer(&myCustomStorage{})
http.ListenAndServe(":9090", handler)
}
```
## Running multiple Athens pointed at the same storage ## Running multiple Athens pointed at the same storage
Athens has the ability to run concurrently pointed at the same storage medium, using Athens has the ability to run concurrently pointed at the same storage medium, using
+2
View File
@@ -286,6 +286,8 @@ func validateConfig(config Config) error {
return validate.Struct(config.Storage.S3) return validate.Struct(config.Storage.S3)
case "azureblob": case "azureblob":
return validate.Struct(config.Storage.AzureBlob) return validate.Struct(config.Storage.AzureBlob)
case "external":
return validate.Struct(config.Storage.External)
default: default:
return fmt.Errorf("storage type %s is unknown", config.StorageType) return fmt.Errorf("storage type %s is unknown", config.StorageType)
} }
+6
View File
@@ -0,0 +1,6 @@
package config
// External specifies configuration for an external http storage
type External struct {
URL string `validate:"required" envconfig:"ATHENS_EXTERNAL_STORAGE_URL"`
}
+1
View File
@@ -8,4 +8,5 @@ type StorageConfig struct {
Mongo *MongoConfig Mongo *MongoConfig
S3 *S3Config S3 *S3Config
AzureBlob *AzureBlobConfig AzureBlob *AzureBlobConfig
External *External
} }
+7 -7
View File
@@ -33,7 +33,7 @@ func RunTests(t *testing.T, b storage.Backend, clearBackend func() error) {
// returns a KindNotFound error when asking for // returns a KindNotFound error when asking for
// non existing modules. // non existing modules.
func testNotFound(t *testing.T, b storage.Backend) { func testNotFound(t *testing.T, b storage.Backend) {
mod, ver := "xxx", "yyy" mod, ver := "github.com/gomods/athens", "yyy"
ctx := context.Background() ctx := context.Background()
err := b.Delete(ctx, mod, ver) err := b.Delete(ctx, mod, ver)
@@ -106,7 +106,7 @@ func testListSuffix(t *testing.T, b storage.Backend) {
func testList(t *testing.T, b storage.Backend) { func testList(t *testing.T, b storage.Backend) {
ctx := context.Background() ctx := context.Background()
modname := "listMod" modname := "github.com/gomods/athens"
versions := []string{"v1.1.0", "v1.2.0", "v1.3.0"} versions := []string{"v1.1.0", "v1.2.0", "v1.3.0"}
for _, version := range versions { for _, version := range versions {
mock := getMockModule() mock := getMockModule()
@@ -133,7 +133,7 @@ func testList(t *testing.T, b storage.Backend) {
// testGet saves and retrieves a module successfully. // testGet saves and retrieves a module successfully.
func testGet(t *testing.T, b storage.Backend) { func testGet(t *testing.T, b storage.Backend) {
ctx := context.Background() ctx := context.Background()
modname := "getTestModule" modname := "github.com/gomods/athens"
ver := "v1.2.3" ver := "v1.2.3"
mock := getMockModule() mock := getMockModule()
zipBts, _ := ioutil.ReadAll(mock.Zip) zipBts, _ := ioutil.ReadAll(mock.Zip)
@@ -146,7 +146,7 @@ func testGet(t *testing.T, b storage.Backend) {
mod, err := b.GoMod(ctx, modname, ver) mod, err := b.GoMod(ctx, modname, ver)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, mock.Mod, mod) require.Equal(t, string(mock.Mod), string(mod))
zip, err := b.Zip(ctx, modname, ver) zip, err := b.Zip(ctx, modname, ver)
require.NoError(t, err) require.NoError(t, err)
@@ -157,7 +157,7 @@ func testGet(t *testing.T, b storage.Backend) {
func testExists(t *testing.T, b storage.Backend) { func testExists(t *testing.T, b storage.Backend) {
ctx := context.Background() ctx := context.Background()
modname := "getTestModule" modname := "github.com/gomods/athens"
ver := "v1.2.3" ver := "v1.2.3"
mock := getMockModule() mock := getMockModule()
zipBts, _ := ioutil.ReadAll(mock.Zip) zipBts, _ := ioutil.ReadAll(mock.Zip)
@@ -171,7 +171,7 @@ func testExists(t *testing.T, b storage.Backend) {
func testShouldNotExist(t *testing.T, b storage.Backend) { func testShouldNotExist(t *testing.T, b storage.Backend) {
ctx := context.Background() ctx := context.Background()
mod := "shouldNotExist" mod := "github.com/gomods/shouldNotExist"
ver := "v1.2.3-pre.1" ver := "v1.2.3-pre.1"
mock := getMockModule() mock := getMockModule()
zipBts, _ := ioutil.ReadAll(mock.Zip) zipBts, _ := ioutil.ReadAll(mock.Zip)
@@ -193,7 +193,7 @@ func testShouldNotExist(t *testing.T, b storage.Backend) {
// afterwards. // afterwards.
func testDelete(t *testing.T, b storage.Backend) { func testDelete(t *testing.T, b storage.Backend) {
ctx := context.Background() ctx := context.Background()
modname := "deleteModule" modname := "github.com/gomods/athens"
version := fmt.Sprintf("%s%d", "delete", rand.Int()) version := fmt.Sprintf("%s%d", "delete", rand.Int())
mock := getMockModule() mock := getMockModule()
+183
View File
@@ -0,0 +1,183 @@
package external
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"strings"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/storage"
"golang.org/x/mod/module"
)
type service struct {
url string
c *http.Client
}
// NewClient returns an external storage client
func NewClient(url string, c *http.Client) storage.Backend {
if c == nil {
c = &http.Client{}
}
url = strings.TrimSuffix(url, "/")
return &service{url, c}
}
func (s *service) List(ctx context.Context, mod string) ([]string, error) {
const op errors.Op = "external.List"
body, err := s.getRequest(ctx, mod, "list", "")
if err != nil {
return nil, errors.E(op, err)
}
list := []string{}
scnr := bufio.NewScanner(body)
for scnr.Scan() {
list = append(list, scnr.Text())
}
if scnr.Err() != nil {
return nil, errors.E(op, scnr.Err())
}
return list, nil
}
func (s *service) Info(ctx context.Context, mod, ver string) ([]byte, error) {
const op errors.Op = "external.Info"
body, err := s.getRequest(ctx, mod, ver, "info")
if err != nil {
return nil, errors.E(op, err)
}
info, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.E(op, err)
}
return info, nil
}
func (s *service) GoMod(ctx context.Context, mod, ver string) ([]byte, error) {
const op errors.Op = "external.GoMod"
body, err := s.getRequest(ctx, mod, ver, "mod")
if err != nil {
return nil, errors.E(op, err)
}
modFile, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.E(op, err)
}
return modFile, nil
}
func (s *service) Zip(ctx context.Context, mod, ver string) (io.ReadCloser, error) {
const op errors.Op = "external.Zip"
body, err := s.getRequest(ctx, mod, ver, "zip")
if err != nil {
return nil, errors.E(op, err)
}
return body, nil
}
func (s *service) Save(ctx context.Context, mod, ver string, modFile []byte, zip io.Reader, info []byte) error {
const op errors.Op = "external.Save"
var err error
mod, err = module.EscapePath(mod)
if err != nil {
panic(err)
}
url := s.url + "/" + mod + "/@v/" + ver + ".save"
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
go func() {
err := upload(mw, modFile, info, zip)
pw.CloseWithError(err)
}()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, pr)
if err != nil {
return errors.E(op, err)
}
req.Header.Add("Content-Type", mw.FormDataContentType())
resp, err := s.c.Do(req)
if err != nil {
return errors.E(op, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
bts, _ := ioutil.ReadAll(resp.Body)
return errors.E(op, fmt.Errorf("unexpected status code: %v - body: %s", resp.StatusCode, bts), resp.StatusCode)
}
return nil
}
func (s *service) Delete(ctx context.Context, mod, ver string) error {
const op errors.Op = "external.Delete"
body, err := s.doRequest(ctx, "DELETE", mod, ver, "delete")
if err != nil {
return errors.E(op, err)
}
defer body.Close()
return nil
}
func upload(mw *multipart.Writer, mod, info []byte, zip io.Reader) error {
defer mw.Close()
infoW, err := mw.CreateFormFile("mod.info", "mod.info")
if err != nil {
return fmt.Errorf("error creating info file: %v", err)
}
_, err = infoW.Write(info)
if err != nil {
return fmt.Errorf("error writing info file: %v", err)
}
modW, err := mw.CreateFormFile("mod.mod", "mod.mod")
if err != nil {
return fmt.Errorf("error creating mod file: %v", err)
}
_, err = modW.Write(mod)
if err != nil {
return fmt.Errorf("error writing mod file: %v", err)
}
zipW, err := mw.CreateFormFile("mod.zip", "mod.zip")
if err != nil {
return fmt.Errorf("error creating zip file: %v", err)
}
_, err = io.Copy(zipW, zip)
if err != nil {
return fmt.Errorf("error writing zip file: %v", err)
}
return nil
}
func (s *service) getRequest(ctx context.Context, mod, ver, ext string) (io.ReadCloser, error) {
return s.doRequest(ctx, "GET", mod, ver, ext)
}
func (s *service) doRequest(ctx context.Context, method, mod, ver, ext string) (io.ReadCloser, error) {
const op errors.Op = "external.doRequest"
var err error
mod, err = module.EscapePath(mod)
if err != nil {
return nil, errors.E(op, err)
}
url := s.url + "/" + mod + "/@v/" + ver
if ext != "" {
url += "." + ext
}
req, err := http.NewRequestWithContext(ctx, method, url, nil)
if err != nil {
return nil, errors.E(op, err)
}
resp, err := s.c.Do(req)
if err != nil {
return nil, errors.E(op, err)
}
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
return nil, errors.E(op, fmt.Errorf("none 200 status code: %v - body: %s", resp.StatusCode, body), resp.StatusCode)
}
return resp.Body, nil
}
+22
View File
@@ -0,0 +1,22 @@
package external
import (
"net/http/httptest"
"testing"
"github.com/gomods/athens/pkg/storage/compliance"
"github.com/gomods/athens/pkg/storage/mem"
)
func TestExternal(t *testing.T) {
strg, err := mem.NewStorage()
if err != nil {
t.Fatal(err)
}
handler := NewServer(strg)
srv := httptest.NewServer(handler)
defer srv.Close()
externalStrg := NewClient(srv.URL, nil)
clear := strg.(interface{ Clear() error }).Clear
compliance.RunTests(t, externalStrg, clear)
}
+133
View File
@@ -0,0 +1,133 @@
package external
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/paths"
"github.com/gomods/athens/pkg/storage"
"github.com/gorilla/mux"
"golang.org/x/mod/zip"
)
// NewServer takes a storage.Backend implementation of your
// choice, and returns a new http.Handler that Athens can
// reach out to for storage operations
func NewServer(strg storage.Backend) http.Handler {
r := mux.NewRouter()
r.HandleFunc(download.PathList, func(w http.ResponseWriter, r *http.Request) {
mod := mux.Vars(r)["module"]
list, err := strg.List(r.Context(), mod)
if err != nil {
http.Error(w, err.Error(), errors.Kind(err))
return
}
fmt.Fprintf(w, "%s", strings.Join(list, "\n"))
}).Methods(http.MethodGet)
r.HandleFunc(download.PathVersionInfo, func(w http.ResponseWriter, r *http.Request) {
params, err := paths.GetAllParams(r)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
info, err := strg.Info(r.Context(), params.Module, params.Version)
if err != nil {
http.Error(w, err.Error(), errors.Kind(err))
return
}
w.Write(info)
}).Methods(http.MethodGet)
r.HandleFunc(download.PathVersionModule, func(w http.ResponseWriter, r *http.Request) {
params, err := paths.GetAllParams(r)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
mod, err := strg.GoMod(r.Context(), params.Module, params.Version)
if err != nil {
http.Error(w, err.Error(), errors.Kind(err))
return
}
w.Write(mod)
}).Methods(http.MethodGet)
r.HandleFunc(download.PathVersionZip, func(w http.ResponseWriter, r *http.Request) {
params, err := paths.GetAllParams(r)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
zip, err := strg.Zip(r.Context(), params.Module, params.Version)
if err != nil {
http.Error(w, err.Error(), errors.Kind(err))
return
}
defer zip.Close()
io.Copy(w, zip)
}).Methods(http.MethodGet)
r.HandleFunc("/{module:.+}/@v/{version}.save", func(w http.ResponseWriter, r *http.Request) {
params, err := paths.GetAllParams(r)
if err != nil {
fmt.Println("REALLY?", err)
http.Error(w, err.Error(), 400)
return
}
err = r.ParseMultipartForm(zip.MaxZipFile + zip.MaxGoMod)
if err != nil {
fmt.Printf("parse: %v\n", err)
http.Error(w, err.Error(), 400)
return
}
infoFile, _, err := r.FormFile("mod.info")
if err != nil {
http.Error(w, err.Error(), 400)
return
}
defer infoFile.Close()
info, err := ioutil.ReadAll(infoFile)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
modReader, _, err := r.FormFile("mod.mod")
if err != nil {
http.Error(w, err.Error(), 400)
return
}
defer modReader.Close()
modFile, err := ioutil.ReadAll(modReader)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
modZ, _, err := r.FormFile("mod.zip")
if err != nil {
http.Error(w, err.Error(), 400)
return
}
defer modZ.Close()
err = strg.Save(r.Context(), params.Module, params.Version, modFile, modZ, info)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
}).Methods(http.MethodPost)
r.HandleFunc("/{module:.+}/@v/{version}.delete", func(w http.ResponseWriter, r *http.Request) {
params, err := paths.GetAllParams(r)
if err != nil {
http.Error(w, err.Error(), 400)
return
}
err = strg.Delete(r.Context(), params.Module, params.Version)
if err != nil {
http.Error(w, err.Error(), errors.Kind(err))
return
}
}).Methods(http.MethodDelete)
return r
}
+8
View File
@@ -2,6 +2,7 @@ package fs
import ( import (
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/errors"
@@ -37,3 +38,10 @@ func NewStorage(rootDir string, filesystem afero.Fs) (storage.Backend, error) {
} }
return &storageImpl{rootDir: rootDir, filesystem: filesystem}, nil return &storageImpl{rootDir: rootDir, filesystem: filesystem}, nil
} }
func (s *storageImpl) Clear() error {
if err := s.filesystem.RemoveAll(s.rootDir); err != nil {
return err
}
return s.filesystem.Mkdir(s.rootDir, os.ModeDir|os.ModePerm)
}
+3 -11
View File
@@ -1,7 +1,6 @@
package fs package fs
import ( import (
"os"
"testing" "testing"
"github.com/gomods/athens/pkg/storage/compliance" "github.com/gomods/athens/pkg/storage/compliance"
@@ -12,27 +11,20 @@ import (
func TestBackend(t *testing.T) { func TestBackend(t *testing.T) {
fs := afero.NewMemMapFs() fs := afero.NewMemMapFs()
b := getStorage(t, fs) b := getStorage(t, fs)
compliance.RunTests(t, b, b.clear) compliance.RunTests(t, b, b.Clear)
fs.RemoveAll(b.rootDir) fs.RemoveAll(b.rootDir)
} }
func BenchmarkBackend(b *testing.B) { func BenchmarkBackend(b *testing.B) {
fs := afero.NewOsFs() fs := afero.NewOsFs()
backend := getStorage(b, fs) backend := getStorage(b, fs)
compliance.RunBenchmarks(b, backend, backend.clear) compliance.RunBenchmarks(b, backend, backend.Clear)
fs.RemoveAll(backend.rootDir) fs.RemoveAll(backend.rootDir)
} }
func BenchmarkMemory(b *testing.B) { func BenchmarkMemory(b *testing.B) {
backend := getStorage(b, afero.NewMemMapFs()) backend := getStorage(b, afero.NewMemMapFs())
compliance.RunBenchmarks(b, backend, backend.clear) compliance.RunBenchmarks(b, backend, backend.Clear)
}
func (s *storageImpl) clear() error {
if err := s.filesystem.RemoveAll(s.rootDir); err != nil {
return err
}
return s.filesystem.Mkdir(s.rootDir, os.ModeDir|os.ModePerm)
} }
func getStorage(tb testing.TB, fs afero.Fs) *storageImpl { func getStorage(tb testing.TB, fs afero.Fs) *storageImpl {