implement /index endpoint (#1630)

* implement /index endpoint

* rename to Module to Path
This commit is contained in:
Marwan Sulaiman
2020-06-24 14:29:30 -04:00
committed by GitHub
parent 216723117e
commit 52934cfa46
25 changed files with 797 additions and 39 deletions
+118
View File
@@ -0,0 +1,118 @@
package compliance
import (
"context"
"fmt"
"testing"
"time"
"github.com/gomods/athens/pkg/index"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/technosophos/moniker"
)
func RunTests(t *testing.T, indexer index.Indexer, clearIndex func() error) {
if err := clearIndex(); err != nil {
t.Fatal(err)
}
var tests = []struct {
name string
desc string
limit int
preTest func(t *testing.T) ([]*index.Line, time.Time)
}{
{
name: "empty",
desc: "an empty index should return an empty slice",
preTest: func(t *testing.T) ([]*index.Line, time.Time) { return []*index.Line{}, time.Time{} },
limit: 2000,
},
{
name: "happy path",
desc: "given 10 modules, return all of them in correct order",
preTest: func(t *testing.T) ([]*index.Line, time.Time) {
return seed(t, indexer, 10), time.Time{}
},
limit: 2000,
},
{
name: "respect the limit",
desc: "givn 10 modules and a 'limit' of 5, only return the first five lines",
preTest: func(t *testing.T) ([]*index.Line, time.Time) {
lines := seed(t, indexer, 10)
return lines[0:5], time.Time{}
},
limit: 5,
},
{
name: "respect the time",
desc: "given 10 modules, 'since' should filter out the ones that came before it",
preTest: func(t *testing.T) ([]*index.Line, time.Time) {
err := indexer.Index(context.Background(), "tobeignored", "v1.2.3")
if err != nil {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
now := time.Now()
lines := seed(t, indexer, 5)
return lines, now
},
limit: 2000,
},
{
name: "ignore the past",
desc: "no line should be returned if 'since' is after all of the indexed modules",
preTest: func(t *testing.T) ([]*index.Line, time.Time) {
seed(t, indexer, 5)
time.Sleep(50 * time.Millisecond)
return []*index.Line{}, time.Now()
},
limit: 2000,
},
{
name: "no limit no line",
desc: "if limit is set to zero, then nothing should be returned",
preTest: func(t *testing.T) ([]*index.Line, time.Time) {
seed(t, indexer, 5)
return []*index.Line{}, time.Time{}
},
limit: 0,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Log(tc.desc)
t.Cleanup(func() {
if err := clearIndex(); err != nil {
t.Fatal(err)
}
})
expected, since := tc.preTest(t)
given, err := indexer.Lines(context.Background(), since, tc.limit)
if err != nil {
t.Fatal(err)
}
opts := cmpopts.IgnoreFields(index.Line{}, "Timestamp")
if !cmp.Equal(given, expected, opts) {
t.Fatal(cmp.Diff(expected, given, opts))
}
})
}
}
func seed(t *testing.T, indexer index.Indexer, num int) []*index.Line {
lines := []*index.Line{}
t.Helper()
for i := 0; i < num; i++ {
mod := moniker.New().NameSep("_")
ver := fmt.Sprintf("%d.0.0", i)
err := indexer.Index(context.Background(), mod, ver)
if err != nil {
t.Fatal(err)
}
lines = append(lines, &index.Line{Path: mod, Version: ver})
}
return lines
}
+26
View File
@@ -0,0 +1,26 @@
package index
import (
"context"
"time"
)
// Line represents a module@version line
// with its metadata such as creation time.
type Line struct {
Path, Version string
Timestamp time.Time
}
// Indexer is an interface that can process new module@versions
// and also retrieve 'limit' module@versions that were indexed after 'since'
type Indexer interface {
// Index stores the module@version into the index backend.
// Implementer must create the Timestamp at the time and set it
// to the time this method is call.
Index(ctx context.Context, mod, ver string) error
// Lines returns the module@version lines given the time and limit
// constraints
Lines(ctx context.Context, since time.Time, limit int) ([]*Line, error)
}
+51
View File
@@ -0,0 +1,51 @@
package mem
import (
"context"
"sync"
"time"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/index"
)
// New returns a new in-memory indexer
func New() index.Indexer {
return &indexer{}
}
type indexer struct {
mu sync.RWMutex
lines []*index.Line
}
func (i *indexer) Index(ctx context.Context, mod, ver string) error {
const op errors.Op = "mem.Index"
i.mu.Lock()
i.lines = append(i.lines, &index.Line{
Path: mod,
Version: ver,
Timestamp: time.Now(),
})
i.mu.Unlock()
return nil
}
func (i *indexer) Lines(ctx context.Context, since time.Time, limit int) ([]*index.Line, error) {
const op errors.Op = "mem.Lines"
lines := []*index.Line{}
var count int
i.mu.RLock()
defer i.mu.RUnlock()
for _, line := range i.lines {
if count >= limit {
break
}
if since.After(line.Timestamp) {
continue
}
lines = append(lines, line)
count++
}
return lines, nil
}
+20
View File
@@ -0,0 +1,20 @@
package mem
import (
"testing"
"github.com/gomods/athens/pkg/index"
"github.com/gomods/athens/pkg/index/compliance"
)
func TestMem(t *testing.T) {
indexer := &indexer{}
compliance.RunTests(t, indexer, indexer.clear)
}
func (i *indexer) clear() error {
i.mu.Lock()
i.lines = []*index.Line{}
i.mu.Unlock()
return nil
}
+105
View File
@@ -0,0 +1,105 @@
package mysql
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/go-sql-driver/mysql"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/index"
)
func New(cfg *config.MySQL) (index.Indexer, error) {
dataSource := getMySQLSource(cfg)
db, err := sql.Open("mysql", dataSource)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
_, err = db.Exec(schema)
if err != nil {
return nil, err
}
return &indexer{db}, nil
}
const schema = `
CREATE TABLE IF NOT EXISTS indexes(
id INT
AUTO_INCREMENT
PRIMARY KEY
COMMENT 'Unique identifier for a module line',
path VARCHAR(255)
NOT NULL
COMMENT 'Import path of the module',
version VARCHAR(255)
NOT NULL
COMMENT 'Module version',
timestamp TIMESTAMP(6)
COMMENT 'Date and time when the module was first created',
INDEX (timestamp),
UNIQUE INDEX idx_module_version (path, version)
) CHARACTER SET utf8;
`
type indexer struct {
db *sql.DB
}
func (i *indexer) Index(ctx context.Context, mod, ver string) error {
const op errors.Op = "mysql.Index"
_, err := i.db.ExecContext(
ctx,
`INSERT INTO indexes (path, version, timestamp) VALUES (?, ?, ?)`,
mod,
ver,
time.Now().Format(time.RFC3339Nano),
)
if err != nil {
return errors.E(op, err)
}
return nil
}
func (i *indexer) Lines(ctx context.Context, since time.Time, limit int) ([]*index.Line, error) {
const op errors.Op = "mysql.Lines"
if since.IsZero() {
since = time.Unix(0, 0)
}
sinceStr := since.Format(time.RFC3339Nano)
rows, err := i.db.QueryContext(ctx, `SELECT path, version, timestamp FROM indexes WHERE timestamp >= ? LIMIT ?`, sinceStr, limit)
if err != nil {
return nil, errors.E(op, err)
}
defer rows.Close()
lines := []*index.Line{}
for rows.Next() {
var line index.Line
err = rows.Scan(&line.Path, &line.Version, &line.Timestamp)
if err != nil {
return nil, errors.E(op, err)
}
lines = append(lines, &line)
}
return lines, nil
}
func getMySQLSource(cfg *config.MySQL) string {
c := mysql.NewConfig()
c.Net = cfg.Protocol
c.Addr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
c.User = cfg.User
c.Passwd = cfg.Password
c.DBName = cfg.Database
c.Params = cfg.Params
return c.FormatDSN()
}
+35
View File
@@ -0,0 +1,35 @@
package mysql
import (
"os"
"testing"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/index/compliance"
)
func TestMySQL(t *testing.T) {
if os.Getenv("TEST_INDEX_MYSQL") != "true" {
t.SkipNow()
}
cfg := getTestConfig(t)
i, err := New(cfg)
if err != nil {
t.Fatal(err)
}
compliance.RunTests(t, i, i.(*indexer).clear)
}
func (i *indexer) clear() error {
_, err := i.db.Exec(`DELETE FROM indexes`)
return err
}
func getTestConfig(t *testing.T) *config.MySQL {
t.Helper()
cfg, err := config.Load("")
if err != nil {
t.Fatal(err)
}
return cfg.Index.MySQL
}
+22
View File
@@ -0,0 +1,22 @@
package nop
import (
"context"
"time"
"github.com/gomods/athens/pkg/index"
)
// New returns a no-op Indexer
func New() index.Indexer {
return indexer{}
}
type indexer struct{}
func (indexer) Index(ctx context.Context, mod, ver string) error {
return nil
}
func (indexer) Lines(ctx context.Context, since time.Time, limit int) ([]*index.Line, error) {
return []*index.Line{}, nil
}
+106
View File
@@ -0,0 +1,106 @@
package postgres
import (
"context"
"database/sql"
"strconv"
"strings"
"time"
// register the driver with database/sql
_ "github.com/lib/pq"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/index"
)
func New(cfg *config.Postgres) (index.Indexer, error) {
dataSource := getPostgresSource(cfg)
db, err := sql.Open("postgres", dataSource)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
for _, statement := range schema {
_, err = db.Exec(statement)
if err != nil {
return nil, err
}
}
return &indexer{db}, nil
}
var schema = [...]string{
`
CREATE TABLE IF NOT EXISTS indexes(
id SERIAL PRIMARY KEY,
path VARCHAR(255) NOT NULL,
version VARCHAR(255) NOT NULL,
timestamp timestamp NOT NULL
)
`,
`
CREATE INDEX IF NOT EXISTS idx_timestamp ON indexes (timestamp)
`,
`
CREATE UNIQUE INDEX IF NOT EXISTS idx_module_version ON indexes (path, version)
`,
}
type indexer struct {
db *sql.DB
}
func (i *indexer) Index(ctx context.Context, mod, ver string) error {
const op errors.Op = "postgres.Index"
_, err := i.db.ExecContext(
ctx,
`INSERT INTO indexes (path, version, timestamp) VALUES ($1, $2, $3)`,
mod,
ver,
time.Now().Format(time.RFC3339Nano),
)
if err != nil {
return errors.E(op, err)
}
return nil
}
func (i *indexer) Lines(ctx context.Context, since time.Time, limit int) ([]*index.Line, error) {
const op errors.Op = "postgres.Lines"
if since.IsZero() {
since = time.Unix(0, 0)
}
sinceStr := since.Format(time.RFC3339Nano)
rows, err := i.db.QueryContext(ctx, `SELECT path, version, timestamp FROM indexes WHERE timestamp >= $1 LIMIT $2`, sinceStr, limit)
if err != nil {
return nil, errors.E(op, err)
}
defer rows.Close()
lines := []*index.Line{}
for rows.Next() {
var line index.Line
err = rows.Scan(&line.Path, &line.Version, &line.Timestamp)
if err != nil {
return nil, errors.E(op, err)
}
lines = append(lines, &line)
}
return lines, nil
}
func getPostgresSource(cfg *config.Postgres) string {
args := []string{}
args = append(args, "host="+cfg.Host)
args = append(args, "port=", strconv.Itoa(cfg.Port))
args = append(args, "user=", cfg.User)
args = append(args, "dbname=", cfg.Database)
args = append(args, "password="+cfg.Password)
for k, v := range cfg.Params {
args = append(args, k+"="+v)
}
return strings.Join(args, " ")
}
+35
View File
@@ -0,0 +1,35 @@
package postgres
import (
"os"
"testing"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/index/compliance"
)
func TestPostgres(t *testing.T) {
if os.Getenv("TEST_INDEX_POSTGRES") != "true" {
t.SkipNow()
}
cfg := getTestConfig(t)
i, err := New(cfg)
if err != nil {
t.Fatal(err)
}
compliance.RunTests(t, i, i.(*indexer).clear)
}
func (i *indexer) clear() error {
_, err := i.db.Exec(`DELETE FROM indexes`)
return err
}
func getTestConfig(t *testing.T) *config.Postgres {
t.Helper()
cfg, err := config.Load("")
if err != nil {
t.Fatal(err)
}
return cfg.Index.Postgres
}