tun2: make Connection into its own file

This commit is contained in:
Cadey Ratio 2017-04-05 21:44:12 -07:00
parent 2bda4a8ef8
commit 9605eb88bd
2 changed files with 142 additions and 153 deletions

130
lib/tun2/connection.go Normal file
View File

@ -0,0 +1,130 @@
package tun2
import (
"bufio"
"context"
"net"
"net/http"
"time"
"github.com/Xe/ln"
failure "github.com/dgryski/go-failure"
"github.com/pkg/errors"
"github.com/xtaci/smux"
)
// Connection is a single active client -> server connection and session
// containing many streams over TCP+TLS or KCP+TLS. Every stream beyond the
// control stream is assumed to be passed to the underlying backend server.
type Connection struct {
id string
conn net.Conn
isKCP bool
session *smux.Session
controlStream *smux.Stream
user string
domain string
cancel context.CancelFunc
detector *failure.Detector
Auth *Auth
}
// F logs key->value pairs as an ln.Fer
func (c *Connection) F() ln.F {
return map[string]interface{}{
"id": c.id,
"remote": c.conn.RemoteAddr(),
"local": c.conn.LocalAddr(),
"isKCP": c.isKCP,
"user": c.user,
"domain": c.domain,
}
}
// Ping ends a "ping" to the client. If the client doesn't respond or the connection
// dies, then the connection needs to be cleaned up.
func (c *Connection) Ping() error {
req, err := http.NewRequest("GET", "http://backend/health", nil)
if err != nil {
panic(err)
}
_, err = c.RoundTrip(req)
if err != nil {
ln.Error(err, c.F(), ln.F{"action": "ping_roundtrip"})
defer c.cancel()
return err
}
c.detector.Ping(time.Now())
return nil
}
// OpenStream creates a new stream (connection) to the backend server.
func (c *Connection) OpenStream() (net.Conn, error) {
err := c.conn.SetDeadline(time.Now().Add(time.Second))
if err != nil {
ln.Error(err, c.F())
return nil, err
}
stream, err := c.session.OpenStream()
if err != nil {
ln.Error(err, c.F())
return nil, err
}
return stream, c.conn.SetDeadline(time.Time{})
}
// Close destroys resouces specific to the connection.
func (c *Connection) Close() error {
err := c.controlStream.Close()
if err != nil {
return err
}
err = c.session.Close()
if err != nil {
return err
}
err = c.conn.Close()
if err != nil {
return err
}
return nil
}
// Connection-specific errors
var (
ErrCantOpenSessionStream = errors.New("tun2: connection can't open session stream")
ErrCantWriteRequest = errors.New("tun2: connection stream can't write request")
ErrCantReadResponse = errors.New("tun2: connection stream can't read response")
)
// RoundTrip forwards a HTTP request to the remote backend and then returns the
// response, if any.
func (c *Connection) RoundTrip(req *http.Request) (*http.Response, error) {
stream, err := c.OpenStream()
if err != nil {
return nil, errors.Wrap(err, ErrCantOpenSessionStream.Error())
}
defer stream.Close()
err = req.Write(stream)
if err != nil {
return nil, errors.Wrap(err, ErrCantWriteRequest.Error())
}
buf := bufio.NewReader(stream)
resp, err := http.ReadResponse(buf, req)
if err != nil {
return nil, errors.Wrap(err, ErrCantReadResponse.Error())
}
return resp, nil
}

View File

