package storm

import (
	"github.com/asdine/storm/internal"
	"github.com/asdine/storm/q"
	"github.com/coreos/bbolt"
)

// Select a list of records that match a list of matchers. Doesn't use indexes.
func (n *node) Select(matchers ...q.Matcher) Query {
	tree := q.And(matchers...)
	return newQuery(n, tree)
}

// Query is the low level query engine used by Storm. It allows to operate searches through an entire bucket.
type Query interface {
	// Skip matching records by the given number
	Skip(int) Query

	// Limit the results by the given number
	Limit(int) Query

	// Order by the given fields, in descending precedence, left-to-right.
	OrderBy(...string) Query

	// Reverse the order of the results
	Reverse() Query

	// Bucket specifies the bucket name
	Bucket(string) Query

	// Find a list of matching records
	Find(interface{}) error

	// First gets the first matching record
	First(interface{}) error

	// Delete all matching records
	Delete(interface{}) error

	// Count all the matching records
	Count(interface{}) (int, error)

	// Returns all the records without decoding them
	Raw() ([][]byte, error)

	// Execute the given function for each raw element
	RawEach(func([]byte, []byte) error) error

	// Execute the given function for each element
	Each(interface{}, func(interface{}) error) error
}

func newQuery(n *node, tree q.Matcher) *query {
	return &query{
		skip:  0,
		limit: -1,
		node:  n,
		tree:  tree,
	}
}

type query struct {
	limit   int
	skip    int
	reverse bool
	tree    q.Matcher
	node    *node
	bucket  string
	orderBy []string
}

func (q *query) Skip(nb int) Query {
	q.skip = nb
	return q
}

func (q *query) Limit(nb int) Query {
	q.limit = nb
	return q
}

func (q *query) OrderBy(field ...string) Query {
	q.orderBy = field
	return q
}

func (q *query) Reverse() Query {
	q.reverse = true
	return q
}

func (q *query) Bucket(bucketName string) Query {
	q.bucket = bucketName
	return q
}

func (q *query) Find(to interface{}) error {
	sink, err := newListSink(q.node, to)
	if err != nil {
		return err
	}

	return q.runQuery(sink)
}

func (q *query) First(to interface{}) error {
	sink, err := newFirstSink(q.node, to)
	if err != nil {
		return err
	}

	q.limit = 1
	return q.runQuery(sink)
}

func (q *query) Delete(kind interface{}) error {
	sink, err := newDeleteSink(q.node, kind)
	if err != nil {
		return err
	}

	return q.runQuery(sink)
}

func (q *query) Count(kind interface{}) (int, error) {
	sink, err := newCountSink(q.node, kind)
	if err != nil {
		return 0, err
	}

	err = q.runQuery(sink)
	if err != nil {
		return 0, err
	}

	return sink.counter, nil
}

func (q *query) Raw() ([][]byte, error) {
	sink := newRawSink()

	err := q.runQuery(sink)
	if err != nil {
		return nil, err
	}

	return sink.results, nil
}

func (q *query) RawEach(fn func([]byte, []byte) error) error {
	sink := newRawSink()

	sink.execFn = fn

	return q.runQuery(sink)
}

func (q *query) Each(kind interface{}, fn func(interface{}) error) error {
	sink, err := newEachSink(kind)
	if err != nil {
		return err
	}

	sink.execFn = fn

	return q.runQuery(sink)
}

func (q *query) runQuery(sink sink) error {
	if q.node.tx != nil {
		return q.query(q.node.tx, sink)
	}
	if sink.readOnly() {
		return q.node.s.Bolt.View(func(tx *bolt.Tx) error {
			return q.query(tx, sink)
		})
	}
	return q.node.s.Bolt.Update(func(tx *bolt.Tx) error {
		return q.query(tx, sink)
	})
}

func (q *query) query(tx *bolt.Tx, sink sink) error {
	bucketName := q.bucket
	if bucketName == "" {
		bucketName = sink.bucketName()
	}
	bucket := q.node.GetBucket(tx, bucketName)

	if q.limit == 0 {
		return sink.flush()
	}

	sorter := newSorter(q.node, sink)
	sorter.orderBy = q.orderBy
	sorter.reverse = q.reverse
	sorter.skip = q.skip
	sorter.limit = q.limit
	if bucket != nil {
		c := internal.Cursor{C: bucket.Cursor(), Reverse: q.reverse}
		for k, v := c.First(); k != nil; k, v = c.Next() {
			if v == nil {
				continue
			}

			stop, err := sorter.filter(q.tree, bucket, k, v)
			if err != nil {
				return err
			}

			if stop {
				break
			}
		}
	}

	return sorter.flush()
}