mirror of
https://github.com/gomods/athens
synced 2026-02-03 07:30:32 +00:00
Add catalog to minio (#1040)
* Implement cataloger interface for Minio * Catalog fix * Implemented Catalog method in the Minio storage package * code fmt fix * fmt fix * remove unused channel
This commit is contained in:
committed by
Aaron Schlesinger
parent
c2647da423
commit
4dfa99320f
@@ -0,0 +1,84 @@
|
||||
package minio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/gomods/athens/pkg/errors"
|
||||
"github.com/gomods/athens/pkg/observ"
|
||||
"github.com/gomods/athens/pkg/paths"
|
||||
"github.com/minio/minio-go"
|
||||
)
|
||||
|
||||
// Catalog implements the (./pkg/storage).Cataloger interface
|
||||
// It returns a list of modules and versions contained in the storage
|
||||
func (s *storageImpl) Catalog(ctx context.Context, token string, pageSize int) ([]paths.AllPathParams, string, error) {
|
||||
const op errors.Op = "minio.Catalog"
|
||||
ctx, span := observ.StartSpan(ctx, op.String())
|
||||
defer span.End()
|
||||
queryToken := token
|
||||
res := make([]paths.AllPathParams, 0)
|
||||
startAfter := token
|
||||
token = ""
|
||||
|
||||
count := pageSize
|
||||
for count > 0 {
|
||||
loo, err := s.minioCore.ListObjectsV2(s.bucketName, token, "", false, "", 0, startAfter)
|
||||
if err != nil {
|
||||
return nil, "", errors.E(op, err)
|
||||
}
|
||||
|
||||
m, lastKey := fetchModsAndVersions(loo.Contents, count)
|
||||
|
||||
res = append(res, m...)
|
||||
count -= len(m)
|
||||
queryToken = lastKey
|
||||
|
||||
if !loo.IsTruncated { // not truncated, there is no point in asking more
|
||||
if count > 0 { // it means we reached the end, no subsequent requests are necessary
|
||||
queryToken = ""
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return res, queryToken, nil
|
||||
}
|
||||
|
||||
func fetchModsAndVersions(objects []minio.ObjectInfo, elementsNum int) ([]paths.AllPathParams, string) {
|
||||
res := make([]paths.AllPathParams, 0)
|
||||
lastKey := ""
|
||||
|
||||
for _, o := range objects {
|
||||
if !strings.HasSuffix(o.Key, ".info") {
|
||||
continue
|
||||
}
|
||||
|
||||
p, err := parseMinioKey(&o)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
res = append(res, p)
|
||||
lastKey = o.Key
|
||||
|
||||
elementsNum--
|
||||
if elementsNum == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return res, lastKey
|
||||
}
|
||||
|
||||
func parseMinioKey(o *minio.ObjectInfo) (paths.AllPathParams, error) {
|
||||
const op errors.Op = "minio.parseMinioKey"
|
||||
parts := strings.Split(o.Key, "/")
|
||||
v := parts[len(parts)-2]
|
||||
m := strings.Replace(o.Key, v, "", -2)
|
||||
m = strings.Replace(m, "//.info", "", -1)
|
||||
if m == "" || v == "" {
|
||||
return paths.AllPathParams{}, errors.E(op, fmt.Errorf("invalid object key format %s", o.Key))
|
||||
}
|
||||
return paths.AllPathParams{m, v}, nil
|
||||
}
|
||||
@@ -18,8 +18,9 @@ func (l *storageImpl) List(ctx context.Context, module string) ([]string, error)
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
searchPrefix := module + "/"
|
||||
objectCh := l.minioClient.ListObjectsV2(l.bucketName, searchPrefix, false, doneCh)
|
||||
for object := range objectCh {
|
||||
objectCh, _ := l.minioCore.ListObjectsV2(l.bucketName, searchPrefix, "", false, "", 0, "")
|
||||
|
||||
for _, object := range objectCh.Contents {
|
||||
if object.Err != nil {
|
||||
return nil, errors.E(op, object.Err, errors.M(module))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
type storageImpl struct {
|
||||
minioClient *minio.Client
|
||||
minioCore *minio.Core
|
||||
bucketName string
|
||||
}
|
||||
|
||||
@@ -29,6 +30,7 @@ func NewStorage(conf *config.MinioConfig, timeout time.Duration) (storage.Backen
|
||||
bucketName := conf.Bucket
|
||||
region := conf.Region
|
||||
useSSL := conf.EnableSSL
|
||||
minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, useSSL)
|
||||
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, useSSL)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
@@ -42,5 +44,5 @@ func NewStorage(conf *config.MinioConfig, timeout time.Duration) (storage.Backen
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
}
|
||||
return &storageImpl{minioClient, bucketName}, nil
|
||||
return &storageImpl{minioClient, minioCore, bucketName}, nil
|
||||
}
|
||||
|
||||
@@ -19,10 +19,8 @@ func BenchmarkBackend(b *testing.B) {
|
||||
}
|
||||
|
||||
func (s *storageImpl) clear() error {
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
objectCh := s.minioClient.ListObjectsV2(s.bucketName, "", true, doneCh)
|
||||
for object := range objectCh {
|
||||
objectCh, _ := s.minioCore.ListObjectsV2(s.bucketName, "", "", false, "", 0, "")
|
||||
for _, object := range objectCh.Contents {
|
||||
if object.Err != nil {
|
||||
return object.Err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user