Files
athens/pkg/storage/mongo/saver.go
Arpit Gogia 974077e73b Mongo go driver beta integration (#1042)
* Changed mongo.go to use new driver

* Modified mongo cataloger

* More new driver related changes

* Change lister.go

* Change saver.go

* Change imports

* Remove unnecessary Count query

* Use IndexView for indexing

* Rename ModuleStore fields

* Use map of key:sorting-order for creating the index

* Minor changes

* Use client options to configure mongo client

* Use method chaining

* gofmt changes

* Change imports

* Fix some build errors

* Use new GridFS API

* Fix more build errors

* Add Go Mongo driver to dependency modules

* Use multierror

* Leave download stream open

* Remove mgo error handling

* Copy zip instead of loading all in memory

* Use context.WithTimeout() wherever possible

* Raise KindNotFound when mod@ver isn't found

* NopCloser not needed

* Fix IndexView error

* Fix build errors

* Remove another mgo error usage

* Fix build error

* Changes according to review

* Formatting changes as per gofmt

* Modify gofmt argument to show the expected formatting (diff)

* Handle ErrNoDocument error and error arising from query execution

* Fix kind of returned error

* Minor changes

* Bug fixes

* gofmt related changes

* Minor change

* Use Insecure from MongoConfig, remove Insecure from global Config

* Remove stray print statement
2019-04-17 19:59:01 +02:00

70 lines
1.8 KiB
Go

package mongo
import (
"context"
"fmt"
"io"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
"go.mongodb.org/mongo-driver/mongo/gridfs"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Save stores a module in mongo storage.
//
// It first asks s.Exists whether module@version is already stored and, if so,
// returns an error of kind errors.KindAlreadyExists. Otherwise the zip is
// streamed into a GridFS bucket under the name produced by s.gridFileName,
// and a storage.Module document holding mod and info is inserted into the
// s.coll collection. The insert is bounded by s.timeout.
//
// Every returned error is wrapped with the operation name, module and version.
func (s *ModuleStore) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
	const op errors.Op = "mongo.Save"
	ctx, span := observ.StartSpan(ctx, op.String())
	defer span.End()

	exists, err := s.Exists(ctx, module, version)
	if err != nil {
		return errors.E(op, err, errors.M(module), errors.V(version))
	}
	if exists {
		return errors.E(op, "already exists", errors.M(module), errors.V(version), errors.KindAlreadyExists)
	}

	zipName := s.gridFileName(module, version)
	db := s.client.Database(s.db)
	bucket, err := gridfs.NewBucket(db, options.GridFSBucket())
	if err != nil {
		return errors.E(op, err, errors.M(module), errors.V(version))
	}

	uStream, err := bucket.OpenUploadStream(zipName, options.GridFSUpload())
	if err != nil {
		return errors.E(op, err, errors.M(module), errors.V(version))
	}

	numBytesWritten, err := io.Copy(uStream, zip)
	if err != nil {
		// Discard the partial upload instead of finalizing it. The Abort
		// error is deliberately dropped: the copy error is the one to report.
		_ = uStream.Abort()
		return errors.E(op, err, errors.M(module), errors.V(version))
	}
	if numBytesWritten <= 0 {
		// Nothing was copied; do not leave an empty file behind.
		_ = uStream.Abort()
		e := fmt.Errorf("copied %d bytes to Mongo GridFS", numBytesWritten)
		return errors.E(op, e, errors.M(module), errors.V(version))
	}

	// Close finalizes the upload, so it has to succeed before the module's
	// metadata is recorded. A deferred Close would hide this error and would
	// only run after the insert below.
	if err := uStream.Close(); err != nil {
		return errors.E(op, err, errors.M(module), errors.V(version))
	}

	m := &storage.Module{
		Module:  module,
		Version: version,
		Mod:     mod,
		Info:    info,
	}

	// NOTE(review): if the insert below fails, the uploaded zip stays in
	// GridFS without a metadata document — consider deleting it on failure.
	c := db.Collection(s.coll)
	tctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()
	_, err = c.InsertOne(tctx, m, options.InsertOne().SetBypassDocumentValidation(false))
	if err != nil {
		return errors.E(op, err, errors.M(module), errors.V(version))
	}

	return nil
}