mirror of
https://github.com/gomods/athens
synced 2026-02-10 11:08:10 +00:00
* switch proxy to config file pull in single flight changes * changes for single-flight * intermediate stage. All tests passing. pkg still has env refs * remove all env references * delete config/env entirely * fix failing tests * create the config.toml file as part of dev setup * create config file only if it doesn't exist * update Dockerfiles to use config file * move composing elements to the top * verbose parameter naming * newline * add flag for config file path * update docs with config file flag * remove unnecessary nil check * use filepath.join * rename redis port to address * fix path.join * fix issues after merge * add vendor dir
64 lines
1.6 KiB
Go
64 lines
1.6 KiB
Go
package module
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/gomods/athens/pkg/config"
|
|
"github.com/gomods/athens/pkg/errors"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
)
|
|
|
|
const numFiles = 3
|
|
|
|
// Uploader takes a stream and saves it to the blob store under a given path
|
|
type Uploader func(ctx context.Context, path, contentType string, stream io.Reader) error
|
|
|
|
// Upload saves .info, .mod and .zip files to the blob store in parallel.
|
|
// Returns multierror containing errors from all uploads and timeouts
|
|
func Upload(ctx context.Context, module, version string, info, mod, zip io.Reader, uploader Uploader, timeout time.Duration) error {
|
|
const op errors.Op = "module.Upload"
|
|
tctx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
save := func(ext, contentType string, stream io.Reader) <-chan error {
|
|
ec := make(chan error)
|
|
|
|
go func() {
|
|
defer close(ec)
|
|
p := config.PackageVersionedName(module, version, ext)
|
|
ec <- uploader(tctx, p, contentType, stream)
|
|
}()
|
|
return ec
|
|
}
|
|
|
|
errChan := make(chan error, numFiles)
|
|
saveOrAbort := func(ext, contentType string, stream io.Reader) {
|
|
select {
|
|
case err := <-save(ext, contentType, stream):
|
|
errChan <- err
|
|
case <-tctx.Done():
|
|
errChan <- fmt.Errorf("uploading %s.%s.%s failed: %s", module, version, ext, tctx.Err())
|
|
}
|
|
}
|
|
go saveOrAbort("info", "application/json", info)
|
|
go saveOrAbort("mod", "text/plain", mod)
|
|
go saveOrAbort("zip", "application/octet-stream", zip)
|
|
|
|
var errs error
|
|
for i := 0; i < numFiles; i++ {
|
|
err := <-errChan
|
|
if err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
}
|
|
close(errChan)
|
|
if errs != nil {
|
|
return errors.E(op, errs)
|
|
}
|
|
|
|
return nil
|
|
}
|