tun2: make Server better
This commit is contained in:
parent
96e2e399bc
commit
e8acea0351
|
@ -5,14 +5,12 @@ import (
|
|||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.xeserv.us/xena/route/database"
|
||||
"github.com/Xe/ln"
|
||||
failure "github.com/dgryski/go-failure"
|
||||
"github.com/mtneug/pkg/ulid"
|
||||
|
@ -21,6 +19,14 @@ import (
|
|||
"github.com/xtaci/smux"
|
||||
)
|
||||
|
||||
// Error values
|
||||
var (
|
||||
ErrNoSuchBackend = errors.New("tun2: there is no such backend")
|
||||
ErrAuthMismatch = errors.New("tun2: authenication doesn't match database records")
|
||||
ErrCantRemoveWhatDoesntExist = errors.New("tun2: this connection does not exist, cannot remove it")
|
||||
)
|
||||
|
||||
// ServerConfig ...
|
||||
type ServerConfig struct {
|
||||
TCPAddr string
|
||||
KCPAddr string
|
||||
|
@ -30,11 +36,14 @@ type ServerConfig struct {
|
|||
Storage Storage
|
||||
}
|
||||
|
||||
// Storage is the minimal subset of features that tun2's Server needs out of a
|
||||
// persistence layer.
|
||||
type Storage interface {
|
||||
GetRouteForHost(name string) (*database.Route, error)
|
||||
//ValidateToken(token string) (username string, ok bool, err error) // XXX RIP implement when users are implemented
|
||||
HasToken(token string) (user string, scopes []string, err error)
|
||||
HasRoute(domain string) (user string, err error)
|
||||
}
|
||||
|
||||
// Server routes frontend HTTP traffic to backend TCP traffic.
|
||||
type Server struct {
|
||||
cfg *ServerConfig
|
||||
|
||||
|
@ -44,6 +53,8 @@ type Server struct {
|
|||
domains cmap.ConcurrentMap
|
||||
}
|
||||
|
||||
// NewServer creates a new Server instance with a given config, acquiring all
|
||||
// relevant resources.
|
||||
func NewServer(cfg *ServerConfig) (*Server, error) {
|
||||
if cfg == nil {
|
||||
return nil, errors.New("tun2: config must be specified")
|
||||
|
@ -66,6 +77,8 @@ func NewServer(cfg *ServerConfig) (*Server, error) {
|
|||
return server, nil
|
||||
}
|
||||
|
||||
// ListenAndServe starts the backend TCP/KCP listeners and relays backend
|
||||
// traffic to and from them.
|
||||
func (s *Server) ListenAndServe() error {
|
||||
ln.Log(ln.F{
|
||||
"action": "listen_and_serve_called",
|
||||
|
@ -132,6 +145,7 @@ func (s *Server) ListenAndServe() error {
|
|||
}()
|
||||
}
|
||||
|
||||
// XXX experimental, might get rid of this inside this process
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
|
@ -156,7 +170,12 @@ func (s *Server) ListenAndServe() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// HandleConn starts up the needed mechanisms to relay HTTP traffic to/from
|
||||
// the currently connected backend.
|
||||
func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
||||
// XXX TODO clean this up it's really ugly.
|
||||
defer c.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
@ -172,6 +191,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
|||
|
||||
return
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
controlStream, err := session.OpenStream()
|
||||
if err != nil {
|
||||
|
@ -181,11 +201,9 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
|||
"remote": c.RemoteAddr().String(),
|
||||
})
|
||||
|
||||
session.Close()
|
||||
c.Close()
|
||||
|
||||
return
|
||||
}
|
||||
defer controlStream.Close()
|
||||
|
||||
csd := json.NewDecoder(controlStream)
|
||||
auth := &Auth{}
|
||||
|
@ -197,14 +215,10 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
|||
"remote": c.RemoteAddr().String(),
|
||||
})
|
||||
|
||||
controlStream.Close()
|
||||
session.Close()
|
||||
c.Close()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
route, err := s.cfg.Storage.GetRouteForHost(auth.Domain)
|
||||
routeUser, err := s.cfg.Storage.HasRoute(auth.Domain)
|
||||
if err != nil {
|
||||
ln.Error(err, ln.F{
|
||||
"action": "nosuch_domain",
|
||||
|
@ -212,25 +226,42 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
|||
"remote": c.RemoteAddr().String(),
|
||||
})
|
||||
|
||||
controlStream.Close()
|
||||
session.Close()
|
||||
c.Close()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if route.Token != auth.Token {
|
||||
tokenUser, scopes, err := s.cfg.Storage.HasToken(auth.Token)
|
||||
if err != nil {
|
||||
ln.Error(err, ln.F{
|
||||
"action": "bad_token",
|
||||
"action": "nosuch_token",
|
||||
"local": c.LocalAddr().String(),
|
||||
"remote": c.RemoteAddr().String(),
|
||||
})
|
||||
|
||||
fmt.Fprintln(controlStream, "bad token")
|
||||
return
|
||||
}
|
||||
|
||||
controlStream.Close()
|
||||
session.Close()
|
||||
c.Close()
|
||||
ok := false
|
||||
for _, sc := range scopes {
|
||||
if sc == "connect" {
|
||||
ok = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !ok {
|
||||
ln.Error(ErrAuthMismatch, ln.F{
|
||||
"action": "token_not_authorized",
|
||||
"local": c.LocalAddr().String(),
|
||||
"remote": c.RemoteAddr().String(),
|
||||
})
|
||||
}
|
||||
|
||||
if routeUser != tokenUser {
|
||||
ln.Error(ErrAuthMismatch, ln.F{
|
||||
"action": "auth_mismatch",
|
||||
"local": c.LocalAddr().String(),
|
||||
"remote": c.RemoteAddr().String(),
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -240,7 +271,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
|||
conn: c,
|
||||
isKCP: isKCP,
|
||||
session: session,
|
||||
user: defaultUser, // XXX RIP replace this with the actual token user once users are implemented
|
||||
user: tokenUser,
|
||||
domain: auth.Domain,
|
||||
cancel: cancel,
|
||||
detector: failure.New(15, 1),
|
||||
|
@ -290,7 +321,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// RemoveConn removes a connection
|
||||
// RemoveConn removes a connection.
|
||||
func (s *Server) RemoveConn(connection *Connection) {
|
||||
s.connlock.Lock()
|
||||
delete(s.conns, connection.conn)
|
||||
|
@ -304,7 +335,7 @@ func (s *Server) RemoveConn(connection *Connection) {
|
|||
if ok {
|
||||
conns, ok = val.([]*Connection)
|
||||
if !ok {
|
||||
ln.Error(errors.New("fundamental assertion is not met"), connection.F(), ln.F{
|
||||
ln.Error(ErrCantRemoveWhatDoesntExist, connection.F(), ln.F{
|
||||
"action": "looking_up_for_disconnect_removal",
|
||||
})
|
||||
return
|
||||
|
@ -329,6 +360,7 @@ func (s *Server) RemoveConn(connection *Connection) {
|
|||
})
|
||||
}
|
||||
|
||||
// RoundTrip sends a HTTP request to a backend and then returns its response.
|
||||
func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
var conns []*Connection
|
||||
|
||||
|
@ -336,26 +368,26 @@ func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
|
|||
if ok {
|
||||
conns, ok = val.([]*Connection)
|
||||
if !ok {
|
||||
ln.Error(errors.New("no backend connected"), ln.F{
|
||||
ln.Error(ErrNoSuchBackend, ln.F{
|
||||
"action": "no_backend_connected",
|
||||
"remote": req.RemoteAddr,
|
||||
"host": req.Host,
|
||||
"uri": req.RequestURI,
|
||||
})
|
||||
|
||||
return nil, errors.New("no backend connected")
|
||||
return nil, ErrNoSuchBackend
|
||||
}
|
||||
}
|
||||
|
||||
if len(conns) == 0 {
|
||||
ln.Error(errors.New("no backend connected"), ln.F{
|
||||
ln.Error(ErrNoSuchBackend, ln.F{
|
||||
"action": "no_backend_connected",
|
||||
"remote": req.RemoteAddr,
|
||||
"host": req.Host,
|
||||
"uri": req.RequestURI,
|
||||
})
|
||||
|
||||
return nil, errors.New("no backend connected")
|
||||
return nil, ErrNoSuchBackend
|
||||
}
|
||||
|
||||
c := conns[rand.Intn(len(conns))]
|
||||
|
@ -380,6 +412,7 @@ func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// Auth is the authentication info the client passes to the server.
|
||||
type Auth struct {
|
||||
Token string `json:"token"`
|
||||
Domain string `json:"domain"`
|
||||
|
|
Loading…
Reference in New Issue