Add tracing functions to s3, Azure, minio, rdbms, mongo (#332)

* Add opentracing to context for azurecdn

* Add opentracing context for s3

* Add opentracing for minio

* Add opentracing functions for mongodb

* Add tracing functions for rdbms

* Fix span name in aws.s3.upload

* add opentracing calls for storage.olympus

* Add opentracing functions to storage.gcp
This commit is contained in:
Manu Gupta
2018-07-26 20:15:23 -04:00
committed by Rob j Loranger
parent 1dfbf799c3
commit 85c3df5a54
25 changed files with 94 additions and 5 deletions
@@ -5,6 +5,8 @@ import (
"io"
"net/url"
"github.com/opentracing/opentracing-go"
"github.com/Azure/azure-storage-blob-go/2017-07-29/azblob"
)
@@ -26,6 +28,8 @@ func newBlobStoreClient(accountURL *url.URL, accountName, accountKey, containerN
}
func (c *azureBlobStoreClient) UploadWithContext(ctx context.Context, path, contentType string, content io.Reader) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.azurecdn.UploadWithContext")
defer sp.Finish()
blobURL := c.containerURL.NewBlockBlobURL(path)
emptyMeta := map[string]string{}
emptyBlobAccessCond := azblob.BlobAccessConditions{}
+4
View File
@@ -7,6 +7,8 @@ import (
"io"
"net/url"
"github.com/opentracing/opentracing-go"
"github.com/gomods/athens/pkg/config/env"
moduploader "github.com/gomods/athens/pkg/storage/module"
)
@@ -56,6 +58,8 @@ func (s Storage) BaseURL() *url.URL {
// Save implements the (github.com/gomods/athens/pkg/storage).Saver interface.
func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.azurecdn.Save")
sp.Finish()
err := moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.cl.UploadWithContext)
// TODO: take out lease on the /list file and add the version to it
//
+3
View File
@@ -4,10 +4,13 @@ import (
"context"
"github.com/gomods/athens/pkg/config"
opentracing "github.com/opentracing/opentracing-go"
)
// Exists implements the (./pkg/storage).Checker interface
// returning true if the module at version exists in storage
func (s *Storage) Exists(ctx context.Context, module, version string) bool {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Exists")
defer sp.Finish()
return s.bucket.Exists(ctx, config.PackageVersionedName(module, version, ".mod"))
}
+3
View File
@@ -6,12 +6,15 @@ import (
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/storage"
modupl "github.com/gomods/athens/pkg/storage/module"
opentracing "github.com/opentracing/opentracing-go"
)
// Delete implements the (./pkg/storage).Deleter interface and
// removes a version of a module from storage. Returning ErrNotFound
// if the version does not exist.
func (s *Storage) Delete(ctx context.Context, module, version string) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Delete")
defer sp.Finish()
if exists := s.bucket.Exists(ctx, config.PackageVersionedName(module, version, "mod")); !exists {
return storage.ErrVersionNotFound{Module: module, Version: version}
}
+3
View File
@@ -7,6 +7,7 @@ import (
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
// Get retrieves a module at a specific version from storage as a (./pkg/storage).Version
@@ -14,6 +15,8 @@ import (
// The caller is responsible for calling close on the Zip ReadCloser
func (s *Storage) Get(module, version string) (*storage.Version, error) {
ctx := context.TODO()
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Get")
defer sp.Finish()
if exists := s.Exists(ctx, module, version); !exists {
return nil, storage.ErrVersionNotFound{Module: module, Version: version}
}
+3
View File
@@ -5,11 +5,14 @@ import (
"strings"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
// List implements the (./pkg/storage).Lister interface
// It returns a list of versions, if any, for a given module
func (s *Storage) List(ctx context.Context, module string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.List")
defer sp.Finish()
paths, err := s.bucket.List(ctx, module)
if err != nil {
return nil, err
+5
View File
@@ -8,6 +8,7 @@ import (
stg "github.com/gomods/athens/pkg/storage"
moduploader "github.com/gomods/athens/pkg/storage/module"
opentracing "github.com/opentracing/opentracing-go"
)
// Save uploads the module's .mod, .zip and .info files for a given version
@@ -18,6 +19,8 @@ import (
// Uploaded files are publicly accessable in the storage bucket as per
// an ACL rule.
func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Save")
defer sp.Finish()
if exists := s.Exists(ctx, module, version); exists {
return stg.ErrVersionAlreadyExists{Module: module, Version: version}
}
@@ -30,6 +33,8 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte,
}
func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.upload")
defer sp.Finish()
wc := s.bucket.Write(ctx, path)
defer func(wc io.WriteCloser) {
if err := wc.Close(); err != nil {
+3
View File
@@ -5,9 +5,12 @@ import (
"fmt"
minio "github.com/minio/minio-go"
opentracing "github.com/opentracing/opentracing-go"
)
func (v *storageImpl) Exists(ctx context.Context, module, version string) bool {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.Exists")
defer sp.Finish()
versionedPath := v.versionLocation(module, version)
modPath := fmt.Sprintf("%s/go.mod", versionedPath)
_, err := v.minioClient.StatObject(v.bucketName, modPath, minio.StatObjectOptions{})
+3
View File
@@ -5,9 +5,12 @@ import (
"fmt"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
func (v *storageImpl) Delete(ctx context.Context, module, version string) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.Delete")
defer sp.Finish()
if !v.Exists(ctx, module, version) {
return storage.ErrVersionNotFound{
Module: module,
+4
View File
@@ -1,15 +1,19 @@
package minio
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"github.com/gomods/athens/pkg/storage"
minio "github.com/minio/minio-go"
opentracing "github.com/opentracing/opentracing-go"
)
func (v *storageImpl) Get(module, version string) (*storage.Version, error) {
sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.minio.Get")
defer sp.Finish()
versionedPath := v.versionLocation(module, version)
modPath := fmt.Sprintf("%s/go.mod", versionedPath)
modReader, err := v.minioClient.GetObject(v.bucketName, modPath, minio.GetObjectOptions{})
+4
View File
@@ -4,9 +4,13 @@ import (
"context"
"sort"
"strings"
opentracing "github.com/opentracing/opentracing-go"
)
func (l *storageImpl) List(ctx context.Context, module string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.List")
defer sp.Finish()
dict := make(map[string]struct{})
doneCh := make(chan struct{})
+4 -1
View File
@@ -6,9 +6,12 @@ import (
"io"
minio "github.com/minio/minio-go"
opentracing "github.com/opentracing/opentracing-go"
)
func (s *storageImpl) Save(_ context.Context, module, vsn string, mod []byte, zip io.Reader, info []byte) error {
func (s *storageImpl) Save(ctx context.Context, module, vsn string, mod []byte, zip io.Reader, info []byte) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.Save")
defer sp.Finish()
dir := s.versionLocation(module, vsn)
modFileName := dir + "/" + "go.mod"
zipFileName := dir + "/" + "source.zip"
+3
View File
@@ -4,10 +4,13 @@ import (
"context"
"github.com/globalsign/mgo/bson"
opentracing "github.com/opentracing/opentracing-go"
)
// Exists checks for a specific version of a module
func (s *ModuleStore) Exists(ctx context.Context, module, vsn string) bool {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.Exists")
defer sp.Finish()
c := s.s.DB(s.d).C(s.c)
count, err := c.Find(bson.M{"module": module, "version": vsn}).Count()
return err == nil && count > 0
+3
View File
@@ -5,10 +5,13 @@ import (
"github.com/globalsign/mgo/bson"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
// Delete removes a specific version of a module
func (s *ModuleStore) Delete(ctx context.Context, module, version string) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.Delete")
defer sp.Finish()
if !s.Exists(ctx, module, version) {
return storage.ErrVersionNotFound{
Module: module,
+4
View File
@@ -2,15 +2,19 @@ package mongo
import (
"bytes"
"context"
"io/ioutil"
"strings"
"github.com/globalsign/mgo/bson"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
// Get a specific version of a module
func (s *ModuleStore) Get(module, vsn string) (*storage.Version, error) {
sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.mongo.Get")
defer sp.Finish()
c := s.s.DB(s.d).C(s.c)
result := &storage.Module{}
err := c.Find(bson.M{"module": module, "version": vsn}).One(result)
+3
View File
@@ -6,10 +6,13 @@ import (
"github.com/globalsign/mgo/bson"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
// List lists all versions of a module
func (s *ModuleStore) List(ctx context.Context, module string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.List")
defer sp.Finish()
c := s.s.DB(s.d).C(s.c)
result := make([]storage.Module, 0)
err := c.Find(bson.M{"module": module}).All(&result)
+4 -1
View File
@@ -6,10 +6,13 @@ import (
"io/ioutil"
"github.com/gomods/athens/pkg/storage"
opentracing "github.com/opentracing/opentracing-go"
)
// Save stores a module in mongo storage.
func (s *ModuleStore) Save(_ context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
func (s *ModuleStore) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.Save")
defer sp.Finish()
zipBytes, err := ioutil.ReadAll(zip)
if err != nil {
return err
+6 -1
View File
@@ -2,18 +2,23 @@ package olympus
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"github.com/opentracing/opentracing-go"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/storage"
)
// Get a specific version of a module
func (s *ModuleStore) Get(module, vsn string) (*storage.Version, error) {
// TODO: fetch from endpoint
sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.olympus.Get")
defer sp.Finish()
// TODO: fetch from endpoint
modURI := fmt.Sprintf("%s/%s", s.url, config.PackageVersionedName(module, vsn, "mod"))
zipURI := fmt.Sprintf("%s/%s", s.url, config.PackageVersionedName(module, vsn, "zip"))
infoURI := fmt.Sprintf("%s/%s", s.url, config.PackageVersionedName(module, vsn, "info"))
+5 -1
View File
@@ -3,11 +3,15 @@ package olympus
import (
"context"
"io"
"github.com/opentracing/opentracing-go"
)
// Save stores a module in olympus.
// This actually does not store anything just reports cache miss
func (s *ModuleStore) Save(_ context.Context, module, version string, _ []byte, _ io.ReadSeeker, _ []byte) error {
func (s *ModuleStore) Save(ctx context.Context, module, version string, _ []byte, _ io.ReadSeeker, _ []byte) error {
// dummy implementation so Olympus Store can be used everywhere as Backend iface
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.olympus.Save")
defer sp.Finish()
return nil
}
+3
View File
@@ -4,10 +4,13 @@ import (
"context"
"github.com/gomods/athens/pkg/storage/rdbms/models"
opentracing "github.com/opentracing/opentracing-go"
)
// Exists checks for a specific version of a module
func (r *ModuleStore) Exists(ctx context.Context, module, vsn string) bool {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.Exists")
defer sp.Finish()
result := models.Module{}
query := r.conn.Where("module = ?", module).Where("version = ?", vsn)
count, err := query.Count(&result)
+3
View File
@@ -5,10 +5,13 @@ import (
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/rdbms/models"
opentracing "github.com/opentracing/opentracing-go"
)
// Delete removes a specific version of a module.
func (r *ModuleStore) Delete(ctx context.Context, module, version string) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.Delete")
defer sp.Finish()
if !r.Exists(ctx, module, version) {
return storage.ErrVersionNotFound{
Module: module,
+4
View File
@@ -2,16 +2,20 @@ package rdbms
import (
"bytes"
"context"
"database/sql"
"io/ioutil"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/rdbms/models"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)
// Get a specific version of a module
func (r *ModuleStore) Get(module, vsn string) (*storage.Version, error) {
sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.rdbms.Get")
defer sp.Finish()
result := models.Module{}
query := r.conn.Where("module = ?", module).Where("version = ?", vsn)
if err := query.First(&result); err != nil {
+3
View File
@@ -4,10 +4,13 @@ import (
"context"
"github.com/gomods/athens/pkg/storage/rdbms/models"
opentracing "github.com/opentracing/opentracing-go"
)
// List lists all versions of a module
func (r *ModuleStore) List(ctx context.Context, module string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.List")
defer sp.Finish()
result := make([]models.Module, 0)
err := r.conn.Where("module = ?", module).All(&result)
if err != nil {
+4 -1
View File
@@ -6,10 +6,13 @@ import (
"io/ioutil"
"github.com/gomods/athens/pkg/storage/rdbms/models"
opentracing "github.com/opentracing/opentracing-go"
)
// Save stores a module in rdbms storage.
func (r *ModuleStore) Save(_ context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
func (r *ModuleStore) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.Save")
defer sp.Finish()
zipBytes, err := ioutil.ReadAll(zip)
if err != nil {
return err
+6
View File
@@ -7,6 +7,8 @@ import (
"io"
"net/url"
"github.com/opentracing/opentracing-go"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
@@ -75,6 +77,8 @@ func (s Storage) BaseURL() *url.URL {
// Save implements the (github.com/gomods/athens/pkg/storage).Saver interface.
func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.s3.Save")
defer sp.Finish()
err := moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.upload)
// TODO: take out lease on the /list file and add the version to it
//
@@ -83,6 +87,8 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte,
}
func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.s3.upload")
defer sp.Finish()
upParams := &s3manager.UploadInput{
Bucket: &s.bucket,
Key: &path,