@ -1,17 +1,14 @@
package tun2
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"strings"
"sync"
"time"
@ -47,29 +44,6 @@ type Server struct {
domains cmap.ConcurrentMap
}
type Connection struct {
id string
conn net.Conn
isKCP bool
session *smux.Session
controlStream *smux.Stream
user string
domain string
cancel context.CancelFunc
detector *failure.Detector
}
func (c *Connection) F() ln.F {
return map[string]interface{}{
"id": c.id,
"remote": c.conn.RemoteAddr(),
"local": c.conn.LocalAddr(),
"isKCP": c.isKCP,
"user": c.user,
"domain": c.domain,
}
}
func NewServer(cfg *ServerConfig) (*Server, error) {
if cfg == nil {
return nil, errors.New("tun2: config must be specified")
@ -182,80 +156,6 @@ func (s *Server) ListenAndServe() error {
return nil
}
// Ping ends a "ping" to the client. If the client doesn't respond or the connection
// dies, then the connection needs to be cleaned up.
func (c *Connection) Ping() error {
req, err := http.NewRequest("GET", "http://backend/health", nil)
if err != nil {
panic(err)
}
stream, err := c.OpenStream()
if err != nil {
ln.Error(err, c.F())
defer c.cancel()
return err
}
defer stream.Close()
stream.SetWriteDeadline(time.Now().Add(time.Second))
err = req.Write(stream)
if err != nil {
ln.Error(err, c.F())
defer c.cancel()
return err
}
stream.SetReadDeadline(time.Now().Add(5 * time.Second))
_, err = stream.Read(make([]byte, 30))
if err != nil {
ln.Error(err, c.F())
defer c.cancel()
return err
}
c.detector.Ping(time.Now())
return nil
}
// OpenStream creates a new stream (connection) to the backend server.
func (c *Connection) OpenStream() (net.Conn, error) {
err := c.conn.SetDeadline(time.Now().Add(time.Second))
if err != nil {
ln.Error(err, c.F())
return nil, err
}
stream, err := c.session.OpenStream()
if err != nil {
ln.Error(err, c.F())
return nil, err
}
return stream, c.conn.SetDeadline(time.Time{})
}
// Close destroys resouces specific to the connection.
func (c *Connection) Close() error {
err := c.controlStream.Close()
if err != nil {
return err
}
err = c.session.Close()
if err != nil {
return err
}
err = c.conn.Close()
if err != nil {
return err
}
return nil
}
func (s *Server) HandleConn(c net.Conn, isKCP bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -344,6 +244,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
domain: auth.Domain,
cancel: cancel,
detector: failure.New(15, 1),
Auth: auth,
}
ln.Log(ln.F{
@ -381,7 +282,7 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
cancel()
}
case <-ctx.Done():
s.RemoveConn(auth, connection)
s.RemoveConn(connection)
connection.Close()
return
@ -389,11 +290,14 @@ func (s *Server) HandleConn(c net.Conn, isKCP bool) {
}
}
func (s *Server) RemoveConn(auth *Auth, connection *Connection) {
// RemoveConn removes a connection
func (s *Server) RemoveConn(connection *Connection) {
s.connlock.Lock()
delete(s.conns, connection.conn)
s.connlock.Unlock()
auth := connection.Auth
var conns []*Connection
val, ok := s.domains.Get(auth.Domain)
@ -439,15 +343,7 @@ func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
"uri": req.RequestURI,
})
resp := &http.Response{
StatusCode: http.StatusBadGateway,
Body: ioutil.NopCloser(strings.NewReader("no such domain")),
ContentLength: 14,
Close: true,
Request: req,
}
return resp, errors.New("no backend connected")
return nil, errors.New("no backend connected")
}
}
@ -464,50 +360,13 @@ func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
c := conns[rand.Intn(len(conns))]
c.conn.SetDeadline(time.Now().Add(time.Second))
stream, err := c.session.OpenStream()
resp, err := c.RoundTrip(req)
if err != nil {
ln.Error(err, ln.F{
"action": "opening_session_stream",
"remote_addr": req.RemoteAddr,
"host": req.Host,
"uri": req.RequestURI,
}, c.F())
c.cancel()
return s.RoundTrip(req)
}
defer stream.Close()
c.conn.SetDeadline(time.Time{})
err = req.Write(stream)
if err != nil {
ln.Error(err, ln.F{
"action": "request_writing",
"remote_addr": req.RemoteAddr,
"host": req.Host,
"uri": req.RequestURI,
}, c.F())
c.cancel()
return s.RoundTrip(req)
}
buf := bufio.NewReader(stream)
resp, err := http.ReadResponse(buf, req)
if err != nil {
ln.Error(err, ln.F{
"action": "response_reading",
"remote_addr": req.RemoteAddr,
"host": req.Host,
"uri": req.RequestURI,
}, c.F())
c.cancel()
ln.Error(err, c.F(), ln.F{
"action": "connection_roundtrip",
})
defer c.cancel()
return nil, err
}