From 9605eb88bd67c5b906e92f996457bc1ee971b3c0 Mon Sep 17 00:00:00 2001 From: Christine Dodrill Date: Wed, 5 Apr 2017 21:44:12 -0700 Subject: [PATCH] tun2: make Connection into its own file --- lib/tun2/connection.go | 130 ++++++++++++++++++++++++++++++++ lib/tun2/server.go | 165 +++-------------------------------------- 2 files changed, 142 insertions(+), 153 deletions(-) create mode 100644 lib/tun2/connection.go diff --git a/lib/tun2/connection.go b/lib/tun2/connection.go new file mode 100644 index 0000000..c53eefb --- /dev/null +++ b/lib/tun2/connection.go @@ -0,0 +1,130 @@ +package tun2 + +import ( + "bufio" + "context" + "net" + "net/http" + "time" + + "github.com/Xe/ln" + failure "github.com/dgryski/go-failure" + "github.com/pkg/errors" + "github.com/xtaci/smux" +) + +// Connection is a single active client -> server connection and session +// containing many streams over TCP+TLS or KCP+TLS. Every stream beyond the +// control stream is assumed to be passed to the underlying backend server. +type Connection struct { + id string + conn net.Conn + isKCP bool + session *smux.Session + controlStream *smux.Stream + user string + domain string + cancel context.CancelFunc + detector *failure.Detector + Auth *Auth +} + +// F logs key->value pairs as an ln.Fer +func (c *Connection) F() ln.F { + return map[string]interface{}{ + "id": c.id, + "remote": c.conn.RemoteAddr(), + "local": c.conn.LocalAddr(), + "isKCP": c.isKCP, + "user": c.user, + "domain": c.domain, + } +} + +// Ping ends a "ping" to the client. If the client doesn't respond or the connection +// dies, then the connection needs to be cleaned up. +func (c *Connection) Ping() error { + req, err := http.NewRequest("GET", "http://backend/health", nil) + if err != nil { + panic(err) + } + + _, err = c.RoundTrip(req) + if err != nil { + ln.Error(err, c.F(), ln.F{"action": "ping_roundtrip"}) + defer c.cancel() + return err + } + + c.detector.Ping(time.Now()) + + return nil +} + +// OpenStream creates a new stream (connection) to the backend server. +func (c *Connection) OpenStream() (net.Conn, error) { + err := c.conn.SetDeadline(time.Now().Add(time.Second)) + if err != nil { + ln.Error(err, c.F()) + return nil, err + } + + stream, err := c.session.OpenStream() + if err != nil { + ln.Error(err, c.F()) + return nil, err + } + + return stream, c.conn.SetDeadline(time.Time{}) +} + +// Close destroys resouces specific to the connection. +func (c *Connection) Close() error { + err := c.controlStream.Close() + if err != nil { + return err + } + + err = c.session.Close() + if err != nil { + return err + } + + err = c.conn.Close() + if err != nil { + return err + } + + return nil +} + +// Connection-specific errors +var ( + ErrCantOpenSessionStream = errors.New("tun2: connection can't open session stream") + ErrCantWriteRequest = errors.New("tun2: connection stream can't write request") + ErrCantReadResponse = errors.New("tun2: connection stream can't read response") +) + +// RoundTrip forwards a HTTP request to the remote backend and then returns the +// response, if any. +func (c *Connection) RoundTrip(req *http.Request) (*http.Response, error) { + stream, err := c.OpenStream() + if err != nil { + return nil, errors.Wrap(err, ErrCantOpenSessionStream.Error()) + } + defer stream.Close() + + err = req.Write(stream) + if err != nil { + return nil, errors.Wrap(err, ErrCantWriteRequest.Error()) + } + + buf := bufio.NewReader(stream) + + resp, err := http.ReadResponse(buf, req) + if err != nil { + return nil, errors.Wrap(err, ErrCantReadResponse.Error()) + } + + return resp, nil +} diff --git a/lib/tun2/server.go b/lib/tun2/server.go index f8f8cae..646d5a6 100644 --- a/lib/tun2/server.go +++ b/lib/tun2/server.go @@ -1,17 +1,14 @@ package tun2 import ( - "bufio" "context" "crypto/tls" "encoding/json" "errors" "fmt" - "io/ioutil" "math/rand" "net" "net/http" - "strings" "sync" "time" @@ -47,29 +44,6 @@ type Server struct { domains cmap.ConcurrentMap } -type Connection struct { - id string - conn net.Conn - isKCP bool - session *smux.Session - controlStream *smux.Stream - user string - domain string - cancel context.CancelFunc - detector *failure.Detector -} - -func (c *Connection) F() ln.F { - return map[string]interface{}{ - "id": c.id, - "remote": c.conn.RemoteAddr(), - "local": c.conn.LocalAddr(), - "isKCP": c.isKCP, - "user": c.user, - "domain": c.domain, - } -} - func NewServer(cfg *ServerConfig) (*Server, error) { if cfg == nil { return nil, errors.New("tun2: config must be specified") @@ -182,80 +156,6 @@ func (s *Server) ListenAndServe() error { return nil } -// Ping ends a "ping" to the client. If the client doesn't respond or the connection -// dies, then the connection needs to be cleaned up. -func (c *Connection) Ping() error { - req, err := http.NewRequest("GET", "http://backend/health", nil) - if err != nil { - panic(err) - } - - stream, err := c.OpenStream() - if err != nil { - ln.Error(err, c.F()) - defer c.cancel() - return err - } - defer stream.Close() - - stream.SetWriteDeadline(time.Now().Add(time.Second)) - err = req.Write(stream) - if err != nil { - ln.Error(err, c.F()) - defer c.cancel() - return err - } - - stream.SetReadDeadline(time.Now().Add(5 * time.Second)) - _, err = stream.Read(make([]byte, 30)) - if err != nil { - ln.Error(err, c.F()) - defer c.cancel() - return err - } - - c.detector.Ping(time.Now()) - - return nil -} - -// OpenStream creates a new stream (connection) to the backend server. -func (c *Connection) OpenStream() (net.Conn, error) { - err := c.conn.SetDeadline(time.Now().Add(time.Second)) - if err != nil { - ln.Error(err, c.F()) - return nil, err - } - - stream, err := c.session.OpenStream() - if err != nil { - ln.Error(err, c.F()) - return nil, err - } - - return stream, c.conn.SetDeadline(time.Time{}) -} - -// Close destroys resouces specific to the connection. -func (c *Connection) Close() error { - err := c.controlStream.Close() - if err != nil { - return err - } - - err = c.session.Close() - if err != nil { - return err - } - - err = c.conn.Close() - if err != nil { - return err - } - - return nil -} - func (s *Server) HandleConn(c net.Conn, isKCP bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -344,6 +244,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) { domain: auth.Domain, cancel: cancel, detector: failure.New(15, 1), + Auth: auth, } ln.Log(ln.F{ @@ -381,7 +282,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) { cancel() } case <-ctx.Done(): - s.RemoveConn(auth, connection) + s.RemoveConn(connection) connection.Close() return @@ -389,11 +290,14 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) { } } -func (s *Server) RemoveConn(auth *Auth, connection *Connection) { +// RemoveConn removes a connection +func (s *Server) RemoveConn(connection *Connection) { s.connlock.Lock() delete(s.conns, connection.conn) s.connlock.Unlock() + auth := connection.Auth + var conns []*Connection val, ok := s.domains.Get(auth.Domain) @@ -439,15 +343,7 @@ func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) { "uri": req.RequestURI, }) - resp := &http.Response{ - StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(strings.NewReader("no such domain")), - ContentLength: 14, - Close: true, - Request: req, - } - - return resp, errors.New("no backend connected") + return nil, errors.New("no backend connected") } } @@ -464,50 +360,13 @@ func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) { c := conns[rand.Intn(len(conns))] - c.conn.SetDeadline(time.Now().Add(time.Second)) - stream, err := c.session.OpenStream() + resp, err := c.RoundTrip(req) if err != nil { - ln.Error(err, ln.F{ - "action": "opening_session_stream", - "remote_addr": req.RemoteAddr, - "host": req.Host, - "uri": req.RequestURI, - }, c.F()) - - c.cancel() - - return s.RoundTrip(req) - } - defer stream.Close() - c.conn.SetDeadline(time.Time{}) - - err = req.Write(stream) - if err != nil { - ln.Error(err, ln.F{ - "action": "request_writing", - "remote_addr": req.RemoteAddr, - "host": req.Host, - "uri": req.RequestURI, - }, c.F()) - - c.cancel() - - return s.RoundTrip(req) - } - - buf := bufio.NewReader(stream) - - resp, err := http.ReadResponse(buf, req) - if err != nil { - ln.Error(err, ln.F{ - "action": "response_reading", - "remote_addr": req.RemoteAddr, - "host": req.Host, - "uri": req.RequestURI, - }, c.F()) - - c.cancel() + ln.Error(err, c.F(), ln.F{ + "action": "connection_roundtrip", + }) + defer c.cancel() return nil, err }