163 lines
3.5 KiB
Go
163 lines
3.5 KiB
Go
package tun2
|
|
|
|
import (
	"bufio"
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"time"

	"github.com/Xe/ln"
	failure "github.com/dgryski/go-failure"
	"github.com/pkg/errors"
	"github.com/xtaci/smux"
)
|
|
|
|
// Connection is a single active client -> server connection and session
|
|
// containing many streams over TCP+TLS or KCP+TLS. Every stream beyond the
|
|
// control stream is assumed to be passed to the underlying backend server.
|
|
type Connection struct {
|
|
id string
|
|
conn net.Conn
|
|
isKCP bool
|
|
session *smux.Session
|
|
controlStream *smux.Stream
|
|
user string
|
|
domain string
|
|
cf context.CancelFunc
|
|
detector *failure.Detector
|
|
Auth *Auth
|
|
usable bool
|
|
}
|
|
|
|
func (c *Connection) cancel() {
|
|
c.cf()
|
|
c.usable = false
|
|
}
|
|
|
|
// F logs key->value pairs as an ln.Fer
|
|
func (c *Connection) F() ln.F {
|
|
return map[string]interface{}{
|
|
"id": c.id,
|
|
"remote": c.conn.RemoteAddr(),
|
|
"local": c.conn.LocalAddr(),
|
|
"isKCP": c.isKCP,
|
|
"user": c.user,
|
|
"domain": c.domain,
|
|
}
|
|
}
|
|
|
|
// Ping ends a "ping" to the client. If the client doesn't respond or the connection
|
|
// dies, then the connection needs to be cleaned up.
|
|
func (c *Connection) Ping() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequest("GET", "http://backend/health", nil)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
req = req.WithContext(ctx)
|
|
|
|
_, err = c.RoundTrip(req)
|
|
if err != nil {
|
|
ln.Error(ctx, err, c, ln.Action("pinging the backend"))
|
|
return err
|
|
}
|
|
|
|
c.detector.Ping(time.Now())
|
|
|
|
return nil
|
|
}
|
|
|
|
// OpenStream creates a new stream (connection) to the backend server.
|
|
func (c *Connection) OpenStream(ctx context.Context) (net.Conn, error) {
|
|
if !c.usable {
|
|
return nil, ErrNoSuchBackend
|
|
}
|
|
|
|
err := c.conn.SetDeadline(time.Now().Add(time.Second))
|
|
if err != nil {
|
|
ln.Error(ctx, err, c)
|
|
return nil, err
|
|
}
|
|
|
|
stream, err := c.session.OpenStream()
|
|
if err != nil {
|
|
ln.Error(ctx, err, c)
|
|
return nil, err
|
|
}
|
|
|
|
return stream, c.conn.SetDeadline(time.Time{})
|
|
}
|
|
|
|
// Close destroys resouces specific to the connection.
|
|
func (c *Connection) Close() error {
|
|
err := c.controlStream.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.session.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.conn.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Connection-specific errors. RoundTrip uses the messages of these values as
// the wrapping context for the underlying failure.
var (
	// ErrCantOpenSessionStream describes a failure to open a stream on the
	// session.
	ErrCantOpenSessionStream = errors.New("tun2: connection can't open session stream")

	// ErrCantWriteRequest describes a failure to write the HTTP request to
	// the stream.
	ErrCantWriteRequest = errors.New("tun2: connection stream can't write request")

	// ErrCantReadResponse describes a failure to read the HTTP response from
	// the stream.
	ErrCantReadResponse = errors.New("tun2: connection stream can't read response")
)
|
|
|
|
// RoundTrip forwards a HTTP request to the remote backend and then returns the
|
|
// response, if any.
|
|
func (c *Connection) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
stream, err := c.OpenStream(req.Context())
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, ErrCantOpenSessionStream.Error())
|
|
}
|
|
|
|
err = req.Write(stream)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, ErrCantWriteRequest.Error())
|
|
}
|
|
|
|
buf := bufio.NewReader(stream)
|
|
|
|
resp, err := http.ReadResponse(buf, req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, ErrCantReadResponse.Error())
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "can't read response body")
|
|
}
|
|
|
|
resp.Body = &mixedReadCloser{
|
|
Reader: buf,
|
|
Closer: stream,
|
|
}
|
|
|
|
resp.ContentLength = int64(len(body))
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// mixedReadCloser glues an independent io.Reader and io.Closer into a single
// io.ReadCloser: Read is promoted from the embedded Reader and Close from the
// embedded Closer.
type mixedReadCloser struct {
	io.Reader
	io.Closer
}

// Compile-time check that *mixedReadCloser satisfies io.ReadCloser.
var _ io.ReadCloser = (*mixedReadCloser)(nil)
|