mirror of
https://github.com/gomods/athens
synced 2026-02-03 11:00:32 +00:00
fix: move single-flight to stash (#2050)
This commit is contained in:
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/gomods/athens/pkg/observ"
|
||||
"github.com/gomods/athens/pkg/storage"
|
||||
"github.com/spf13/afero"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type goGetFetcher struct {
|
||||
@@ -22,7 +21,6 @@ type goGetFetcher struct {
|
||||
goBinaryName string
|
||||
envVars []string
|
||||
gogetDir string
|
||||
sfg *singleflight.Group
|
||||
}
|
||||
|
||||
type goModule struct {
|
||||
@@ -48,7 +46,6 @@ func NewGoGetFetcher(goBinaryName, gogetDir string, envVars []string, fs afero.F
|
||||
goBinaryName: goBinaryName,
|
||||
envVars: envVars,
|
||||
gogetDir: gogetDir,
|
||||
sfg: &singleflight.Group{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -59,63 +56,57 @@ func (g *goGetFetcher) Fetch(ctx context.Context, mod, ver string) (*storage.Ver
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
|
||||
resp, err, _ := g.sfg.Do(mod+"###"+ver, func() (any, error) {
|
||||
// setup the GOPATH
|
||||
goPathRoot, err := afero.TempDir(g.fs, g.gogetDir, "athens")
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
sourcePath := filepath.Join(goPathRoot, "src")
|
||||
modPath := filepath.Join(sourcePath, getRepoDirName(mod, ver))
|
||||
if err := g.fs.MkdirAll(modPath, os.ModeDir|os.ModePerm); err != nil {
|
||||
_ = clearFiles(g.fs, goPathRoot)
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
m, err := downloadModule(
|
||||
ctx,
|
||||
g.goBinaryName,
|
||||
g.envVars,
|
||||
goPathRoot,
|
||||
modPath,
|
||||
mod,
|
||||
ver,
|
||||
)
|
||||
if err != nil {
|
||||
_ = clearFiles(g.fs, goPathRoot)
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
var storageVer storage.Version
|
||||
storageVer.Semver = m.Version
|
||||
info, err := afero.ReadFile(g.fs, m.Info)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
storageVer.Info = info
|
||||
|
||||
gomod, err := afero.ReadFile(g.fs, m.GoMod)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
storageVer.Mod = gomod
|
||||
|
||||
zip, err := g.fs.Open(m.Zip)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
// note: don't close zip here so that the caller can read directly from disk.
|
||||
//
|
||||
// if we close, then the caller will panic, and the alternative to make this work is
|
||||
// that we read into memory and return an io.ReadCloser that reads out of memory
|
||||
storageVer.Zip = &zipReadCloser{zip, g.fs, goPathRoot}
|
||||
|
||||
return &storageVer, nil
|
||||
})
|
||||
// setup the GOPATH
|
||||
goPathRoot, err := afero.TempDir(g.fs, g.gogetDir, "athens")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
return resp.(*storage.Version), nil
|
||||
sourcePath := filepath.Join(goPathRoot, "src")
|
||||
modPath := filepath.Join(sourcePath, getRepoDirName(mod, ver))
|
||||
if err := g.fs.MkdirAll(modPath, os.ModeDir|os.ModePerm); err != nil {
|
||||
_ = clearFiles(g.fs, goPathRoot)
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
m, err := downloadModule(
|
||||
ctx,
|
||||
g.goBinaryName,
|
||||
g.envVars,
|
||||
goPathRoot,
|
||||
modPath,
|
||||
mod,
|
||||
ver,
|
||||
)
|
||||
if err != nil {
|
||||
_ = clearFiles(g.fs, goPathRoot)
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
|
||||
var storageVer storage.Version
|
||||
storageVer.Semver = m.Version
|
||||
info, err := afero.ReadFile(g.fs, m.Info)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
storageVer.Info = info
|
||||
|
||||
gomod, err := afero.ReadFile(g.fs, m.GoMod)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
storageVer.Mod = gomod
|
||||
|
||||
zip, err := g.fs.Open(m.Zip)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
// note: don't close zip here so that the caller can read directly from disk.
|
||||
//
|
||||
// if we close, then the caller will panic, and the alternative to make this work is
|
||||
// that we read into memory and return an io.ReadCloser that reads out of memory
|
||||
storageVer.Zip = &zipReadCloser{zip, g.fs, goPathRoot}
|
||||
|
||||
return &storageVer, nil
|
||||
}
|
||||
|
||||
// given a filesystem, gopath, repository root, module and version, runs 'go mod download -json'
|
||||
|
||||
+34
-21
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/gomods/athens/pkg/observ"
|
||||
"github.com/gomods/athens/pkg/storage"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
// Stasher has the job of taking a module
|
||||
@@ -29,7 +30,7 @@ type Wrapper func(Stasher) Stasher
|
||||
// a module from a download.Protocol and
|
||||
// stashes it into a backend.Storage.
|
||||
func New(f module.Fetcher, s storage.Backend, indexer index.Indexer, wrappers ...Wrapper) Stasher {
|
||||
var st Stasher = &stasher{f, s, storage.WithChecker(s), indexer}
|
||||
var st Stasher = &stasher{f, s, storage.WithChecker(s), indexer, &singleflight.Group{}}
|
||||
for _, w := range wrappers {
|
||||
st = w(st)
|
||||
}
|
||||
@@ -42,6 +43,7 @@ type stasher struct {
|
||||
storage storage.Backend
|
||||
checker storage.Checker
|
||||
indexer index.Indexer
|
||||
sfg *singleflight.Group
|
||||
}
|
||||
|
||||
func (s *stasher) Stash(ctx context.Context, mod, ver string) (string, error) {
|
||||
@@ -50,33 +52,44 @@ func (s *stasher) Stash(ctx context.Context, mod, ver string) (string, error) {
|
||||
defer span.End()
|
||||
log.EntryFromContext(ctx).Debugf("saving %s@%s to storage...", mod, ver)
|
||||
|
||||
// create a new context that ditches whatever deadline the caller passed
|
||||
// but keep the tracing info so that we can properly trace the whole thing.
|
||||
ctx, cancel := context.WithTimeout(trace.NewContext(context.Background(), span), time.Minute*10)
|
||||
defer cancel()
|
||||
v, err := s.fetchModule(ctx, mod, ver)
|
||||
if err != nil {
|
||||
return "", errors.E(op, err)
|
||||
}
|
||||
defer func() { _ = v.Zip.Close() }()
|
||||
if v.Semver != ver {
|
||||
exists, err := s.checker.Exists(ctx, mod, v.Semver)
|
||||
semver_, err, _ := s.sfg.Do(mod+"###"+ver, func() (any, error) {
|
||||
// create a new context that ditches whatever deadline the caller passed
|
||||
// but keep the tracing info so that we can properly trace the whole thing.
|
||||
ctx, cancel := context.WithTimeout(trace.NewContext(context.Background(), span), time.Minute*10)
|
||||
defer cancel()
|
||||
v, err := s.fetchModule(ctx, mod, ver)
|
||||
if err != nil {
|
||||
return "", errors.E(op, err)
|
||||
}
|
||||
if exists {
|
||||
return v.Semver, nil
|
||||
defer func() { _ = v.Zip.Close() }()
|
||||
if v.Semver != ver {
|
||||
exists, err := s.checker.Exists(ctx, mod, v.Semver)
|
||||
if err != nil {
|
||||
return "", errors.E(op, err)
|
||||
}
|
||||
if exists {
|
||||
return v.Semver, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
err = s.storage.Save(ctx, mod, v.Semver, v.Mod, v.Zip, v.Info)
|
||||
err = s.storage.Save(ctx, mod, v.Semver, v.Mod, v.Zip, v.Info)
|
||||
if err != nil {
|
||||
return "", errors.E(op, err)
|
||||
}
|
||||
err = s.indexer.Index(ctx, mod, v.Semver)
|
||||
if err != nil && !errors.Is(err, errors.KindAlreadyExists) {
|
||||
return "", errors.E(op, err)
|
||||
}
|
||||
return v.Semver, nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", errors.E(op, err)
|
||||
return "", err
|
||||
}
|
||||
err = s.indexer.Index(ctx, mod, v.Semver)
|
||||
if err != nil && !errors.Is(err, errors.KindAlreadyExists) {
|
||||
return "", errors.E(op, err)
|
||||
|
||||
semver, ok := semver_.(string)
|
||||
if !ok {
|
||||
return "", errors.E(op, "unexpected type assertion failure for semver", errors.KindUnexpected)
|
||||
}
|
||||
return v.Semver, nil
|
||||
return semver, nil
|
||||
}
|
||||
|
||||
func (s *stasher) fetchModule(ctx context.Context, mod, ver string) (*storage.Version, error) {
|
||||
|
||||
Reference in New Issue
Block a user