[WIP] Push endpoint (#181)

* initial structure

* make it compile...

* missing comment

* fix after rename

* single worker, two handlers

* minio starting to late

* fix after merge
This commit is contained in:
marpio
2018-06-25 22:03:01 +02:00
committed by Aaron Schlesinger
parent 2d25ccb3de
commit d0cbd9b809
7 changed files with 93 additions and 11 deletions
+2 -1
View File
@@ -21,7 +21,8 @@ script:
before_script:
- GO_FILES=$(find . -iname '*.go' -type f | grep -v /vendor/) # All the .go files, excluding vendor/
- go get github.com/golang/lint/golint
- wget https://dl.minio.io/server/minio/release/linux-amd64/minio && chmod +x minio && nohup ./minio server . &
- wget "https://dl.minio.io/server/minio/release/linux-amd64/minio"
- chmod +x minio && nohup ./minio server . &
- go get -u -v github.com/gobuffalo/buffalo/buffalo
- go get -u -v golang.org/x/vgo
- buffalo db create
+12 -6
View File
@@ -19,14 +19,19 @@ import (
)
const (
// DownloadWorkerName is name of the worker downloading packages from VCS
DownloadWorkerName = "download-worker"
// OlympusWorkerName is the name of the Olympus worker
OlympusWorkerName = "olympus-worker"
// DownloadHandlerName is name of the handler downloading packages from VCS
DownloadHandlerName = "download-handler"
// PushNotificationHandlerName is the name of the handler processing push notifications
PushNotificationHandlerName = "push-notification-worker"
)
var (
workerQueue = "default"
workerModuleKey = "module"
workerVersionKey = "version"
workerQueue = "default"
workerModuleKey = "module"
workerVersionKey = "version"
workerPushNotificationKey = "push-notification"
// ENV is used to help switch settings based on where the
// application is being run. Default is "development".
ENV = envy.Get("GO_ENV", "development")
@@ -106,6 +111,7 @@ func App() *buffalo.App {
app.GET("/feed/{lastID}", feedHandler(storage))
app.GET("/eventlog/{sequence_id}", eventlogHandler(eventlogReader))
app.POST("/cachemiss", cachemissHandler(cacheMissesLog, app.Worker))
app.POST("/push", pushNotificationHandler(app.Worker))
app.ServeFiles("/", assetsBox) // serve files from the public directory
}
@@ -122,7 +128,7 @@ func getWorker(port string) worker.Worker {
return redis.Dial("tcp", port)
},
},
Name: DownloadWorkerName,
Name: OlympusWorkerName,
MaxConcurrency: 25,
})
}
+1 -1
View File
@@ -36,7 +36,7 @@ func cachemissHandler(l eventlog.Appender, w worker.Worker) func(c buffalo.Conte
return w.Perform(worker.Job{
Queue: workerQueue,
Handler: DownloadWorkerName,
Handler: DownloadHandlerName,
Args: worker.Args{
workerModuleKey: cm.Name,
workerVersionKey: cm.Version,
+2 -2
View File
@@ -17,7 +17,7 @@ import (
// GetPackageDownloaderJob porcesses queue of cache misses and downloads sources from VCS
func GetPackageDownloaderJob(s storage.Backend, e eventlog.Eventlog, w worker.Worker) worker.Handler {
return func(args worker.Args) error {
module, version, err := parseArgs(args)
module, version, err := parsePackageDownloaderJobArgs(args)
if err != nil {
return err
}
@@ -62,7 +62,7 @@ func GetPackageDownloaderJob(s storage.Backend, e eventlog.Eventlog, w worker.Wo
}
}
func parseArgs(args worker.Args) (string, string, error) {
func parsePackageDownloaderJobArgs(args worker.Args) (string, string, error) {
module, ok := args[workerModuleKey].(string)
if !ok {
return "", "", errors.New("module name not specified")
+64
View File
@@ -0,0 +1,64 @@
package actions
import (
"context"
"encoding/json"
"errors"
"github.com/gobuffalo/buffalo"
"github.com/gobuffalo/buffalo/worker"
"github.com/gomods/athens/pkg/eventlog"
"github.com/gomods/athens/pkg/payloads"
"github.com/gomods/athens/pkg/storage"
)
func pushNotificationHandler(w worker.Worker) func(c buffalo.Context) error {
return func(c buffalo.Context) error {
p := &payloads.PushNotification{}
if err := c.Bind(p); err != nil {
return err
}
pj, err := json.Marshal(p)
if err != nil {
return err
}
return w.Perform(worker.Job{
Queue: workerQueue,
Handler: PushNotificationHandlerName,
Args: worker.Args{
workerPushNotificationKey: string(pj),
},
})
}
}
// GetProcessPushNotificationJob processes queue of push notifications
func GetProcessPushNotificationJob(storage storage.Backend, eLog eventlog.Eventlog, w worker.Worker) worker.Handler {
return func(args worker.Args) (err error) {
// TODO: background for now
ctx := context.Background()
pn, err := parsePushNotificationJobArgs(args)
if err != nil {
return err
}
diff, err := buildDiff(pn.Events)
if err != nil {
return err
}
return mergeDB(ctx, pn.OriginURL, *diff, eLog, storage)
}
}
func parsePushNotificationJobArgs(args worker.Args) (*payloads.PushNotification, error) {
pn, ok := args[workerPushNotificationKey].(string)
if !ok {
return nil, errors.New("push notification not found")
}
p := &payloads.PushNotification{}
b := []byte(pn)
if err := json.Unmarshal(b, p); err != nil {
return nil, err
}
return p, nil
}
+4 -1
View File
@@ -22,7 +22,10 @@ func main() {
}
w := app.Worker
if err := w.Register(actions.DownloadWorkerName, actions.GetPackageDownloaderJob(s, e, w)); err != nil {
if err := w.Register(actions.DownloadHandlerName, actions.GetPackageDownloaderJob(s, e, w)); err != nil {
log.Fatal(err)
}
if err := w.Register(actions.PushNotificationHandlerName, actions.GetProcessPushNotificationJob(s, e, w)); err != nil {
log.Fatal(err)
}
+8
View File
@@ -1,5 +1,7 @@
package payloads
import "github.com/gomods/athens/pkg/eventlog"
// Upload is used to send a module (zip and mod file) via POST request to the storage backend and save it there.
type Upload struct {
Module []byte `json:"module"`
@@ -12,3 +14,9 @@ type Module struct {
Name string `json:"name"`
Version string `json:"version"`
}
// PushNotification is used to notify other Olympus instances about a new event
type PushNotification struct {
Events []eventlog.Event `json:"events"`
OriginURL string `json:"originURL"`
}