gorqlite/cluster.go

224 lines
5.9 KiB
Go
Raw Permalink Normal View History

2016-09-01 16:47:49 +00:00
package gorqlite
/*
this file holds most of the cluster-related stuff:
types:
peer
2017-07-15 20:50:10 +00:00
rqliteCluster
2016-09-01 16:47:49 +00:00
Connection methods:
assembleURL (from a peer)
updateClusterInfo (does the full cluster discovery via status)
*/
/* *****************************************************************
imports
* *****************************************************************/
import "bytes"
import "encoding/json"
import "errors"
import "fmt"
import "strings"
//import "os"
//import "reflect"
/* *****************************************************************
type: peer
this is an internal type to abstact peer info.
2017-07-15 20:50:10 +00:00
note that hostname is sometimes used for "has this struct been
2016-09-01 16:47:49 +00:00
inialized" checks.
* *****************************************************************/
type peer struct {
2017-07-15 20:50:10 +00:00
hostname string // hostname or "localhost"
port string // "4001" or port, only ever used as a string
2016-09-01 16:47:49 +00:00
}
func (p *peer) String() string {
2017-07-15 20:50:10 +00:00
return fmt.Sprintf("%s:%s", p.hostname, p.port)
2016-09-01 16:47:49 +00:00
}
/* *****************************************************************
type: rqliteCluster
internal type that abstracts the full cluster state (leader, peers)
* *****************************************************************/
type rqliteCluster struct {
2017-07-15 20:50:10 +00:00
leader peer
2016-09-01 16:47:49 +00:00
otherPeers []peer
2017-07-15 20:50:10 +00:00
conn *Connection
2016-09-01 16:47:49 +00:00
}
/* *****************************************************************
method: rqliteCluster.makePeerList()
in the api calls, we'll want to try the leader first, then the other
peers. to make looping easy, this function returns a list of peers
in the order the try them: leader, other peer, other peer, etc.
* *****************************************************************/
func (rc *rqliteCluster) makePeerList() []peer {
2017-07-15 20:50:10 +00:00
trace("%s: makePeerList() called", rc.conn.ID)
2016-09-01 16:47:49 +00:00
var peerList []peer
2017-07-15 20:50:10 +00:00
peerList = append(peerList, rc.leader)
2016-09-01 16:47:49 +00:00
for _, p := range rc.otherPeers {
2017-07-15 20:50:10 +00:00
peerList = append(peerList, p)
2016-09-01 16:47:49 +00:00
}
2017-07-15 20:50:10 +00:00
trace("%s: makePeerList() returning this list:", rc.conn.ID)
2016-09-01 16:47:49 +00:00
for n, v := range peerList {
2017-07-15 20:50:10 +00:00
trace("%s: makePeerList() peer %d -> %s", rc.conn.ID, n, v.hostname+":"+v.port)
2016-09-01 16:47:49 +00:00
}
return peerList
}
/* *****************************************************************
method: Connection.assembleURL()
tell it what peer to talk to and what kind of API operation you're
making, and it will return the full URL, from start to finish.
e.g.:
https://mary:secret2@server1.example.com:1234/db/query?transaction&level=strong
note: this func needs to live at the Connection level because the
Connection holds the username, password, consistencyLevel, etc.
* *****************************************************************/
func (conn *Connection) assembleURL(apiOp apiOperation, p peer) string {
var stringBuffer bytes.Buffer
2017-07-15 20:50:10 +00:00
if conn.wantsHTTPS == true {
2016-09-01 16:47:49 +00:00
stringBuffer.WriteString("https")
} else {
stringBuffer.WriteString("http")
}
stringBuffer.WriteString("://")
2017-07-15 20:50:10 +00:00
if conn.username != "" && conn.password != "" {
2016-09-01 16:47:49 +00:00
stringBuffer.WriteString(conn.username)
stringBuffer.WriteString(":")
stringBuffer.WriteString(conn.password)
stringBuffer.WriteString("@")
}
stringBuffer.WriteString(p.hostname)
stringBuffer.WriteString(":")
stringBuffer.WriteString(p.port)
switch apiOp {
2017-07-15 20:50:10 +00:00
case api_STATUS:
stringBuffer.WriteString("/status")
case api_QUERY:
stringBuffer.WriteString("/db/query")
case api_WRITE:
stringBuffer.WriteString("/db/execute")
2016-09-01 16:47:49 +00:00
}
2017-07-15 20:50:10 +00:00
if apiOp == api_QUERY || apiOp == api_WRITE {
2016-09-01 16:47:49 +00:00
stringBuffer.WriteString("?timings&transaction&level=")
stringBuffer.WriteString(consistencyLevelNames[conn.consistencyLevel])
}
switch apiOp {
2017-07-15 20:50:10 +00:00
case api_QUERY:
trace("%s: assembled URL for an api_QUERY: %s", conn.ID, stringBuffer.String())
case api_STATUS:
trace("%s: assembled URL for an api_STATUS: %s", conn.ID, stringBuffer.String())
case api_WRITE:
trace("%s: assembled URL for an api_WRITE: %s", conn.ID, stringBuffer.String())
2016-09-01 16:47:49 +00:00
}
return stringBuffer.String()
}
/* *****************************************************************
method: Connection.updateClusterInfo()
upon invocation, updateClusterInfo() completely erases and refreshes
the Connection's cluster info, replacing its rqliteCluster object
with current info.
the web heavy lifting (retrying, etc.) is done in rqliteApiGet()
* *****************************************************************/
func (conn *Connection) updateClusterInfo() error {
2017-07-15 20:50:10 +00:00
trace("%s: updateClusterInfo() called", conn.ID)
2016-09-01 16:47:49 +00:00
// start with a fresh new cluster
var rc rqliteCluster
rc.conn = conn
responseBody, err := conn.rqliteApiGet(api_STATUS)
2017-07-15 20:50:10 +00:00
if err != nil {
return err
}
trace("%s: updateClusterInfo() back from api call OK", conn.ID)
2016-09-01 16:47:49 +00:00
sections := make(map[string]interface{})
2017-07-15 20:50:10 +00:00
err = json.Unmarshal(responseBody, &sections)
if err != nil {
return err
}
2016-09-01 16:47:49 +00:00
sMap := sections["store"].(map[string]interface{})
leaderRaftAddr := sMap["leader"].(string)
2017-07-15 20:50:10 +00:00
trace("%s: leader from store section is %s", conn.ID, leaderRaftAddr)
2016-09-01 16:47:49 +00:00
// leader in this case is the RAFT address
// we want the HTTP address, so we'll use this as
// a key as we sift through APIPeers
meta := sMap["meta"].(map[string]interface{})
apiPeers := meta["APIPeers"].(map[string]interface{})
for raftAddr, httpAddr := range apiPeers {
2017-07-15 20:50:10 +00:00
trace("%s: examining httpAddr %s", conn.ID, httpAddr)
2016-09-01 16:47:49 +00:00
/* httpAddr are usually hostname:port */
var p peer
2017-07-15 20:50:10 +00:00
parts := strings.Split(httpAddr.(string), ":")
2016-09-01 16:47:49 +00:00
p.hostname = parts[0]
p.port = parts[1]
// so is this the leader?
2017-07-15 20:50:10 +00:00
if leaderRaftAddr == raftAddr {
trace("%s: found leader at %s", conn.ID, httpAddr)
2016-09-01 16:47:49 +00:00
rc.leader = p
} else {
rc.otherPeers = append(rc.otherPeers, p)
}
}
2017-07-15 20:50:10 +00:00
if rc.leader.hostname == "" {
2016-09-01 16:47:49 +00:00
return errors.New("could not determine leader from API status call")
}
// dump to trace
2017-07-15 20:50:10 +00:00
trace("%s: here is my cluster config:", conn.ID)
trace("%s: leader : %s", conn.ID, rc.leader.String())
2016-09-01 16:47:49 +00:00
for n, v := range rc.otherPeers {
2017-07-15 20:50:10 +00:00
trace("%s: otherPeer #%d: %s", conn.ID, n, v.String())
}
2016-09-01 16:47:49 +00:00
// now make it official
conn.cluster = rc
return nil
}