route/lib/tun2/server.go

422 lines
8.0 KiB
Go
Raw Normal View History

package tun2
import (
2017-03-26 20:38:05 +00:00
"context"
"crypto/tls"
"encoding/json"
"errors"
"math/rand"
"net"
"net/http"
"sync"
2017-03-26 22:14:13 +00:00
"time"
"github.com/Xe/ln"
failure "github.com/dgryski/go-failure"
"github.com/mtneug/pkg/ulid"
2017-03-27 04:56:54 +00:00
cmap "github.com/streamrail/concurrent-map"
kcp "github.com/xtaci/kcp-go"
"github.com/xtaci/smux"
)
2017-04-28 22:23:26 +00:00
// Error values
var (
	// ErrNoSuchBackend is returned by RoundTrip when no backend connection is
	// registered for the requested host.
	ErrNoSuchBackend = errors.New("tun2: there is no such backend")

	// ErrAuthMismatch is logged by HandleConn when a backend's token lacks the
	// required scope or when the token and the route belong to different users.
	ErrAuthMismatch = errors.New("tun2: authentication doesn't match database records")

	// ErrCantRemoveWhatDoesntExist is logged by RemoveConn when the registry
	// entry for the connection's domain cannot be read as a connection list.
	ErrCantRemoveWhatDoesntExist = errors.New("tun2: this connection does not exist, cannot remove it")
)
// ServerConfig holds the settings a Server is constructed from.
type ServerConfig struct {
// TCPAddr is the address ListenAndServe binds its TCP+TLS backend listener
// to. No TCP listener is started when it is empty.
TCPAddr string
// KCPAddr is the address ListenAndServe binds its KCP backend listener to.
// No KCP listener is started when it is empty.
KCPAddr string
// TLSConfig is passed to tls.Listen for the TCP listener and to tls.Server
// for every accepted KCP connection.
TLSConfig *tls.Config
// SmuxConf configures the smux session created for each backend. NewServer
// substitutes smux.DefaultConfig() when it is nil and always overwrites its
// KeepAliveInterval and KeepAliveTimeout.
SmuxConf *smux.Config
// Storage is consulted by HandleConn to validate the domain and token a
// backend presents.
Storage Storage
}
2017-04-28 22:23:26 +00:00
// Storage is the minimal subset of features that tun2's Server needs out of a
// persistence layer.
type Storage interface {
	// HasToken looks up token and reports the user it belongs to together
	// with the scopes granted to it. HandleConn rejects the backend when err
	// is non-nil or when scopes does not contain "connect".
	HasToken(token string) (user string, scopes []string, err error)

	// HasRoute looks up domain and reports the user it belongs to. HandleConn
	// rejects the backend when err is non-nil or when the user differs from
	// the one HasToken reported.
	HasRoute(domain string) (user string, err error)
}
2017-04-28 22:23:26 +00:00
// Server routes frontend HTTP traffic to backend TCP traffic.
type Server struct {
cfg *ServerConfig
connlock sync.Mutex
conns map[net.Conn]*Connection
2017-03-27 04:56:54 +00:00
domains cmap.ConcurrentMap
}
2017-04-28 22:23:26 +00:00
// NewServer creates a new Server instance with a given config, acquiring all
// relevant resources.
func NewServer(cfg *ServerConfig) (*Server, error) {
if cfg == nil {
return nil, errors.New("tun2: config must be specified")
}
if cfg.SmuxConf == nil {
cfg.SmuxConf = smux.DefaultConfig()
}
cfg.SmuxConf.KeepAliveInterval = time.Second
cfg.SmuxConf.KeepAliveTimeout = 15 * time.Second
server := &Server{
cfg: cfg,
conns: map[net.Conn]*Connection{},
2017-03-27 04:56:54 +00:00
domains: cmap.New(),
}
return server, nil
}
2017-04-28 22:23:26 +00:00
// ListenAndServe starts the backend TCP/KCP listeners and relays backend
// traffic to and from them.
func (s *Server) ListenAndServe() error {
ln.Log(ln.F{
"action": "listen_and_serve_called",
})
if s.cfg.TCPAddr != "" {
go func() {
l, err := tls.Listen("tcp", s.cfg.TCPAddr, s.cfg.TLSConfig)
if err != nil {
panic(err)
}
ln.Log(ln.F{
"action": "tcp+tls_listening",
"addr": l.Addr(),
})
for {
conn, err := l.Accept()
if err != nil {
ln.Error(err, ln.F{"kind": "tcp", "addr": l.Addr().String()})
continue
}
2017-03-26 20:23:52 +00:00
ln.Log(ln.F{
"action": "new_client",
"kcp": false,
"addr": conn.RemoteAddr(),
})
go s.HandleConn(conn, false)
}
}()
}
if s.cfg.KCPAddr != "" {
go func() {
l, err := kcp.Listen(s.cfg.KCPAddr)
if err != nil {
panic(err)
}
ln.Log(ln.F{
"action": "kcp+tls_listening",
"addr": l.Addr(),
})
for {
conn, err := l.Accept()
if err != nil {
ln.Error(err, ln.F{"kind": "kcp", "addr": l.Addr().String()})
}
2017-03-26 20:23:52 +00:00
ln.Log(ln.F{
"action": "new_client",
"kcp": true,
"addr": conn.RemoteAddr(),
})
tc := tls.Server(conn, s.cfg.TLSConfig)
go s.HandleConn(tc, true)
}
}()
}
2017-04-28 22:23:26 +00:00
// XXX experimental, might get rid of this inside this process
2017-03-26 22:14:13 +00:00
go func() {
for {
time.Sleep(time.Second)
now := time.Now()
s.connlock.Lock()
for _, c := range s.conns {
failureChance := c.detector.Phi(now)
if failureChance > 0.8 {
ln.Log(c.F(), ln.F{
"action": "phi_failure_detection",
"value": failureChance,
})
}
}
2017-04-05 21:33:54 +00:00
s.connlock.Unlock()
2017-03-27 05:19:43 +00:00
}
}()
2017-03-26 22:14:13 +00:00
2017-03-27 05:19:43 +00:00
return nil
}
2017-03-26 23:16:39 +00:00
2017-04-28 22:23:26 +00:00
// HandleConn starts up the needed mechanisms to relay HTTP traffic to/from
// the currently connected backend.
func (s *Server) HandleConn(c net.Conn, isKCP bool) {
2017-04-28 22:23:26 +00:00
// XXX TODO clean this up it's really ugly.
defer c.Close()
2017-03-26 20:38:05 +00:00
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
session, err := smux.Server(c, s.cfg.SmuxConf)
if err != nil {
ln.Error(err, ln.F{
"action": "session_failure",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
c.Close()
return
}
2017-04-28 22:23:26 +00:00
defer session.Close()
controlStream, err := session.OpenStream()
if err != nil {
ln.Error(err, ln.F{
2017-03-26 22:19:33 +00:00
"action": "control_stream_failure",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
return
}
2017-04-28 22:23:26 +00:00
defer controlStream.Close()
csd := json.NewDecoder(controlStream)
auth := &Auth{}
err = csd.Decode(auth)
if err != nil {
ln.Error(err, ln.F{
"action": "control_stream_auth_decoding_failure",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
return
}
2017-04-28 22:23:26 +00:00
routeUser, err := s.cfg.Storage.HasRoute(auth.Domain)
if err != nil {
ln.Error(err, ln.F{
"action": "nosuch_domain",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
return
}
2017-04-28 22:23:26 +00:00
tokenUser, scopes, err := s.cfg.Storage.HasToken(auth.Token)
if err != nil {
ln.Error(err, ln.F{
2017-04-28 22:23:26 +00:00
"action": "nosuch_token",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
2017-04-28 22:23:26 +00:00
return
}
ok := false
for _, sc := range scopes {
if sc == "connect" {
ok = true
break
}
}
2017-04-28 22:23:26 +00:00
if !ok {
ln.Error(ErrAuthMismatch, ln.F{
"action": "token_not_authorized",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
}
if routeUser != tokenUser {
ln.Error(ErrAuthMismatch, ln.F{
"action": "auth_mismatch",
"local": c.LocalAddr().String(),
"remote": c.RemoteAddr().String(),
})
return
}
connection := &Connection{
id: ulid.New().String(),
conn: c,
isKCP: isKCP,
session: session,
2017-04-28 22:23:26 +00:00
user: tokenUser,
domain: auth.Domain,
cancel: cancel,
detector: failure.New(15, 1),
Auth: auth,
}
2017-03-26 20:18:58 +00:00
ln.Log(ln.F{
"action": "backend_connected",
2017-03-26 22:30:19 +00:00
}, connection.F())
2017-03-26 20:18:58 +00:00
s.connlock.Lock()
s.conns[c] = connection
s.connlock.Unlock()
2017-03-27 04:56:54 +00:00
var conns []*Connection
val, ok := s.domains.Get(auth.Domain)
if ok {
conns, ok = val.([]*Connection)
if !ok {
conns = nil
s.domains.Remove(auth.Domain)
}
}
conns = append(conns, connection)
s.domains.Set(auth.Domain, conns)
2017-03-26 20:38:05 +00:00
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := connection.Ping()
if err != nil {
cancel()
2017-03-27 04:56:54 +00:00
}
case <-ctx.Done():
s.RemoveConn(connection)
2017-04-05 22:04:02 +00:00
connection.Close()
2017-03-26 20:38:05 +00:00
2017-04-05 22:04:02 +00:00
return
}
}
}
2017-03-26 20:38:05 +00:00
2017-04-28 22:23:26 +00:00
// RemoveConn removes a connection.
func (s *Server) RemoveConn(connection *Connection) {
2017-04-05 22:04:02 +00:00
s.connlock.Lock()
delete(s.conns, connection.conn)
s.connlock.Unlock()
2017-03-26 20:38:05 +00:00
auth := connection.Auth
2017-04-05 22:04:02 +00:00
var conns []*Connection
2017-03-26 20:38:05 +00:00
2017-04-05 22:04:02 +00:00
val, ok := s.domains.Get(auth.Domain)
if ok {
conns, ok = val.([]*Connection)
if !ok {
2017-04-28 22:23:26 +00:00
ln.Error(ErrCantRemoveWhatDoesntExist, connection.F(), ln.F{
2017-04-05 22:04:02 +00:00
"action": "looking_up_for_disconnect_removal",
})
return
}
2017-03-26 20:38:05 +00:00
}
2017-04-05 22:04:02 +00:00
for i, cntn := range conns {
if cntn.id == connection.id {
conns[i] = conns[len(conns)-1]
conns = conns[:len(conns)-1]
}
}
if len(conns) != 0 {
s.domains.Set(auth.Domain, conns)
} else {
s.domains.Remove(auth.Domain)
}
ln.Log(connection.F(), ln.F{
"action": "client_disconnecting",
})
}
2017-04-28 22:23:26 +00:00
// RoundTrip sends a HTTP request to a backend and then returns its response.
func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
2017-03-27 04:56:54 +00:00
var conns []*Connection
val, ok := s.domains.Get(req.Host)
if ok {
conns, ok = val.([]*Connection)
2017-03-27 05:03:06 +00:00
if !ok {
2017-04-28 22:23:26 +00:00
ln.Error(ErrNoSuchBackend, ln.F{
2017-03-27 04:56:54 +00:00
"action": "no_backend_connected",
"remote": req.RemoteAddr,
"host": req.Host,
"uri": req.RequestURI,
})
2017-04-28 22:23:26 +00:00
return nil, ErrNoSuchBackend
2017-03-27 04:56:54 +00:00
}
}
2017-03-27 05:03:06 +00:00
if len(conns) == 0 {
2017-04-28 22:23:26 +00:00
ln.Error(ErrNoSuchBackend, ln.F{
2017-03-27 05:50:45 +00:00
"action": "no_backend_connected",
"remote": req.RemoteAddr,
"host": req.Host,
"uri": req.RequestURI,
})
2017-04-28 22:23:26 +00:00
return nil, ErrNoSuchBackend
2017-03-27 05:03:06 +00:00
}
c := conns[rand.Intn(len(conns))]
resp, err := c.RoundTrip(req)
if err != nil {
ln.Error(err, c.F(), ln.F{
"action": "connection_roundtrip",
})
2017-03-26 20:47:42 +00:00
defer c.cancel()
return nil, err
}
2017-03-26 22:36:58 +00:00
ln.Log(c.F(), ln.F{
"action": "http_traffic",
"remote_addr": req.RemoteAddr,
"host": req.Host,
"uri": req.RequestURI,
})
return resp, nil
}
2017-04-28 22:23:26 +00:00
// Auth is the authentication info the client passes to the server.
// HandleConn decodes it as JSON from the control stream.
type Auth struct {
// Token is the credential HandleConn passes to Storage.HasToken.
Token string `json:"token"`
// Domain is the domain the backend wants to serve. HandleConn passes it to
// Storage.HasRoute and registers the connection under it.
Domain string `json:"domain"`
}
// defaultUser is not referenced anywhere in the visible part of this file.
// NOTE(review): presumably a fallback user name used elsewhere in the
// package — confirm before relying on or removing it.
const defaultUser = "Cadey"