Add Event Hooks To Athens

This commit is contained in:
Marwan Sulaiman
2020-08-11 12:43:06 -04:00
parent 23672f78c9
commit 7da3818a3a
9 changed files with 391 additions and 1 deletions
+26 -1
View File
@@ -1,16 +1,20 @@
package actions package actions
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
"strings" "strings"
"time"
"github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/download" "github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/download/addons" "github.com/gomods/athens/pkg/download/addons"
"github.com/gomods/athens/pkg/download/mode" "github.com/gomods/athens/pkg/download/mode"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/events"
"github.com/gomods/athens/pkg/index" "github.com/gomods/athens/pkg/index"
"github.com/gomods/athens/pkg/index/mem" "github.com/gomods/athens/pkg/index/mem"
"github.com/gomods/athens/pkg/index/mysql" "github.com/gomods/athens/pkg/index/mysql"
@@ -107,7 +111,11 @@ func addProxyRoutes(
if err != nil { if err != nil {
return err return err
} }
st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight) withEventsHook, err := getEventHook(c)
if err != nil {
return err
}
st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight, withEventsHook)
df, err := mode.NewFile(c.DownloadMode, c.DownloadURL) df, err := mode.NewFile(c.DownloadMode, c.DownloadURL)
if err != nil { if err != nil {
@@ -129,6 +137,23 @@ func addProxyRoutes(
return nil return nil
} }
func getEventHook(c *config.Config) (stash.Wrapper, error) {
const op errors.Op = "actions.getEventHook"
if c.EventsHook == "" {
return func(s stash.Stasher) stash.Stasher {
return s
}, nil
}
eh := events.NewClient(c.EventsHook, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err := eh.Ping(ctx)
if err != nil {
return nil, errors.E(op, err)
}
return stash.WithEventsHook(eh), nil
}
func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper, error) { func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType { switch c.SingleFlightType {
case "", "memory": case "", "memory":
+9
View File
@@ -199,6 +199,15 @@ ForceSSL = false
# Env override: ATHENS_PROXY_VALIDATOR # Env override: ATHENS_PROXY_VALIDATOR
ValidatorHook = "" ValidatorHook = ""
# EventsHook specifies a URL that will receive POST request events
# throughout an Athens request lifecycle.
#
# To see what type of events you will get and what the payload looks like
# check the pkg/event in this repository.
#
# Env override: ATHENS_EVENTS_HOOK
EventsHook = ""
# PathPrefix specifies whether the Proxy # PathPrefix specifies whether the Proxy
# should have a basepath. Certain proxies and services # should have a basepath. Certain proxies and services
# are distinguished based on subdomain, while others are based # are distinguished based on subdomain, while others are based
+1
View File
@@ -44,6 +44,7 @@ type Config struct {
PropagateAuthHost string `envconfig:"ATHENS_PROPAGATE_AUTH_HOST"` PropagateAuthHost string `envconfig:"ATHENS_PROPAGATE_AUTH_HOST"`
ForceSSL bool `envconfig:"PROXY_FORCE_SSL"` ForceSSL bool `envconfig:"PROXY_FORCE_SSL"`
ValidatorHook string `envconfig:"ATHENS_PROXY_VALIDATOR"` ValidatorHook string `envconfig:"ATHENS_PROXY_VALIDATOR"`
EventsHook string `envconfig:"ATHENS_EVENTS_HOOK"`
PathPrefix string `envconfig:"ATHENS_PATH_PREFIX"` PathPrefix string `envconfig:"ATHENS_PATH_PREFIX"`
NETRCPath string `envconfig:"ATHENS_NETRC_PATH"` NETRCPath string `envconfig:"ATHENS_NETRC_PATH"`
GithubToken string `envconfig:"ATHENS_GITHUB_TOKEN"` GithubToken string `envconfig:"ATHENS_GITHUB_TOKEN"`
+80
View File
@@ -0,0 +1,80 @@
package events
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"github.com/gomods/athens/pkg/build"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/requestid"
)
// NewClient returns a new http service
func NewClient(url string, c *http.Client) Hook {
if c == nil {
c = http.DefaultClient
}
return &service{url, c}
}
type service struct {
url string
c *http.Client
}
func (s *service) Ping(ctx context.Context) error {
const op errors.Op = "events.Ping"
return s.sendEvent(ctx, op, Ping, PingEvent{BaseEvent: BaseEvent{
Event: Ping.String(),
Version: build.Data().Version,
}})
}
func (s *service) Stashed(ctx context.Context, mod, ver string) error {
const op errors.Op = "events.Stashed"
return s.sendEvent(ctx, op, Stashed, StashedEvent{
BaseEvent: BaseEvent{
Event: Stashed.String(),
Version: build.Data().Version,
},
Module: mod,
Version: ver,
})
}
func (s *service) sendEvent(ctx context.Context, op errors.Op, event Type, payload interface{}) error {
req, err := s.getRequest(ctx, event, payload)
if err != nil {
return errors.E(op, err)
}
resp, err := s.c.Do(req)
if err != nil {
return errors.E(op, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return errors.E(op, fmt.Errorf("event backend returned non-200 code: %d - body: %s", resp.StatusCode, body))
}
return nil
}
func (s *service) getRequest(ctx context.Context, event Type, payload interface{}) (*http.Request, error) {
const op errors.Op = "events.getRequest"
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(payload)
if err != nil {
return nil, errors.E(op, err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, &buf)
if err != nil {
return nil, errors.E(op, err)
}
req.Header.Set(HeaderKey, event.String())
req.Header.Set(requestid.HeaderKey, requestid.FromContext(ctx))
return req, nil
}
+50
View File
@@ -0,0 +1,50 @@
package events
import (
"context"
)
//go:generate stringer -type=Type
// Type describe various event types
type Type int
// HeaderKey is the HTTP Header that Athens will send
// along every event. This helps you know which JSON shape
// to use when parsing a request body
const HeaderKey = "Athens-Event"
// Event types
const (
Ping Type = iota + 1
Stashed
)
// BaseEvent is the common data that all
// event payloads are composed of.
type BaseEvent struct {
Event string
Version string
}
// PingEvent describes the payload for a Ping event
type PingEvent struct {
BaseEvent
}
// StashedEvent describes the payload for the Stashed event
type StashedEvent struct {
BaseEvent
Module, Version string
}
// Hook describes a service that can be used to send events to
type Hook interface {
// Ping pings the underlying server to ensure that
// the event hook url is ready to receive requests
Ping(ctx context.Context) error
// Stashed is called whenever a new module is succesfully persisted
// to the storage Backend
Stashed(ctx context.Context, mod, ver string) error
}
+121
View File
@@ -0,0 +1,121 @@
package events
import (
"context"
"fmt"
"net/http/httptest"
"testing"
"github.com/gomods/athens/pkg/requestid"
"github.com/stretchr/testify/require"
"github.com/technosophos/moniker"
)
var pingTests = []struct {
name string
err error
}{
{
name: "ping",
},
{
name: "ping_err",
err: fmt.Errorf("could not ping"),
},
}
func TestClientServerPing(t *testing.T) {
for _, tc := range pingTests {
t.Run(tc.name, func(t *testing.T) {
hook := &mockHook{err: tc.err}
srv := httptest.NewServer(NewServer(hook))
t.Cleanup(srv.Close)
client := NewClient(srv.URL, nil)
err := client.Ping(context.Background())
checkErr(t, tc.err != nil, err)
})
}
}
var stashedTests = []struct {
name string
mod string
ver string
err error
}{
{
name: "happy path",
mod: "github.com/gomods/athens",
ver: "v0.10.0",
},
{
name: "stashed error",
mod: "mod",
ver: "ver",
err: fmt.Errorf("server error"),
},
}
func TestClientServerStashed(t *testing.T) {
for _, tc := range stashedTests {
t.Run(tc.name, func(t *testing.T) {
hook := &mockHook{err: tc.err}
srv := httptest.NewServer(NewServer(hook))
t.Cleanup(srv.Close)
client := NewClient(srv.URL, nil)
err := client.Stashed(context.Background(), "github.com/gomods/athens", "v0.10.0")
if checkErr(t, tc.err != nil, err) {
return
}
if tc.mod != hook.mod {
t.Fatalf("expected module to be %q but got %q", tc.mod, hook.mod)
}
if tc.ver != hook.ver {
t.Fatalf("expected version to be %q but got %q", tc.ver, hook.ver)
}
})
}
}
func TestRequestIDPropagation(t *testing.T) {
hook := &mockHook{}
srv := httptest.NewServer(NewServer(hook))
t.Cleanup(srv.Close)
client := NewClient(srv.URL, nil)
reqID := moniker.New().Name()
ctx := requestid.SetInContext(context.Background(), reqID)
err := client.Ping(ctx)
if err != nil {
t.Fatal(err)
}
if reqID != hook.reqid {
t.Fatalf("expected request id to be %q but got %q", reqID, hook.reqid)
}
}
type mockHook struct {
mod, ver string
reqid string
err error
}
func (mh *mockHook) Ping(ctx context.Context) error {
mh.reqid = requestid.FromContext(ctx)
return mh.err
}
func (mh *mockHook) Stashed(ctx context.Context, mod, ver string) error {
mh.mod, mh.ver = mod, ver
return mh.err
}
func checkErr(t *testing.T, wantErr bool, err error) bool {
if wantErr {
if err == nil {
t.Fatal("expected an error but got nil")
}
return true
}
require.NoError(t, err)
return false
}
+45
View File
@@ -0,0 +1,45 @@
package events
import (
"encoding/json"
"fmt"
"net/http"
"github.com/gomods/athens/pkg/requestid"
)
// NewServer returns an http.Handler that parses
func NewServer(h Hook) http.Handler {
return &server{h}
}
type server struct {
h Hook
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.NotFound(w, r)
return
}
ctx := r.Context()
ctx = requestid.SetInContext(ctx, r.Header.Get(requestid.HeaderKey))
var err error
switch event := r.Header.Get(HeaderKey); event {
case Ping.String():
err = s.h.Ping(ctx)
case Stashed.String():
var body StashedEvent
err = json.NewDecoder(r.Body).Decode(&body)
if err != nil {
break
}
err = s.h.Stashed(ctx, body.Module, body.Version)
default:
err = fmt.Errorf("unknown event: %q", event)
}
if err != nil {
http.Error(w, err.Error(), 500)
return
}
}
+25
View File
@@ -0,0 +1,25 @@
// Code generated by "stringer -type=Type"; DO NOT EDIT.
package events
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[Ping-1]
_ = x[Stashed-2]
}
const _Type_name = "PingStashed"
var _Type_index = [...]uint8{0, 4, 11}
func (i Type) String() string {
i -= 1
if i < 0 || i >= Type(len(_Type_index)-1) {
return "Type(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _Type_name[_Type_index[i]:_Type_index[i+1]]
}
+34
View File
@@ -0,0 +1,34 @@
package stash
import (
"context"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/events"
)
// WithEventsHook returns a stasher that can send out Stashed events
// to the given implementation
func WithEventsHook(e events.Hook) Wrapper {
return func(s Stasher) Stasher {
return &withEvent{s, e}
}
}
type withEvent struct {
s Stasher
e events.Hook
}
func (we *withEvent) Stash(ctx context.Context, mod string, ver string) (string, error) {
const op errors.Op = "stash.withEvent"
resolvedVer, err := we.s.Stash(ctx, mod, ver)
if err != nil {
return "", errors.E(op, err)
}
err = we.e.Stashed(ctx, mod, resolvedVer)
if err != nil {
return "", errors.E(op, err)
}
return resolvedVer, nil
}