224 lines
5.9 KiB
Go
224 lines
5.9 KiB
Go
package gorqlite
|
|
|
|
/*
|
|
this file holds most of the cluster-related stuff:
|
|
|
|
types:
|
|
peer
|
|
rqliteCluster
|
|
Connection methods:
|
|
assembleURL (from a peer)
|
|
updateClusterInfo (does the full cluster discovery via status)
|
|
*/
|
|
|
|
/* *****************************************************************
|
|
|
|
imports
|
|
|
|
* *****************************************************************/
|
|
|
|
import "bytes"
|
|
import "encoding/json"
|
|
import "errors"
|
|
import "fmt"
|
|
import "strings"
|
|
|
|
//import "os"
|
|
//import "reflect"
|
|
|
|
/* *****************************************************************
|
|
|
|
type: peer
|
|
|
|
this is an internal type to abstact peer info.
|
|
|
|
note that hostname is sometimes used for "has this struct been
|
|
inialized" checks.
|
|
|
|
* *****************************************************************/
|
|
|
|
type peer struct {
|
|
hostname string // hostname or "localhost"
|
|
port string // "4001" or port, only ever used as a string
|
|
}
|
|
|
|
func (p *peer) String() string {
|
|
return fmt.Sprintf("%s:%s", p.hostname, p.port)
|
|
}
|
|
|
|
/* *****************************************************************
|
|
|
|
type: rqliteCluster
|
|
|
|
internal type that abstracts the full cluster state (leader, peers)
|
|
|
|
* *****************************************************************/
|
|
|
|
type rqliteCluster struct {
|
|
leader peer
|
|
otherPeers []peer
|
|
conn *Connection
|
|
}
|
|
|
|
/* *****************************************************************
|
|
|
|
method: rqliteCluster.makePeerList()
|
|
|
|
in the api calls, we'll want to try the leader first, then the other
|
|
peers. to make looping easy, this function returns a list of peers
|
|
in the order the try them: leader, other peer, other peer, etc.
|
|
|
|
* *****************************************************************/
|
|
|
|
func (rc *rqliteCluster) makePeerList() []peer {
|
|
trace("%s: makePeerList() called", rc.conn.ID)
|
|
var peerList []peer
|
|
peerList = append(peerList, rc.leader)
|
|
for _, p := range rc.otherPeers {
|
|
peerList = append(peerList, p)
|
|
}
|
|
|
|
trace("%s: makePeerList() returning this list:", rc.conn.ID)
|
|
for n, v := range peerList {
|
|
trace("%s: makePeerList() peer %d -> %s", rc.conn.ID, n, v.hostname+":"+v.port)
|
|
}
|
|
|
|
return peerList
|
|
}
|
|
|
|
/* *****************************************************************
|
|
|
|
method: Connection.assembleURL()
|
|
|
|
tell it what peer to talk to and what kind of API operation you're
|
|
making, and it will return the full URL, from start to finish.
|
|
e.g.:
|
|
|
|
https://mary:secret2@server1.example.com:1234/db/query?transaction&level=strong
|
|
|
|
note: this func needs to live at the Connection level because the
|
|
Connection holds the username, password, consistencyLevel, etc.
|
|
|
|
* *****************************************************************/
|
|
|
|
func (conn *Connection) assembleURL(apiOp apiOperation, p peer) string {
|
|
var stringBuffer bytes.Buffer
|
|
|
|
if conn.wantsHTTPS == true {
|
|
stringBuffer.WriteString("https")
|
|
} else {
|
|
stringBuffer.WriteString("http")
|
|
}
|
|
stringBuffer.WriteString("://")
|
|
if conn.username != "" && conn.password != "" {
|
|
stringBuffer.WriteString(conn.username)
|
|
stringBuffer.WriteString(":")
|
|
stringBuffer.WriteString(conn.password)
|
|
stringBuffer.WriteString("@")
|
|
}
|
|
stringBuffer.WriteString(p.hostname)
|
|
stringBuffer.WriteString(":")
|
|
stringBuffer.WriteString(p.port)
|
|
|
|
switch apiOp {
|
|
case api_STATUS:
|
|
stringBuffer.WriteString("/status")
|
|
case api_QUERY:
|
|
stringBuffer.WriteString("/db/query")
|
|
case api_WRITE:
|
|
stringBuffer.WriteString("/db/execute")
|
|
}
|
|
|
|
if apiOp == api_QUERY || apiOp == api_WRITE {
|
|
stringBuffer.WriteString("?timings&transaction&level=")
|
|
stringBuffer.WriteString(consistencyLevelNames[conn.consistencyLevel])
|
|
}
|
|
|
|
switch apiOp {
|
|
case api_QUERY:
|
|
trace("%s: assembled URL for an api_QUERY: %s", conn.ID, stringBuffer.String())
|
|
case api_STATUS:
|
|
trace("%s: assembled URL for an api_STATUS: %s", conn.ID, stringBuffer.String())
|
|
case api_WRITE:
|
|
trace("%s: assembled URL for an api_WRITE: %s", conn.ID, stringBuffer.String())
|
|
}
|
|
|
|
return stringBuffer.String()
|
|
}
|
|
|
|
/* *****************************************************************
|
|
|
|
method: Connection.updateClusterInfo()
|
|
|
|
upon invocation, updateClusterInfo() completely erases and refreshes
|
|
the Connection's cluster info, replacing its rqliteCluster object
|
|
with current info.
|
|
|
|
the web heavy lifting (retrying, etc.) is done in rqliteApiGet()
|
|
|
|
* *****************************************************************/
|
|
|
|
func (conn *Connection) updateClusterInfo() error {
|
|
trace("%s: updateClusterInfo() called", conn.ID)
|
|
|
|
// start with a fresh new cluster
|
|
var rc rqliteCluster
|
|
rc.conn = conn
|
|
|
|
responseBody, err := conn.rqliteApiGet(api_STATUS)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
trace("%s: updateClusterInfo() back from api call OK", conn.ID)
|
|
|
|
sections := make(map[string]interface{})
|
|
err = json.Unmarshal(responseBody, §ions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sMap := sections["store"].(map[string]interface{})
|
|
leaderRaftAddr := sMap["leader"].(string)
|
|
trace("%s: leader from store section is %s", conn.ID, leaderRaftAddr)
|
|
|
|
// leader in this case is the RAFT address
|
|
// we want the HTTP address, so we'll use this as
|
|
// a key as we sift through APIPeers
|
|
|
|
meta := sMap["meta"].(map[string]interface{})
|
|
apiPeers := meta["APIPeers"].(map[string]interface{})
|
|
|
|
for raftAddr, httpAddr := range apiPeers {
|
|
trace("%s: examining httpAddr %s", conn.ID, httpAddr)
|
|
|
|
/* httpAddr are usually hostname:port */
|
|
var p peer
|
|
parts := strings.Split(httpAddr.(string), ":")
|
|
p.hostname = parts[0]
|
|
p.port = parts[1]
|
|
|
|
// so is this the leader?
|
|
if leaderRaftAddr == raftAddr {
|
|
trace("%s: found leader at %s", conn.ID, httpAddr)
|
|
rc.leader = p
|
|
} else {
|
|
rc.otherPeers = append(rc.otherPeers, p)
|
|
}
|
|
}
|
|
|
|
if rc.leader.hostname == "" {
|
|
return errors.New("could not determine leader from API status call")
|
|
}
|
|
|
|
// dump to trace
|
|
trace("%s: here is my cluster config:", conn.ID)
|
|
trace("%s: leader : %s", conn.ID, rc.leader.String())
|
|
for n, v := range rc.otherPeers {
|
|
trace("%s: otherPeer #%d: %s", conn.ID, n, v.String())
|
|
}
|
|
|
|
// now make it official
|
|
conn.cluster = rc
|
|
|
|
return nil
|
|
}
|