gorqlite/conn.go

301 lines
8.0 KiB
Go
Raw Permalink Normal View History

2016-09-01 16:47:49 +00:00
package gorqlite
/*
this file contains some high-level Connection-oriented stuff
*/
/* *****************************************************************
imports
* *****************************************************************/
import "errors"
import "fmt"
import "io"
import "net"
import nurl "net/url"
import "strings"
// Package-level state shared by every Connection.
var (
	// errClosed is the error handed back by Connection methods once the
	// connection has been closed.
	errClosed = errors.New("gorqlite: connection is closed")

	// traceOut is the destination writer for trace output.
	traceOut io.Writer

	// wantsTrace defaults to false. trace() consults it so it can return
	// quickly when tracing is off, rather than paying for a possibly
	// expensive Sprintf() whose result would only be discarded.
	wantsTrace bool
)
/* *****************************************************************
type: Connection
* *****************************************************************/
/*
The connection abstraction. Note that since rqlite is stateless,
there really is no "connection". However, this type holds
information such as the current leader, peers, connection
string to build URLs, etc.

Connections are assigned a "connection ID" which is a pseudo-UUID
for connection identification in trace output only. This helps
sort out what's going on if you have multiple connections going
at once. It's generated using a non-standards-or-anything-else-compliant
function that uses crypto/rand to generate 16 random bytes.

Note that the Connection object holds info on all peers, gathered
at time of Open() from the node specified.
*/
type Connection struct {
cluster rqliteCluster
/*
2017-07-15 20:50:10 +00:00
name type default
*/
2016-09-01 16:47:49 +00:00
2017-07-15 20:50:10 +00:00
username string // username or ""
password string // username or ""
consistencyLevel consistencyLevel // WEAK
wantsHTTPS bool // false unless connection URL is https
2016-09-01 16:47:49 +00:00
// variables below this line need to be initialized in Open()
2017-07-15 20:50:10 +00:00
timeout int // 10
hasBeenClosed bool // false
ID string // generated in init()
2016-09-01 16:47:49 +00:00
}
/* *****************************************************************
method: Connection.Close()
* *****************************************************************/
func (conn *Connection) Close() {
conn.hasBeenClosed = true
2017-07-15 20:50:10 +00:00
trace("%s: %s", conn.ID, "closing connection")
2016-09-01 16:47:49 +00:00
}
/* *****************************************************************
method: Connection.ConsistencyLevel()
* *****************************************************************/
func (conn *Connection) ConsistencyLevel() (string, error) {
2017-07-15 20:50:10 +00:00
if conn.hasBeenClosed {
return "", errClosed
2016-09-01 16:47:49 +00:00
}
return consistencyLevelNames[conn.consistencyLevel], nil
}
/* *****************************************************************
method: Connection.Leader()
* *****************************************************************/
func (conn *Connection) Leader() (string, error) {
2017-07-15 20:50:10 +00:00
if conn.hasBeenClosed {
return "", errClosed
2016-09-01 16:47:49 +00:00
}
2017-07-15 20:50:10 +00:00
trace("%s: Leader(), calling updateClusterInfo()", conn.ID)
2016-09-01 16:47:49 +00:00
err := conn.updateClusterInfo()
2017-07-15 20:50:10 +00:00
if err != nil {
trace("%s: Leader() got error from updateClusterInfo(): %s", conn.ID, err.Error())
2016-09-01 16:47:49 +00:00
return "", err
} else {
2017-07-15 20:50:10 +00:00
trace("%s: Leader(), updateClusterInfo() OK", conn.ID)
2016-09-01 16:47:49 +00:00
}
return conn.cluster.leader.String(), nil
}
/* *****************************************************************
method: Connection.Peers()
* *****************************************************************/
func (conn *Connection) Peers() ([]string, error) {
2017-07-15 20:50:10 +00:00
if conn.hasBeenClosed {
2016-09-01 16:47:49 +00:00
var ans []string
2017-07-15 20:50:10 +00:00
return ans, errClosed
2016-09-01 16:47:49 +00:00
}
2017-07-15 20:50:10 +00:00
plist := make([]string, 0)
2016-09-01 16:47:49 +00:00
2017-07-15 20:50:10 +00:00
trace("%s: Peers(), calling updateClusterInfo()", conn.ID)
2016-09-01 16:47:49 +00:00
err := conn.updateClusterInfo()
2017-07-15 20:50:10 +00:00
if err != nil {
trace("%s: Peers() got error from updateClusterInfo(): %s", conn.ID, err.Error())
2016-09-01 16:47:49 +00:00
return plist, err
} else {
2017-07-15 20:50:10 +00:00
trace("%s: Peers(), updateClusterInfo() OK", conn.ID)
2016-09-01 16:47:49 +00:00
}
2017-07-15 20:50:10 +00:00
plist = append(plist, conn.cluster.leader.String())
2016-09-01 16:47:49 +00:00
for _, p := range conn.cluster.otherPeers {
2017-07-15 20:50:10 +00:00
plist = append(plist, p.String())
2016-09-01 16:47:49 +00:00
}
return plist, nil
}
/* *****************************************************************
method: Connection.SetConsistencyLevel()
* *****************************************************************/
func (conn *Connection) SetConsistencyLevel(levelDesired string) error {
2017-07-15 20:50:10 +00:00
if conn.hasBeenClosed {
return errClosed
2016-09-01 16:47:49 +00:00
}
_, ok := consistencyLevels[levelDesired]
2017-07-15 20:50:10 +00:00
if ok {
2016-09-01 16:47:49 +00:00
conn.consistencyLevel = consistencyLevels[levelDesired]
return nil
}
2017-07-15 20:50:10 +00:00
return errors.New(fmt.Sprintf("unknown consistency level: %s", levelDesired))
2016-09-01 16:47:49 +00:00
}
/* *****************************************************************
method: Connection.initConnection()
* *****************************************************************/
/*
initConnection takes the initial connection URL specified by
the user, and parses it into a peer. This peer is assumed to
be the leader. The next thing Open() does is updateClusterInfo()
so the truth will be revealed soon enough.

initConnection() does not talk to rqlite. It only parses the
connection URL and prepares the new connection for work.

URL format:

http[s]://${USER}:${PASSWORD}@${HOSTNAME}:${PORT}/db?[OPTIONS]

Examples:

https://mary:secret2@localhost:4001/db
https://mary:secret2@server1.example.com:4001/db?level=none
https://mary:secret2@server2.example.com:4001/db?level=weak
https://mary:secret2@localhost:2265/db?level=strong

to use default connection to localhost:4001 with no auth:

http://
https://

guaranteed map fields - will be set to "" if not specified

field name          default if not specified

username            ""
password            ""
hostname            "localhost"
port                "4001"
consistencyLevel    "weak"
*/
func (conn *Connection) initConnection(url string) error {
// do some sanity checks. You know users.
2017-07-15 20:50:10 +00:00
if len(url) < 7 {
2016-09-01 16:47:49 +00:00
return errors.New("url specified is impossibly short")
}
2017-07-15 20:50:10 +00:00
if strings.HasPrefix(url, "http") == false {
2016-09-01 16:47:49 +00:00
return errors.New("url does not start with 'http'")
}
u, err := nurl.Parse(url)
2017-07-15 20:50:10 +00:00
if err != nil {
2016-09-01 16:47:49 +00:00
return err
}
2017-07-15 20:50:10 +00:00
trace("%s: net.url.Parse() OK", conn.ID)
2016-09-01 16:47:49 +00:00
2017-07-15 20:50:10 +00:00
if u.Scheme == "https" {
2016-09-01 16:47:49 +00:00
conn.wantsHTTPS = true
}
// specs say Username() is always populated even if empty
if u.User == nil {
conn.username = ""
conn.password = ""
} else {
// guaranteed, but could be empty which is ok
conn.username = u.User.Username()
// not guaranteed, so test if set
pass, isset := u.User.Password()
2017-07-15 20:50:10 +00:00
if isset {
2016-09-01 16:47:49 +00:00
conn.password = pass
} else {
conn.password = ""
}
}
2017-07-15 20:50:10 +00:00
if u.Host == "" {
2016-09-01 16:47:49 +00:00
conn.cluster.leader.hostname = "localhost"
} else {
conn.cluster.leader.hostname = u.Host
}
2017-07-15 20:50:10 +00:00
if u.Host == "" {
2016-09-01 16:47:49 +00:00
conn.cluster.leader.hostname = "localhost"
conn.cluster.leader.port = "4001"
} else {
// SplitHostPort() should only return an error if there is no host port.
// I think.
h, p, err := net.SplitHostPort(u.Host)
2017-07-15 20:50:10 +00:00
if err != nil {
2016-09-01 16:47:49 +00:00
conn.cluster.leader.hostname = u.Host
} else {
conn.cluster.leader.hostname = h
conn.cluster.leader.port = p
}
}
/*
2017-07-15 20:50:10 +00:00
at the moment, the only allowed query is "level=" with
the desired consistency level
2016-09-01 16:47:49 +00:00
*/
// default
conn.consistencyLevel = cl_WEAK
2017-07-15 20:50:10 +00:00
if u.RawQuery != "" {
if u.RawQuery == "level=weak" {
2016-09-01 16:47:49 +00:00
// that's ok but nothing to do
2017-07-15 20:50:10 +00:00
} else if u.RawQuery == "level=strong" {
2016-09-01 16:47:49 +00:00
conn.consistencyLevel = cl_STRONG
2017-07-15 20:50:10 +00:00
} else if u.RawQuery == "level=none" { // the fools!
2016-09-01 16:47:49 +00:00
conn.consistencyLevel = cl_NONE
} else {
return errors.New("don't know what to do with this query: " + u.RawQuery)
}
}
2017-07-15 20:50:10 +00:00
trace("%s: parseDefaultPeer() is done:", conn.ID)
if conn.wantsHTTPS == true {
trace("%s: %s -> %s", conn.ID, "wants https?", "yes")
2016-09-01 16:47:49 +00:00
} else {
2017-07-15 20:50:10 +00:00
trace("%s: %s -> %s", conn.ID, "wants https?", "no")
2016-09-01 16:47:49 +00:00
}
2017-07-15 20:50:10 +00:00
trace("%s: %s -> %s", conn.ID, "username", conn.username)
trace("%s: %s -> %s", conn.ID, "password", conn.password)
trace("%s: %s -> %s", conn.ID, "hostname", conn.cluster.leader.hostname)
trace("%s: %s -> %s", conn.ID, "port", conn.cluster.leader.port)
trace("%s: %s -> %s", conn.ID, "consistencyLevel", consistencyLevelNames[conn.consistencyLevel])
2016-09-01 16:47:49 +00:00
conn.cluster.conn = conn
return nil
}