Download module version from CDN (#283)

* add download function which downloads a ver from cdn

* use cdn download func

* save after rename

* review feedback

* just to trigger a build

* pass timeout and move to pkg/module

* use multierror

* rm empty line
This commit is contained in:
marpio
2018-07-19 00:31:30 +02:00
committed by Aaron Schlesinger
parent 4a64bf10a7
commit 9913736bc7
4 changed files with 145 additions and 19 deletions
+6 -5
View File
@@ -5,8 +5,9 @@ import (
"log"
"time"
"github.com/gomods/athens/pkg/cdn"
"github.com/gomods/athens/pkg/config/env"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/module"
"github.com/gomods/athens/pkg/storage"
multierror "github.com/hashicorp/go-multierror"
)
@@ -22,10 +23,10 @@ import (
// - Delete operation adds tombstone to module metadata k/v store
//
// Both could be fixed by putting each 'for' loop into a (global) critical section
func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend) error {
func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.Eventlog, storage storage.Backend, downloader module.Downloader) error {
var errors error
for _, added := range diff.Added {
if err := add(ctx, added, originURL, eLog, storage); err != nil {
if err := add(ctx, added, originURL, eLog, storage, downloader); err != nil {
errors = multierror.Append(errors, err)
}
}
@@ -42,7 +43,7 @@ func mergeDB(ctx context.Context, originURL string, diff dbDiff, eLog eventlog.E
return errors
}
func add(ctx context.Context, event eventlog.Event, originURL string, eLog eventlog.Eventlog, storage storage.Backend) error {
func add(ctx context.Context, event eventlog.Event, originURL string, eLog eventlog.Eventlog, storage storage.Backend, downloader module.Downloader) error {
if _, err := eLog.ReadSingle(event.Module, event.Version); err != nil {
// the module/version already exists, is deprecated, or is
// tombstoned, so nothing to do
@@ -50,7 +51,7 @@ func add(ctx context.Context, event eventlog.Event, originURL string, eLog event
}
// download code from the origin
data, err := cdn.Download(originURL, event.Module, event.Version)
data, err := downloader(ctx, env.Timeout(), originURL, event.Module, event.Version)
if err != nil {
log.Printf("error downloading new module %s/%s from %s (%s)", event.Module, event.Version, originURL, err)
return err
+2 -1
View File
@@ -8,6 +8,7 @@ import (
"github.com/gobuffalo/buffalo"
"github.com/gobuffalo/buffalo/worker"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/module"
"github.com/gomods/athens/pkg/payloads"
"github.com/gomods/athens/pkg/storage"
)
@@ -46,7 +47,7 @@ func GetProcessPushNotificationJob(storage storage.Backend, eLog eventlog.Eventl
if err != nil {
return err
}
return mergeDB(ctx, pn.OriginURL, *diff, eLog, storage)
return mergeDB(ctx, pn.OriginURL, *diff, eLog, storage, module.Download)
}
}
-13
View File
@@ -1,13 +0,0 @@
package cdn
import (
"github.com/gomods/athens/pkg/storage"
)
// Download downloads the module/version from url. Returns a storage.Version
// representing the downloaded module/version or a non-nil error if something
// went wrong
func Download(url, module, version string) (*storage.Version, error) {
// TODO
return nil, nil
}
+137
View File
@@ -0,0 +1,137 @@
package module
import (
"context"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync"
"time"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/storage"
multierror "github.com/hashicorp/go-multierror"
)
// Downloader downloads a module version from a URL exposing Download Protocol endpoints
type Downloader func(ctx context.Context, timeout time.Duration, baseURL, module, version string) (*storage.Version, error)
// Download downloads the module/version from url. Returns a storage.Version
// representing the downloaded module/version or a non-nil error if something went wrong
func Download(ctx context.Context, timeout time.Duration, baseURL, module, version string) (*storage.Version, error) {
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
var info []byte
var infoErr error
var mod []byte
var modErr error
var zip io.ReadCloser
var zipErr error
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
infoReq, err := getRequest(tctx, baseURL, module, version, ".info")
if err != nil {
info, infoErr = nil, err
return
}
infoStream, err := getResBody(infoReq, timeout)
if err != nil {
info, infoErr = nil, err
return
}
info, infoErr = getBytes(infoStream)
}()
go func() {
defer wg.Done()
modReq, err := getRequest(tctx, baseURL, module, version, ".mod")
if err != nil {
mod, modErr = nil, err
return
}
modStream, err := getResBody(modReq, timeout)
if err != nil {
mod, modErr = nil, err
return
}
mod, modErr = getBytes(modStream)
}()
go func() {
defer wg.Done()
zipReq, err := getRequest(tctx, baseURL, module, version, ".zip")
if err != nil {
zip, zipErr = nil, err
return
}
zip, zipErr = getResBody(zipReq, timeout)
}()
wg.Wait()
var errors error
if infoErr != nil {
errors = multierror.Append(errors, infoErr)
}
if modErr != nil {
errors = multierror.Append(errors, modErr)
}
if zipErr != nil {
errors = multierror.Append(errors, zipErr)
}
if errors != nil {
return nil, errors
}
ver := storage.Version{
Info: info,
Mod: mod,
Zip: zip,
}
return &ver, nil
}
func getBytes(rb io.ReadCloser) ([]byte, error) {
defer rb.Close()
return ioutil.ReadAll(rb)
}
func getResBody(req *http.Request, timeout time.Duration) (io.ReadCloser, error) {
client := http.Client{Timeout: timeout}
res, err := client.Do(req)
if err != nil {
return nil, err
}
return res.Body, nil
}
func getRequest(ctx context.Context, baseURL, module, version, ext string) (*http.Request, error) {
u, err := join(baseURL, module, version, ext)
if err != nil {
return nil, err
}
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
return req, nil
}
func join(baseURL string, module, version, ext string) (string, error) {
u, err := url.Parse(baseURL)
if err != nil {
return "", err
}
packageVersionedName := config.PackageVersionedName(module, version, ext)
u.Path = path.Join(u.Path, packageVersionedName)
return u.String(), nil
}