package tun2 import ( "bufio" "context" "crypto/tls" "encoding/json" "errors" "fmt" "math/rand" "net" "net/http" "sync" "git.xeserv.us/xena/route/database" "github.com/Xe/ln" "github.com/mtneug/pkg/ulid" kcp "github.com/xtaci/kcp-go" "github.com/xtaci/smux" ) type ServerConfig struct { TCPAddr string KCPAddr string TLSConfig *tls.Config SmuxConf *smux.Config Storage Storage } type Storage interface { GetRouteForHost(name string) (*database.Route, error) //ValidateToken(token string) (username string, ok bool, err error) // XXX RIP implement when users are implemented } type Server struct { cfg *ServerConfig connlock sync.Mutex conns map[net.Conn]*Connection domainlock sync.Mutex domains map[string][]*Connection } type Connection struct { id string conn net.Conn isKCP bool session *smux.Session controlStream *smux.Stream user string domain string cancel context.CancelFunc } func NewServer(cfg *ServerConfig) (*Server, error) { if cfg == nil { return nil, errors.New("tun2: config must be specified") } if cfg.SmuxConf == nil { cfg.SmuxConf = smux.DefaultConfig() } server := &Server{ cfg: cfg, conns: map[net.Conn]*Connection{}, domains: map[string][]*Connection{}, } return server, nil } func (s *Server) ListenAndServe() error { ln.Log(ln.F{ "action": "listen_and_serve_called", }) if s.cfg.TCPAddr != "" { go func() { l, err := tls.Listen("tcp", s.cfg.TCPAddr, s.cfg.TLSConfig) if err != nil { panic(err) } ln.Log(ln.F{ "action": "tcp+tls_listening", "addr": l.Addr(), }) for { conn, err := l.Accept() if err != nil { ln.Error(err, ln.F{"kind": "tcp", "addr": l.Addr().String()}) continue } ln.Log(ln.F{ "action": "new_client", "kcp": false, "addr": conn.RemoteAddr(), }) go s.HandleConn(conn, false) } }() } if s.cfg.KCPAddr != "" { go func() { l, err := kcp.Listen(s.cfg.KCPAddr) if err != nil { panic(err) } ln.Log(ln.F{ "action": "kcp+tls_listening", "addr": l.Addr(), }) for { conn, err := l.Accept() if err != nil { ln.Error(err, ln.F{"kind": "kcp", "addr": l.Addr().String()}) } ln.Log(ln.F{ "action": "new_client", "kcp": true, "addr": conn.RemoteAddr(), }) tc := tls.Server(conn, s.cfg.TLSConfig) go s.HandleConn(tc, true) } }() } return nil } func (s *Server) HandleConn(c net.Conn, isKCP bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() session, err := smux.Server(c, s.cfg.SmuxConf) if err != nil { ln.Error(err, ln.F{ "action": "session_failure", "local": c.LocalAddr().String(), "remote": c.RemoteAddr().String(), }) c.Close() return } controlStream, err := session.OpenStream() if err != nil { ln.Error(err, ln.F{ "action": "control_stream_failure", "local": c.LocalAddr().String(), "remote": c.RemoteAddr().String(), }) session.Close() c.Close() return } csd := json.NewDecoder(controlStream) auth := &Auth{} err = csd.Decode(auth) if err != nil { ln.Error(err, ln.F{ "action": "control_stream_auth_decoding_failure", "local": c.LocalAddr().String(), "remote": c.RemoteAddr().String(), }) controlStream.Close() session.Close() c.Close() return } route, err := s.cfg.Storage.GetRouteForHost(auth.Domain) if err != nil { ln.Error(err, ln.F{ "action": "nosuch_domain", "local": c.LocalAddr().String(), "remote": c.RemoteAddr().String(), }) controlStream.Close() session.Close() c.Close() return } if route.Token != auth.Token { ln.Error(err, ln.F{ "action": "bad_token", "local": c.LocalAddr().String(), "remote": c.RemoteAddr().String(), }) fmt.Fprintln(controlStream, "bad token") controlStream.Close() session.Close() c.Close() return } connection := &Connection{ id: ulid.New().String(), conn: c, isKCP: isKCP, session: session, user: defaultUser, // XXX RIP replace this with the actual token user once users are implemented domain: auth.Domain, cancel: cancel, } ln.Log(ln.F{ "action": "backend_connected", "remote": c.RemoteAddr().String(), "kcp": isKCP, "domain": auth.Domain, "user": connection.user, "id": connection.id, }) s.connlock.Lock() s.conns[c] = connection s.connlock.Unlock() s.domainlock.Lock() s.domains[auth.Domain] = append(s.domains[auth.Domain], connection) s.domainlock.Unlock() select { case <-ctx.Done(): s.connlock.Lock() delete(s.conns, c) s.connlock.Unlock() s.domainlock.Lock() for i, cntn := range s.domains[auth.Domain] { if cntn.id == connection.id { s.domains[auth.Domain][i] = s.domains[auth.Domain][len(s.domains[auth.Domain])-1] s.domains[auth.Domain] = s.domains[auth.Domain][:len(s.domains[auth.Domain])-1] } } s.domainlock.Unlock() ln.Log(ln.F{ "action": "client_disconnecting", "remote": c.RemoteAddr(), "domain": auth.Domain, "id": connection.id, }) controlStream.Close() session.Close() c.Close() return } } func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) { s.domainlock.Lock() conns, ok := s.domains[req.Host] s.domainlock.Unlock() if !ok { return nil, errors.New("domain not found") } c := conns[rand.Intn(len(conns))] stream, err := c.session.OpenStream() if err != nil { ln.Error(err, ln.F{ "action": "opening_session_stream", "backend": c.conn.RemoteAddr().String(), "remote_addr": req.RemoteAddr, "host": req.Host, "uri": req.RequestURI, }) c.cancel() return s.RoundTrip(req) } defer stream.Close() err = req.Write(stream) if err != nil { ln.Error(err, ln.F{ "action": "request_writing", "backend": c.conn.RemoteAddr().String(), "remote_addr": req.RemoteAddr, "host": req.Host, "uri": req.RequestURI, }) c.cancel() return s.RoundTrip(req) } buf := bufio.NewReader(stream) resp, err := http.ReadResponse(buf, req) if err != nil { ln.Error(err, ln.F{ "action": "response_reading", "backend": c.conn.RemoteAddr().String(), "remote_addr": req.RemoteAddr, "host": req.Host, "uri": req.RequestURI, }) c.cancel() return nil, err } return resp, nil } type Auth struct { Token string `json:"token"` Domain string `json:"domain"` } const defaultUser = "Cadey"