232 lines
4.1 KiB
Go
232 lines
4.1 KiB
Go
|
package storm
|
||
|
|
||
|
import (
|
||
|
"github.com/asdine/storm/internal"
|
||
|
"github.com/asdine/storm/q"
|
||
|
"github.com/boltdb/bolt"
|
||
|
)
|
||
|
|
||
|
// Select a list of records that match a list of matchers. Doesn't use indexes.
|
||
|
func (n *node) Select(matchers ...q.Matcher) Query {
|
||
|
tree := q.And(matchers...)
|
||
|
return newQuery(n, tree)
|
||
|
}
|
||
|
|
||
|
// Query is the low level query engine used by Storm. It allows to operate searches through an entire bucket.
|
||
|
type Query interface {
|
||
|
// Skip matching records by the given number
|
||
|
Skip(int) Query
|
||
|
|
||
|
// Limit the results by the given number
|
||
|
Limit(int) Query
|
||
|
|
||
|
// Order by the given field.
|
||
|
OrderBy(string) Query
|
||
|
|
||
|
// Reverse the order of the results
|
||
|
Reverse() Query
|
||
|
|
||
|
// Bucket specifies the bucket name
|
||
|
Bucket(string) Query
|
||
|
|
||
|
// Find a list of matching records
|
||
|
Find(interface{}) error
|
||
|
|
||
|
// First gets the first matching record
|
||
|
First(interface{}) error
|
||
|
|
||
|
// Delete all matching records
|
||
|
Delete(interface{}) error
|
||
|
|
||
|
// Count all the matching records
|
||
|
Count(interface{}) (int, error)
|
||
|
|
||
|
// Returns all the records without decoding them
|
||
|
Raw() ([][]byte, error)
|
||
|
|
||
|
// Execute the given function for each raw element
|
||
|
RawEach(func([]byte, []byte) error) error
|
||
|
|
||
|
// Execute the given function for each element
|
||
|
Each(interface{}, func(interface{}) error) error
|
||
|
}
|
||
|
|
||
|
func newQuery(n *node, tree q.Matcher) *query {
|
||
|
return &query{
|
||
|
skip: 0,
|
||
|
limit: -1,
|
||
|
node: n,
|
||
|
tree: tree,
|
||
|
sorter: newSorter(n),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type query struct {
|
||
|
limit int
|
||
|
skip int
|
||
|
reverse bool
|
||
|
tree q.Matcher
|
||
|
node *node
|
||
|
bucket string
|
||
|
sorter *sorter
|
||
|
}
|
||
|
|
||
|
func (q *query) Skip(nb int) Query {
|
||
|
q.skip = nb
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
func (q *query) Limit(nb int) Query {
|
||
|
q.limit = nb
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
func (q *query) OrderBy(field string) Query {
|
||
|
q.sorter.orderBy = field
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
func (q *query) Reverse() Query {
|
||
|
q.reverse = true
|
||
|
q.sorter.reverse = true
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
func (q *query) Bucket(bucketName string) Query {
|
||
|
q.bucket = bucketName
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
func (q *query) Find(to interface{}) error {
|
||
|
sink, err := newListSink(q.node, to)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
sink.limit = q.limit
|
||
|
sink.skip = q.skip
|
||
|
return q.runQuery(sink)
|
||
|
}
|
||
|
|
||
|
func (q *query) First(to interface{}) error {
|
||
|
sink, err := newFirstSink(q.node, to)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
sink.skip = q.skip
|
||
|
return q.runQuery(sink)
|
||
|
}
|
||
|
|
||
|
func (q *query) Delete(kind interface{}) error {
|
||
|
sink, err := newDeleteSink(q.node, kind)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
sink.limit = q.limit
|
||
|
sink.skip = q.skip
|
||
|
|
||
|
return q.runQuery(sink)
|
||
|
}
|
||
|
|
||
|
func (q *query) Count(kind interface{}) (int, error) {
|
||
|
sink, err := newCountSink(q.node, kind)
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
sink.limit = q.limit
|
||
|
sink.skip = q.skip
|
||
|
|
||
|
err = q.runQuery(sink)
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
return sink.counter, nil
|
||
|
}
|
||
|
|
||
|
func (q *query) Raw() ([][]byte, error) {
|
||
|
sink := newRawSink()
|
||
|
|
||
|
sink.limit = q.limit
|
||
|
sink.skip = q.skip
|
||
|
|
||
|
err := q.runQuery(sink)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return sink.results, nil
|
||
|
}
|
||
|
|
||
|
func (q *query) RawEach(fn func([]byte, []byte) error) error {
|
||
|
sink := newRawSink()
|
||
|
|
||
|
sink.limit = q.limit
|
||
|
sink.skip = q.skip
|
||
|
sink.execFn = fn
|
||
|
|
||
|
return q.runQuery(sink)
|
||
|
}
|
||
|
|
||
|
func (q *query) Each(kind interface{}, fn func(interface{}) error) error {
|
||
|
sink, err := newEachSink(kind)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
sink.limit = q.limit
|
||
|
sink.skip = q.skip
|
||
|
sink.execFn = fn
|
||
|
|
||
|
return q.runQuery(sink)
|
||
|
}
|
||
|
|
||
|
func (q *query) runQuery(sink sink) error {
|
||
|
if q.node.tx != nil {
|
||
|
return q.query(q.node.tx, sink)
|
||
|
}
|
||
|
if sink.readOnly() {
|
||
|
return q.node.s.Bolt.View(func(tx *bolt.Tx) error {
|
||
|
return q.query(tx, sink)
|
||
|
})
|
||
|
}
|
||
|
return q.node.s.Bolt.Update(func(tx *bolt.Tx) error {
|
||
|
return q.query(tx, sink)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (q *query) query(tx *bolt.Tx, sink sink) error {
|
||
|
bucketName := q.bucket
|
||
|
if bucketName == "" {
|
||
|
bucketName = sink.bucketName()
|
||
|
}
|
||
|
bucket := q.node.GetBucket(tx, bucketName)
|
||
|
|
||
|
if q.limit == 0 {
|
||
|
return q.sorter.flush(sink)
|
||
|
}
|
||
|
|
||
|
if bucket != nil {
|
||
|
c := internal.Cursor{C: bucket.Cursor(), Reverse: q.reverse}
|
||
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||
|
if v == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
stop, err := q.sorter.filter(sink, q.tree, bucket, k, v)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if stop {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return q.sorter.flush(sink)
|
||
|
}
|