route/vendor/github.com/GoRethink/gorethink/cursor.go

711 lines
15 KiB
Go

package gorethink
import (
"bytes"
"encoding/json"
"errors"
"reflect"
"sync"
"gopkg.in/gorethink/gorethink.v2/encoding"
p "gopkg.in/gorethink/gorethink.v2/ql2"
)
var (
errNilCursor = errors.New("cursor is nil")
errCursorClosed = errors.New("connection closed, cannot read cursor")
)
func newCursor(conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
if cursorType == "" {
cursorType = "Cursor"
}
connOpts := &ConnectOpts{}
if conn != nil {
connOpts = conn.opts
}
cursor := &Cursor{
conn: conn,
connOpts: connOpts,
token: token,
cursorType: cursorType,
term: term,
opts: opts,
buffer: make([]interface{}, 0),
responses: make([]json.RawMessage, 0),
}
return cursor
}
// Cursor is the result of a query. Its cursor starts before the first row
// of the result set. A Cursor is not thread safe and should only be accessed
// by a single goroutine at any given time. Use Next to advance through the
// rows:
//
// cursor, err := query.Run(session)
// ...
// defer cursor.Close()
//
// var response interface{}
// for cursor.Next(&response) {
// ...
// }
// err = cursor.Err() // get any error encountered during iteration
// ...
type Cursor struct {
releaseConn func() error
conn *Connection
connOpts *ConnectOpts
token int64
cursorType string
term *Term
opts map[string]interface{}
mu sync.RWMutex
lastErr error
fetching bool
closed bool
finished bool
isAtom bool
isSingleValue bool
pendingSkips int
buffer []interface{}
responses []json.RawMessage
profile interface{}
}
// Profile returns the information returned from the query profiler.
func (c *Cursor) Profile() interface{} {
if c == nil {
return nil
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.profile
}
// Type returns the cursor type (by default "Cursor")
func (c *Cursor) Type() string {
if c == nil {
return "Cursor"
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.cursorType
}
// Err returns nil if no errors happened during iteration, or the actual
// error otherwise.
func (c *Cursor) Err() error {
if c == nil {
return errNilCursor
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.lastErr
}
// Close closes the cursor, preventing further enumeration. If the end is
// encountered, the cursor is closed automatically. Close is idempotent.
func (c *Cursor) Close() error {
if c == nil {
return errNilCursor
}
c.mu.Lock()
defer c.mu.Unlock()
var err error
// If cursor is already closed return immediately
closed := c.closed
if closed {
return nil
}
// Get connection and check its valid, don't need to lock as this is only
// set when the cursor is created
conn := c.conn
if conn == nil {
return nil
}
if conn.Conn == nil {
return nil
}
// Stop any unfinished queries
if !c.finished {
q := Query{
Type: p.Query_STOP,
Token: c.token,
Opts: map[string]interface{}{
"noreply": true,
},
}
_, _, err = conn.Query(q)
}
if c.releaseConn != nil {
if err := c.releaseConn(); err != nil {
return err
}
}
c.closed = true
c.conn = nil
c.buffer = nil
c.responses = nil
return err
}
// Next retrieves the next document from the result set, blocking if necessary.
// This method will also automatically retrieve another batch of documents from
// the server when the current one is exhausted, or before that in background
// if possible.
//
// Next returns true if a document was successfully unmarshalled onto result,
// and false at the end of the result set or if an error happened.
// When Next returns false, the Err method should be called to verify if
// there was an error during iteration.
//
// Also note that you are able to reuse the same variable multiple times as
// `Next` zeroes the value before scanning in the result.
func (c *Cursor) Next(dest interface{}) bool {
if c == nil {
return false
}
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return false
}
hasMore, err := c.nextLocked(dest, true)
if c.handleErrorLocked(err) != nil {
c.mu.Unlock()
c.Close()
return false
}
c.mu.Unlock()
if !hasMore {
c.Close()
}
return hasMore
}
func (c *Cursor) nextLocked(dest interface{}, progressCursor bool) (bool, error) {
for {
if err := c.seekCursor(true); err != nil {
return false, err
}
if c.closed {
return false, nil
}
if len(c.buffer) == 0 && c.finished {
return false, nil
}
if len(c.buffer) > 0 {
data := c.buffer[0]
if progressCursor {
c.buffer = c.buffer[1:]
}
err := encoding.Decode(dest, data)
if err != nil {
return false, err
}
return true, nil
}
}
}
// Peek behaves similarly to Next, retreiving the next document from the result set
// and blocking if necessary. Peek, however, does not progress the position of the cursor.
// This can be useful for expressions which can return different types to attempt to
// decode them into different interfaces.
//
// Like Next, it will also automatically retrieve another batch of documents from
// the server when the current one is exhausted, or before that in background
// if possible.
//
// Unlike Next, Peek does not progress the position of the cursor. Peek
// will return errors from decoding, but they will not be persisted in the cursor
// and therefore will not be available on cursor.Err(). This can be useful for
// expressions that can return different types to attempt to decode them into
// different interfaces.
//
// Peek returns true if a document was successfully unmarshalled onto result,
// and false at the end of the result set or if an error happened. Peek also
// returns the error (if any) that occured
func (c *Cursor) Peek(dest interface{}) (bool, error) {
if c == nil {
return false, errNilCursor
}
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return false, nil
}
hasMore, err := c.nextLocked(dest, false)
if _, isDecodeErr := err.(*encoding.DecodeTypeError); isDecodeErr {
c.mu.Unlock()
return false, err
}
if c.handleErrorLocked(err) != nil {
c.mu.Unlock()
c.Close()
return false, err
}
c.mu.Unlock()
return hasMore, nil
}
// Skip progresses the cursor by one record. It is useful after a successful
// Peek to avoid duplicate decoding work.
func (c *Cursor) Skip() {
if c == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.pendingSkips++
}
// NextResponse retrieves the next raw response from the result set, blocking if necessary.
// Unlike Next the returned response is the raw JSON document returned from the
// database.
//
// NextResponse returns false (and a nil byte slice) at the end of the result
// set or if an error happened.
func (c *Cursor) NextResponse() ([]byte, bool) {
if c == nil {
return nil, false
}
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, false
}
b, hasMore, err := c.nextResponseLocked()
if c.handleErrorLocked(err) != nil {
c.mu.Unlock()
c.Close()
return nil, false
}
c.mu.Unlock()
if !hasMore {
c.Close()
}
return b, hasMore
}
func (c *Cursor) nextResponseLocked() ([]byte, bool, error) {
for {
if err := c.seekCursor(false); err != nil {
return nil, false, err
}
if len(c.responses) == 0 && c.finished {
return nil, false, nil
}
if len(c.responses) > 0 {
var response json.RawMessage
response, c.responses = c.responses[0], c.responses[1:]
return []byte(response), true, nil
}
}
}
// All retrieves all documents from the result set into the provided slice
// and closes the cursor.
//
// The result argument must necessarily be the address for a slice. The slice
// may be nil or previously allocated.
//
// Also note that you are able to reuse the same variable multiple times as
// `All` zeroes the value before scanning in the result. It also attempts
// to reuse the existing slice without allocating any more space by either
// resizing or returning a selection of the slice if necessary.
func (c *Cursor) All(result interface{}) error {
if c == nil {
return errNilCursor
}
resultv := reflect.ValueOf(result)
if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
panic("result argument must be a slice address")
}
slicev := resultv.Elem()
slicev = slicev.Slice(0, slicev.Cap())
elemt := slicev.Type().Elem()
i := 0
for {
if slicev.Len() == i {
elemp := reflect.New(elemt)
if !c.Next(elemp.Interface()) {
break
}
slicev = reflect.Append(slicev, elemp.Elem())
slicev = slicev.Slice(0, slicev.Cap())
} else {
if !c.Next(slicev.Index(i).Addr().Interface()) {
break
}
}
i++
}
resultv.Elem().Set(slicev.Slice(0, i))
if err := c.Err(); err != nil {
c.Close()
return err
}
if err := c.Close(); err != nil {
return err
}
return nil
}
// One retrieves a single document from the result set into the provided
// slice and closes the cursor.
//
// Also note that you are able to reuse the same variable multiple times as
// `One` zeroes the value before scanning in the result.
func (c *Cursor) One(result interface{}) error {
if c == nil {
return errNilCursor
}
if c.IsNil() {
c.Close()
return ErrEmptyResult
}
hasResult := c.Next(result)
if err := c.Err(); err != nil {
c.Close()
return err
}
if err := c.Close(); err != nil {
return err
}
if !hasResult {
return ErrEmptyResult
}
return nil
}
// Interface retrieves all documents from the result set and returns the data
// as an interface{} and closes the cursor.
//
// If the query returns multiple documents then a slice will be returned,
// otherwise a single value will be returned.
func (c *Cursor) Interface() (interface{}, error) {
if c == nil {
return nil, errNilCursor
}
var results []interface{}
var result interface{}
for c.Next(&result) {
results = append(results, result)
}
if err := c.Err(); err != nil {
return nil, err
}
c.mu.RLock()
isSingleValue := c.isSingleValue
c.mu.RUnlock()
if isSingleValue {
if len(results) == 0 {
return nil, nil
}
return results[0], nil
}
return results, nil
}
// Listen listens for rows from the database and sends the result onto the given
// channel. The type that the row is scanned into is determined by the element
// type of the channel.
//
// Also note that this function returns immediately.
//
// cursor, err := r.Expr([]int{1,2,3}).Run(session)
// if err != nil {
// panic(err)
// }
//
// ch := make(chan int)
// cursor.Listen(ch)
// <- ch // 1
// <- ch // 2
// <- ch // 3
func (c *Cursor) Listen(channel interface{}) {
go func() {
channelv := reflect.ValueOf(channel)
if channelv.Kind() != reflect.Chan {
panic("input argument must be a channel")
}
elemt := channelv.Type().Elem()
for {
elemp := reflect.New(elemt)
if !c.Next(elemp.Interface()) {
break
}
channelv.Send(elemp.Elem())
}
c.Close()
channelv.Close()
}()
}
// IsNil tests if the current row is nil.
func (c *Cursor) IsNil() bool {
if c == nil {
return true
}
c.mu.RLock()
defer c.mu.RUnlock()
if len(c.buffer) > 0 {
return c.buffer[0] == nil
}
if len(c.responses) > 0 {
response := c.responses[0]
if response == nil {
return true
}
if string(response) == "null" {
return true
}
return false
}
return true
}
// fetchMore fetches more rows from the database.
//
// If wait is true then it will wait for the database to reply otherwise it
// will return after sending the continue query.
func (c *Cursor) fetchMore() error {
var err error
if !c.fetching {
c.fetching = true
if c.closed {
return errCursorClosed
}
q := Query{
Type: p.Query_CONTINUE,
Token: c.token,
}
c.mu.Unlock()
_, _, err = c.conn.Query(q)
c.mu.Lock()
}
return err
}
// handleError sets the value of lastErr to err if lastErr is not yet set.
func (c *Cursor) handleError(err error) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.handleErrorLocked(err)
}
func (c *Cursor) handleErrorLocked(err error) error {
if c.lastErr == nil {
c.lastErr = err
}
return c.lastErr
}
// extend adds the result of a continue query to the cursor.
func (c *Cursor) extend(response *Response) {
c.mu.Lock()
defer c.mu.Unlock()
c.extendLocked(response)
}
func (c *Cursor) extendLocked(response *Response) {
c.responses = append(c.responses, response.Responses...)
c.finished = response.Type != p.Response_SUCCESS_PARTIAL
c.fetching = false
c.isAtom = response.Type == p.Response_SUCCESS_ATOM
putResponse(response)
}
// seekCursor takes care of loading more data if needed and applying pending skips
//
// bufferResponse determines whether the response will be parsed into the buffer
func (c *Cursor) seekCursor(bufferResponse bool) error {
if c.lastErr != nil {
return c.lastErr
}
if len(c.buffer) == 0 && len(c.responses) == 0 && c.closed {
return errCursorClosed
}
// Loop over loading data, applying skips as necessary and loading more data as needed
// until either the cursor is closed or finished, or we have applied all outstanding
// skips and data is available
for {
c.applyPendingSkips(bufferResponse) // if we are buffering the responses, skip can drain from the buffer
if bufferResponse && len(c.buffer) == 0 && len(c.responses) > 0 {
if err := c.bufferNextResponse(); err != nil {
return err
}
continue // go around the loop again to re-apply pending skips
} else if len(c.buffer) == 0 && len(c.responses) == 0 && !c.finished {
// We skipped all of our data, load some more
if err := c.fetchMore(); err != nil {
return err
}
if c.closed {
return nil
}
continue // go around the loop again to re-apply pending skips
}
return nil
}
}
// applyPendingSkips applies all pending skips to the buffer and
// returns whether there are more pending skips to be applied
//
// if drainFromBuffer is true, we will drain from the buffer, otherwise
// we drain from the responses
func (c *Cursor) applyPendingSkips(drainFromBuffer bool) (stillPending bool) {
if c.pendingSkips == 0 {
return false
}
if drainFromBuffer {
if len(c.buffer) > c.pendingSkips {
c.buffer = c.buffer[c.pendingSkips:]
c.pendingSkips = 0
return false
}
c.pendingSkips -= len(c.buffer)
c.buffer = c.buffer[:0]
return c.pendingSkips > 0
}
if len(c.responses) > c.pendingSkips {
c.responses = c.responses[c.pendingSkips:]
c.pendingSkips = 0
return false
}
c.pendingSkips -= len(c.responses)
c.responses = c.responses[:0]
return c.pendingSkips > 0
}
// bufferResponse reads a single response and stores the result into the buffer
// if the response is from an atomic response, it will check if the
// response contains multiple records and store them all into the buffer
func (c *Cursor) bufferNextResponse() error {
if c.closed {
return errCursorClosed
}
// If there are no responses, nothing to do
if len(c.responses) == 0 {
return nil
}
response := c.responses[0]
c.responses = c.responses[1:]
var value interface{}
decoder := json.NewDecoder(bytes.NewBuffer(response))
if c.connOpts.UseJSONNumber {
decoder.UseNumber()
}
err := decoder.Decode(&value)
if err != nil {
return err
}
value, err = recursivelyConvertPseudotype(value, c.opts)
if err != nil {
return err
}
// If response is an ATOM then try and convert to an array
if data, ok := value.([]interface{}); ok && c.isAtom {
c.buffer = append(c.buffer, data...)
} else if value == nil {
c.buffer = append(c.buffer, nil)
} else {
c.buffer = append(c.buffer, value)
// If this is the only value in the response and the response was an
// atom then set the single value flag
if c.isAtom {
c.isSingleValue = true
}
}
return nil
}