Init events

This commit is contained in:
Marwan Sulaiman
2020-07-31 14:43:37 -04:00
parent dfb7887080
commit 2d3250ffba
8 changed files with 265 additions and 1 deletions
+26 -1
View File
@@ -1,16 +1,20 @@
package actions
import (
"context"
"fmt"
"net/http"
"net/url"
"path"
"strings"
"time"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/download/addons"
"github.com/gomods/athens/pkg/download/mode"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/events"
"github.com/gomods/athens/pkg/index"
"github.com/gomods/athens/pkg/index/mem"
"github.com/gomods/athens/pkg/index/mysql"
@@ -107,7 +111,11 @@ func addProxyRoutes(
if err != nil {
return err
}
st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight)
withEventsHook, err := getEventHook(c)
if err != nil {
return err
}
st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight, withEventsHook)
df, err := mode.NewFile(c.DownloadMode, c.DownloadURL)
if err != nil {
@@ -129,6 +137,23 @@ func addProxyRoutes(
return nil
}
func getEventHook(c *config.Config) (stash.Wrapper, error) {
const op errors.Op = "actions.getEventHook"
if c.EventsHook == "" {
return func(s stash.Stasher) stash.Stasher {
return s
}, nil
}
eh := events.NewClient(c.EventsHook, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err := eh.Ping(ctx)
if err != nil {
return nil, errors.E(op, err)
}
return stash.WithEventsHook(eh), nil
}
func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType {
case "", "memory":
+9
View File
@@ -199,6 +199,15 @@ ForceSSL = false
# Env override: ATHENS_PROXY_VALIDATOR
ValidatorHook = ""
# EventsHook specifies a URL that will receive POST request events
# throughout an Athens request lifecycle.
#
# To see what type of events you will get and what the payload looks like
# check the pkg/event in this repository.
#
# Env override: ATHENS_EVENTS_HOOK
EventsHook = ""
# PathPrefix specifies whether the Proxy
# should have a basepath. Certain proxies and services
# are distinguished based on subdomain, while others are based
+1
View File
@@ -44,6 +44,7 @@ type Config struct {
PropagateAuthHost string `envconfig:"ATHENS_PROPAGATE_AUTH_HOST"`
ForceSSL bool `envconfig:"PROXY_FORCE_SSL"`
ValidatorHook string `envconfig:"ATHENS_PROXY_VALIDATOR"`
EventsHook string `envconfig:"ATHENS_EVENTS_HOOK"`
PathPrefix string `envconfig:"ATHENS_PATH_PREFIX"`
NETRCPath string `envconfig:"ATHENS_NETRC_PATH"`
GithubToken string `envconfig:"ATHENS_GITHUB_TOKEN"`
+78
View File
@@ -0,0 +1,78 @@
package events
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"github.com/gomods/athens/pkg/build"
"github.com/gomods/athens/pkg/errors"
)
// NewClient returns a new http service
func NewClient(url string, c *http.Client) Hook {
if c == nil {
c = http.DefaultClient
}
return &service{url, c}
}
type service struct {
url string
c *http.Client
}
func (s *service) Ping(ctx context.Context) error {
const op errors.Op = "events.Ping"
return s.sendEvent(ctx, op, Ping, PingEvent{BaseEvent: BaseEvent{
Event: Ping.String(),
Version: build.Data().Version,
}})
}
func (s *service) Stashed(ctx context.Context, mod, ver string) error {
const op errors.Op = "events.Stashed"
return s.sendEvent(ctx, op, Stashed, StashedEvent{
BaseEvent: BaseEvent{
Event: Stashed.String(),
Version: build.Data().Version,
},
Module: mod,
Version: ver,
})
}
func (s *service) sendEvent(ctx context.Context, op errors.Op, event Type, payload interface{}) error {
req, err := s.getRequest(ctx, event, payload)
if err != nil {
return errors.E(op, err)
}
resp, err := s.c.Do(req)
if err != nil {
return errors.E(op, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return errors.E(op, fmt.Errorf("event backend returned non-200 code: %d - body: %s", resp.StatusCode, body))
}
return nil
}
func (s *service) getRequest(ctx context.Context, event Type, payload interface{}) (*http.Request, error) {
const op errors.Op = "events.getRequest"
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(payload)
if err != nil {
return nil, errors.E(op, err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, &buf)
if err != nil {
return nil, errors.E(op, err)
}
req.Header.Set(HeaderKey, event.String())
return req, nil
}
+50
View File
@@ -0,0 +1,50 @@
package events
import (
"context"
)
//go:generate stringer -type=Type
// Type describe various event types
type Type int
// HeaderKey is the HTTP Header that Athens will send
// along every event. This helps you know which JSON shape
// to use when parsing a request body
const HeaderKey = "Athens-Event"
// Event types
const (
Ping Type = iota + 1
Stashed
)
// BaseEvent is the common data that all
// event payloads are composed of.
type BaseEvent struct {
Event string
Version string
}
// PingEvent describes the payload for a Ping event
type PingEvent struct {
BaseEvent
}
// StashedEvent describes the payload for the Stashed event
type StashedEvent struct {
BaseEvent
Module, Version string
}
// Hook describes a service that can be used to send events to
type Hook interface {
// Ping pings the underlying server to ensure that
// the event hook url is ready to receive requests
Ping(ctx context.Context) error
// Stashed is called whenever a new module is succesfully persisted
// to the storage Backend
Stashed(ctx context.Context, mod, ver string) error
}
+42
View File
@@ -0,0 +1,42 @@
package events
import (
"encoding/json"
"fmt"
"net/http"
)
// NewServer returns an http.Handler that parses
func NewServer(h Hook) http.Handler {
return &server{h}
}
type server struct {
h Hook
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.NotFound(w, r)
return
}
ctx := r.Context()
var err error
switch event := r.Header.Get(HeaderKey); event {
case Ping.String():
err = s.h.Ping(ctx)
case Stashed.String():
var body StashedEvent
err = json.NewDecoder(r.Body).Decode(&body)
if err != nil {
break
}
err = s.h.Stashed(ctx, body.Module, body.Version)
default:
err = fmt.Errorf("unknown event: %q", event)
}
if err != nil {
http.Error(w, err.Error(), 500)
return
}
}
+25
View File
@@ -0,0 +1,25 @@
// Code generated by "stringer -type=Type"; DO NOT EDIT.
package events
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[Ping-1]
_ = x[Stashed-2]
}
const _Type_name = "PingStashed"
var _Type_index = [...]uint8{0, 4, 11}
func (i Type) String() string {
i -= 1
if i < 0 || i >= Type(len(_Type_index)-1) {
return "Type(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _Type_name[_Type_index[i]:_Type_index[i+1]]
}
+34
View File
@@ -0,0 +1,34 @@
package stash
import (
"context"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/events"
)
// WithEventsHook returns a stasher that can send out Stashed events
// to the given implementation
func WithEventsHook(e events.Hook) Wrapper {
return func(s Stasher) Stasher {
return &withEvent{s, e}
}
}
type withEvent struct {
s Stasher
e events.Hook
}
func (we *withEvent) Stash(ctx context.Context, mod string, ver string) (string, error) {
const op errors.Op = "stash.withEvent"
resolvedVer, err := we.s.Stash(ctx, mod, ver)
if err != nil {
return "", errors.E(op, err)
}
err = we.e.Stashed(ctx, mod, resolvedVer)
if err != nil {
return "", errors.E(op, err)
}
return resolvedVer, nil
}