From 5dcaeb8ab8ae002b6087c385a4130e789ccdf168 Mon Sep 17 00:00:00 2001 From: Marwan Sulaiman Date: Thu, 20 Sep 2018 10:59:42 -0400 Subject: [PATCH] Trace stash operations (#682) * Trace stash operations * fix build --- pkg/download/protocol.go | 6 +++--- pkg/module/go_get_fetcher.go | 4 ++++ pkg/stash/stasher.go | 14 +++++++++++--- pkg/stash/with_pool.go | 9 +++++++-- pkg/stash/with_pool_test.go | 5 +++-- pkg/stash/with_singleflight.go | 15 +++++++++++---- pkg/stash/with_singleflight_test.go | 7 ++++--- 7 files changed, 43 insertions(+), 17 deletions(-) diff --git a/pkg/download/protocol.go b/pkg/download/protocol.go index c7959c1e..b9ceb9de 100644 --- a/pkg/download/protocol.go +++ b/pkg/download/protocol.go @@ -105,7 +105,7 @@ func (p *protocol) Info(ctx context.Context, mod, ver string) ([]byte, error) { defer span.End() info, err := p.s.Info(ctx, mod, ver) if errors.IsNotFoundErr(err) { - err = p.stasher.Stash(mod, ver) + err = p.stasher.Stash(ctx, mod, ver) if err != nil { return nil, errors.E(op, err) } @@ -124,7 +124,7 @@ func (p *protocol) GoMod(ctx context.Context, mod, ver string) ([]byte, error) { defer span.End() goMod, err := p.s.GoMod(ctx, mod, ver) if errors.IsNotFoundErr(err) { - err = p.stasher.Stash(mod, ver) + err = p.stasher.Stash(ctx, mod, ver) if err != nil { return nil, errors.E(op, err) } @@ -143,7 +143,7 @@ func (p *protocol) Zip(ctx context.Context, mod, ver string) (io.ReadCloser, err defer span.End() zip, err := p.s.Zip(ctx, mod, ver) if errors.IsNotFoundErr(err) { - err = p.stasher.Stash(mod, ver) + err = p.stasher.Stash(ctx, mod, ver) if err != nil { return nil, errors.E(op, err) } diff --git a/pkg/module/go_get_fetcher.go b/pkg/module/go_get_fetcher.go index a6b7688d..7004fc07 100644 --- a/pkg/module/go_get_fetcher.go +++ b/pkg/module/go_get_fetcher.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/observ" "github.com/gomods/athens/pkg/paths" "github.com/gomods/athens/pkg/storage" "github.com/spf13/afero" @@ -37,6 +38,9 @@ func NewGoGetFetcher(goBinaryName string, fs afero.Fs) (Fetcher, error) { // .info, .mod, and .zip files. func (g *goGetFetcher) Fetch(ctx context.Context, mod, ver string) (*storage.Version, error) { const op errors.Op = "goGetFetcher.Fetch" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + // setup the GOPATH goPathRoot, err := afero.TempDir(g.fs, "", "athens") if err != nil { diff --git a/pkg/stash/stasher.go b/pkg/stash/stasher.go index ab655add..8c7e1c52 100644 --- a/pkg/stash/stasher.go +++ b/pkg/stash/stasher.go @@ -6,13 +6,15 @@ import ( "github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/module" + "github.com/gomods/athens/pkg/observ" "github.com/gomods/athens/pkg/storage" + "go.opencensus.io/trace" ) // Stasher has the job of taking a module // from an upstream entity and stashing it to a Storage Backend. type Stasher interface { - Stash(string, string) error + Stash(ctx context.Context, mod string, ver string) error } // Wrapper helps extend the main stasher's functionality with addons. @@ -35,10 +37,16 @@ type stasher struct { s storage.Backend } -func (s *stasher) Stash(mod, ver string) error { +func (s *stasher) Stash(ctx context.Context, mod, ver string) error { const op errors.Op = "stasher.Stash" - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + _, span := observ.StartSpan(ctx, op.String()) + defer span.End() + + // create a new context that ditches whatever deadline the caller passed + // but keep the tracing info so that we can properly trace the whole thing. + ctx, cancel := context.WithTimeout(trace.NewContext(context.Background(), span), time.Minute*10) defer cancel() + v, err := s.fetchModule(ctx, mod, ver) if err != nil { return errors.E(op, err) diff --git a/pkg/stash/with_pool.go b/pkg/stash/with_pool.go index ba4e2ba0..f5a65afe 100644 --- a/pkg/stash/with_pool.go +++ b/pkg/stash/with_pool.go @@ -1,7 +1,10 @@ package stash import ( + "context" + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/observ" ) type withpool struct { @@ -37,12 +40,14 @@ func (s *withpool) listen() { } } -func (s *withpool) Stash(mod, ver string) error { +func (s *withpool) Stash(ctx context.Context, mod, ver string) error { const op errors.Op = "stash.Pool" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() var err error done := make(chan struct{}, 1) s.jobCh <- func() { - err = s.s.Stash(mod, ver) + err = s.s.Stash(ctx, mod, ver) close(done) } <-done diff --git a/pkg/stash/with_pool_test.go b/pkg/stash/with_pool_test.go index 676aab80..09898038 100644 --- a/pkg/stash/with_pool_test.go +++ b/pkg/stash/with_pool_test.go @@ -1,6 +1,7 @@ package stash import ( + "context" "fmt" "testing" ) @@ -8,7 +9,7 @@ import ( func TestPoolWrapper(t *testing.T) { m := &mockStasher{inputMod: "mod", inputVer: "ver", err: fmt.Errorf("wrapped err")} s := WithPool(2)(m) - err := s.Stash(m.inputMod, m.inputVer) + err := s.Stash(context.Background(), m.inputMod, m.inputVer) if err.Error() != m.err.Error() { t.Fatalf("expected err to be `%v` but got `%v`", m.err, err) } @@ -20,7 +21,7 @@ type mockStasher struct { err error } -func (m *mockStasher) Stash(mod, ver string) error { +func (m *mockStasher) Stash(ctx context.Context, mod, ver string) error { if m.inputMod != mod { return fmt.Errorf("expected input mod %v but got %v", m.inputMod, mod) } diff --git a/pkg/stash/with_singleflight.go b/pkg/stash/with_singleflight.go index 347a5420..08f0ab34 100644 --- a/pkg/stash/with_singleflight.go +++ b/pkg/stash/with_singleflight.go @@ -1,9 +1,12 @@ package stash import ( + "context" "sync" "github.com/gomods/athens/pkg/config" + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/observ" ) // WithSingleflight returns a singleflight stasher. @@ -26,9 +29,9 @@ type withsf struct { subs map[string][]chan error } -func (s *withsf) process(mod, ver string) { +func (s *withsf) process(ctx context.Context, mod, ver string) { mv := config.FmtModVer(mod, ver) - err := s.s.Stash(mod, ver) + err := s.s.Stash(ctx, mod, ver) s.mu.Lock() defer s.mu.Unlock() for _, ch := range s.subs[mv] { @@ -37,14 +40,18 @@ func (s *withsf) process(mod, ver string) { delete(s.subs, mv) } -func (s *withsf) Stash(mod, ver string) error { +func (s *withsf) Stash(ctx context.Context, mod, ver string) error { + const op errors.Op = "singleflight.Stash" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + mv := config.FmtModVer(mod, ver) s.mu.Lock() subCh := make(chan error, 1) _, inFlight := s.subs[mv] if !inFlight { s.subs[mv] = []chan error{subCh} - go s.process(mod, ver) + go s.process(ctx, mod, ver) } else { s.subs[mv] = append(s.subs[mv], subCh) } diff --git a/pkg/stash/with_singleflight_test.go b/pkg/stash/with_singleflight_test.go index a7d54690..35eec572 100644 --- a/pkg/stash/with_singleflight_test.go +++ b/pkg/stash/with_singleflight_test.go @@ -1,6 +1,7 @@ package stash import ( + "context" "fmt" "sync" "testing" @@ -19,7 +20,7 @@ func TestSingleFlight(t *testing.T) { var eg errgroup.Group for i := 0; i < 5; i++ { eg.Go(func() error { - return s.Stash("mod", "ver") + return s.Stash(context.Background(), "mod", "ver") }) } @@ -30,7 +31,7 @@ func TestSingleFlight(t *testing.T) { for i := 0; i < 5; i++ { eg.Go(func() error { - return s.Stash("mod", "ver") + return s.Stash(context.Background(), "mod", "ver") }) } err = eg.Wait() @@ -49,7 +50,7 @@ type mockSFStasher struct { num int } -func (ms *mockSFStasher) Stash(mod, ver string) error { +func (ms *mockSFStasher) Stash(ctx context.Context, mod, ver string) error { time.Sleep(time.Millisecond * 100) // allow for second requests to come in. ms.mu.Lock() defer ms.mu.Unlock()