mirror of
https://github.com/gomods/athens
synced 2026-02-03 12:10:32 +00:00
Calling HeadObjectWithContext() concurrently to check the existence of a module (#1844)
* Using HeadObject to check the existence of a module
* fixing build
* calling cancel before closing channel
* fixing test
* using waitgroup
* calling cancel
* calling wg.Done
* only return the first error
* Revert "only return the first error"
This reverts commit c0aa18b522.
* clean err if not exist
This commit is contained in:
+35
-22
@@ -2,13 +2,14 @@ package s3
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
errs "errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/gomods/athens/pkg/config"
|
"github.com/gomods/athens/pkg/config"
|
||||||
"github.com/gomods/athens/pkg/errors"
|
"github.com/gomods/athens/pkg/errors"
|
||||||
"github.com/gomods/athens/pkg/log"
|
|
||||||
"github.com/gomods/athens/pkg/observ"
|
"github.com/gomods/athens/pkg/observ"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -19,27 +20,39 @@ func (s *Storage) Exists(ctx context.Context, module, version string) (bool, err
|
|||||||
ctx, span := observ.StartSpan(ctx, op.String())
|
ctx, span := observ.StartSpan(ctx, op.String())
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
lsParams := &s3.ListObjectsInput{
|
files := []string{"info", "mod", "zip"}
|
||||||
Bucket: aws.String(s.bucket),
|
errChan := make(chan error, len(files))
|
||||||
Prefix: aws.String(fmt.Sprintf("%s/@v", module)),
|
defer close(errChan)
|
||||||
|
cancelingCtx, cancel := context.WithCancel(ctx)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for _, file := range files {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(file string) {
|
||||||
|
defer wg.Done()
|
||||||
|
_, err := s.s3API.HeadObjectWithContext(
|
||||||
|
cancelingCtx,
|
||||||
|
&s3.HeadObjectInput{
|
||||||
|
Bucket: aws.String(s.bucket),
|
||||||
|
Key: aws.String(config.PackageVersionedName(module, version, file)),
|
||||||
|
})
|
||||||
|
errChan <- err
|
||||||
|
}(file)
|
||||||
}
|
}
|
||||||
found := make(map[string]struct{}, 3)
|
exists := true
|
||||||
err := s.s3API.ListObjectsPagesWithContext(ctx, lsParams, func(loo *s3.ListObjectsOutput, lastPage bool) bool {
|
var err error
|
||||||
for _, o := range loo.Contents {
|
for range files {
|
||||||
if _, exists := found[*o.Key]; exists {
|
err = <-errChan
|
||||||
log.EntryFromContext(ctx).Warnf("duplicate key in prefix %q: %q", *lsParams.Prefix, *o.Key)
|
if err == nil {
|
||||||
continue
|
continue
|
||||||
}
|
|
||||||
if *o.Key == config.PackageVersionedName(module, version, "info") ||
|
|
||||||
*o.Key == config.PackageVersionedName(module, version, "mod") ||
|
|
||||||
*o.Key == config.PackageVersionedName(module, version, "zip") {
|
|
||||||
found[*o.Key] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return len(found) < 3
|
var aerr awserr.Error
|
||||||
})
|
if errs.As(err, &aerr) && aerr.Code() == "NotFound" {
|
||||||
if err != nil {
|
err = nil
|
||||||
return false, errors.E(op, err, errors.M(module), errors.V(version))
|
exists = false
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
return len(found) == 3, nil
|
cancel()
|
||||||
|
wg.Wait()
|
||||||
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user