package analytics

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"strconv"
	"sync"
	"time"
)

// Version of the client.
const Version = "3.0.0"

// Client is the main API exposed by the analytics package.
// Values that satisfy this interface are returned by the client constructors
// provided by the package and provide a way to send messages via the HTTP API.
type Client interface {
	io.Closer

	// Enqueue queues a message to be sent by the client when the conditions
	// for a batch upload are met.
	// This is the main method you'll be using; a typical flow would look like
	// this:
	//
	//	client := analytics.New(writeKey)
	//	...
	//	client.Enqueue(analytics.Track{ ... })
	//	...
	//	client.Close()
	//
	// The method returns an error if the message could not be queued, which
	// happens if the client was already closed at the time the method was
	// called or if the message was malformed.
	Enqueue(Message) error
}

type client struct {
	Config
	key string

	// This channel is where the `Enqueue` method writes messages so they can
	// be picked up and pushed by the backend goroutine taking care of applying
	// the batching rules.
	msgs chan Message

	// These two channels are used to synchronize the client shutting down when
	// `Close` is called.
	// The first channel is closed to signal the backend goroutine that it has
	// to stop, then the second one is closed by the backend goroutine to signal
	// that it has finished flushing all queued messages.
	quit     chan struct{}
	shutdown chan struct{}

	// This HTTP client is used to send requests to the backend; it uses the
	// HTTP transport provided in the configuration.
	http http.Client
}

// New instantiates a new client that uses the write key passed as first
// argument to send messages to the backend.
// The client is created with the default configuration.
func New(writeKey string) Client {
	// Here we can ignore the error because the default config is always valid.
	c, _ := NewWithConfig(writeKey, Config{})
	return c
}

// NewWithConfig instantiates a new client that uses the write key and
// configuration passed as arguments to send messages to the backend.
// The function returns an error if the configuration contains impossible
// values (like a negative flush interval for example).
// When the function returns an error the returned client is always nil.
func NewWithConfig(writeKey string, config Config) (cli Client, err error) {
	if err = config.validate(); err != nil {
		return
	}

	c := &client{
		Config:   makeConfig(config),
		key:      writeKey,
		msgs:     make(chan Message, 100),
		quit:     make(chan struct{}),
		shutdown: make(chan struct{}),
		http:     makeHttpClient(config.Transport),
	}

	go c.loop()

	cli = c
	return
}
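
// A minimal usage sketch for NewWithConfig (illustrative values only; Interval
// and BatchSize are the batching knobs referenced by the loop below):
//
//	client, err := analytics.NewWithConfig(writeKey, analytics.Config{
//		Interval:  10 * time.Second,
//		BatchSize: 100,
//	})
//	if err != nil {
//		// the configuration contained impossible values
//	}
//	defer client.Close()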

// makeHttpClient builds the HTTP client used to send requests to the backend,
// wrapping the transport provided in the configuration. A 10 second timeout is
// set only when supportsTimeout reports that the transport supports it.
func makeHttpClient(transport http.RoundTripper) http.Client {
	httpClient := http.Client{
		Transport: transport,
	}
	if supportsTimeout(transport) {
		httpClient.Timeout = 10 * time.Second
	}
	return httpClient
}
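
// Assuming the Transport field of Config is the http.RoundTripper handed to
// makeHttpClient above, a custom transport can be supplied like this sketch:
//
//	client, err := analytics.NewWithConfig(writeKey, analytics.Config{
//		Transport: &http.Transport{
//			MaxIdleConnsPerHost: 10,
//		},
//	})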

// Enqueue implements the Client interface.
func (c *client) Enqueue(msg Message) (err error) {
	if err = msg.validate(); err != nil {
		return
	}

	var id = c.uid()
	var ts = c.now()

	switch m := msg.(type) {
	case Alias:
		m.Type = "alias"
		m.MessageId = makeMessageId(m.MessageId, id)
		m.Timestamp = makeTimestamp(m.Timestamp, ts)
		msg = m

	case Group:
		m.Type = "group"
		m.MessageId = makeMessageId(m.MessageId, id)
		m.Timestamp = makeTimestamp(m.Timestamp, ts)
		msg = m

	case Identify:
		m.Type = "identify"
		m.MessageId = makeMessageId(m.MessageId, id)
		m.Timestamp = makeTimestamp(m.Timestamp, ts)
		msg = m

	case Page:
		m.Type = "page"
		m.MessageId = makeMessageId(m.MessageId, id)
		m.Timestamp = makeTimestamp(m.Timestamp, ts)
		msg = m

	case Screen:
		m.Type = "screen"
		m.MessageId = makeMessageId(m.MessageId, id)
		m.Timestamp = makeTimestamp(m.Timestamp, ts)
		msg = m

	case Track:
		m.Type = "track"
		m.MessageId = makeMessageId(m.MessageId, id)
		m.Timestamp = makeTimestamp(m.Timestamp, ts)
		msg = m
	}

	defer func() {
		// When the `msgs` channel is closed, writing to it will trigger a panic.
		// To avoid letting the panic propagate to the caller we recover from it
		// and instead report that the client has been closed and shouldn't be
		// used anymore.
		if recover() != nil {
			err = ErrClosed
		}
	}()

	c.msgs <- msg
	return
}
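
// A sketch of how callers typically interact with Enqueue; the Track fields
// shown here are assumptions about the message types defined elsewhere in the
// package, while ErrClosed is the sentinel returned once Close has been called:
//
//	if err := client.Enqueue(analytics.Track{
//		UserId: "user-123",
//		Event:  "Signed Up",
//	}); err == analytics.ErrClosed {
//		// the client was already closed
//	}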

// Close flushes any queued messages and shuts the client down; it implements
// the io.Closer interface.
func (c *client) Close() (err error) {
	defer func() {
		// Always recover; a panic could be raised if `c.quit` was closed, which
		// means the method was called more than once.
		if recover() != nil {
			err = ErrClosed
		}
	}()
	close(c.quit)
	<-c.shutdown
	return
}

// Asynchronously send a batched request.
func (c *client) sendAsync(msgs []message, wg *sync.WaitGroup, ex *executor) {
	wg.Add(1)

	if !ex.do(func() {
		defer wg.Done()
		defer func() {
			// In case a bug is introduced in the send function that triggers
			// a panic, we don't want this to ever crash the application so we
			// catch it here and log it instead.
			if err := recover(); err != nil {
				c.errorf("panic - %s", err)
			}
		}()
		c.send(msgs)
	}) {
		wg.Done()
		c.errorf("sending messages failed - %s", ErrTooManyRequests)
		c.notifyFailure(msgs, ErrTooManyRequests)
	}
}

// Send batch request.
func (c *client) send(msgs []message) {
	const attempts = 10

	b, err := json.Marshal(batch{
		MessageId: c.uid(),
		SentAt:    c.now(),
		Messages:  msgs,
		Context:   c.DefaultContext,
	})

	if err != nil {
		c.errorf("marshalling messages - %s", err)
		c.notifyFailure(msgs, err)
		return
	}

	for i := 0; i != attempts; i++ {
		if err = c.upload(b); err == nil {
			c.notifySuccess(msgs)
			return
		}

		// Wait for either a retry timeout or the client to be closed.
		select {
		case <-time.After(c.RetryAfter(i)):
		case <-c.quit:
			c.errorf("%d messages dropped because they failed to be sent and the client was closed", len(msgs))
			c.notifyFailure(msgs, err)
			return
		}
	}

	c.errorf("%d messages dropped because they failed to be sent after %d attempts", len(msgs), attempts)
	c.notifyFailure(msgs, err)
}
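
// The delay between the retries above comes from the RetryAfter field of the
// configuration. Based on its use in send (it takes the attempt index and
// returns a time.Duration), a simple linear backoff could be configured like
// this sketch:
//
//	analytics.Config{
//		RetryAfter: func(attempt int) time.Duration {
//			return time.Duration(attempt+1) * time.Second
//		},
//	}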

// Upload serialized batch message.
func (c *client) upload(b []byte) error {
	url := c.Endpoint + "/v1/batch"
	req, err := http.NewRequest("POST", url, bytes.NewReader(b))
	if err != nil {
		c.errorf("creating request - %s", err)
		return err
	}

	req.Header.Add("User-Agent", "analytics-go (version: "+Version+")")
	req.Header.Add("Content-Type", "application/json")
	// The Content-Length header must carry the decimal byte count of the body.
	req.Header.Add("Content-Length", strconv.Itoa(len(b)))
	req.SetBasicAuth(c.key, "")

	res, err := c.http.Do(req)

	if err != nil {
		c.errorf("sending request - %s", err)
		return err
	}

	defer res.Body.Close()
	return c.report(res)
}
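
// The batch URL above is derived from the Endpoint configuration field, so
// requests can be pointed at a proxy or a test server. A sketch, assuming an
// httptest server that records the batches it receives:
//
//	server := httptest.NewServer(handler)
//	client, err := analytics.NewWithConfig(writeKey, analytics.Config{
//		Endpoint: server.URL,
//	})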

// Report on response body.
func (c *client) report(res *http.Response) (err error) {
	var body []byte

	if res.StatusCode < 300 {
		c.debugf("response %s", res.Status)
		return
	}

	if body, err = ioutil.ReadAll(res.Body); err != nil {
		c.errorf("response %d %s - %s", res.StatusCode, res.Status, err)
		return
	}

	c.logf("response %d %s – %s", res.StatusCode, res.Status, string(body))
	return fmt.Errorf("%d %s", res.StatusCode, res.Status)
}

// Batch loop.
func (c *client) loop() {
	defer close(c.shutdown)

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	tick := time.NewTicker(c.Interval)
	defer tick.Stop()

	ex := newExecutor(c.maxConcurrentRequests)
	defer ex.close()

	mq := messageQueue{
		maxBatchSize:  c.BatchSize,
		maxBatchBytes: c.maxBatchBytes(),
	}

	for {
		select {
		case msg := <-c.msgs:
			c.push(&mq, msg, wg, ex)

		case <-tick.C:
			c.flush(&mq, wg, ex)

		case <-c.quit:
			c.debugf("exit requested – draining messages")

			// Drain the msg channel; we have to close it first so no more
			// messages can be pushed, otherwise the loop would never end.
			close(c.msgs)
			for msg := range c.msgs {
				c.push(&mq, msg, wg, ex)
			}

			c.flush(&mq, wg, ex)
			c.debugf("exit")
			return
		}
	}
}

// push adds a message to the pending batch and triggers an asynchronous upload
// when the batching limits are exceeded.
func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *executor) {
	var msg message
	var err error

	if msg, err = makeMessage(m, maxMessageBytes); err != nil {
		c.errorf("%s - %v", err, m)
		c.notifyFailure([]message{{m, nil}}, err)
		return
	}

	c.debugf("buffer (%d/%d) %v", len(q.pending), c.BatchSize, m)

	if msgs := q.push(msg); msgs != nil {
		c.debugf("exceeded messages batch limit with batch of %d messages – flushing", len(msgs))
		c.sendAsync(msgs, wg, ex)
	}
}

// flush sends whatever is currently queued in the message queue, if anything.
func (c *client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) {
	if msgs := q.flush(); msgs != nil {
		c.debugf("flushing %d messages", len(msgs))
		c.sendAsync(msgs, wg, ex)
	}
}

// debugf logs a message only when the client runs in verbose mode.
func (c *client) debugf(format string, args ...interface{}) {
	if c.Verbose {
		c.logf(format, args...)
	}
}

// logf forwards a log message to the configured logger.
func (c *client) logf(format string, args ...interface{}) {
	c.Logger.Logf(format, args...)
}

// errorf forwards an error message to the configured logger.
func (c *client) errorf(format string, args ...interface{}) {
	c.Logger.Errorf(format, args...)
}

// maxBatchBytes computes how many bytes of messages fit in a batch once the
// overhead of the batch envelope itself has been accounted for.
func (c *client) maxBatchBytes() int {
	b, _ := json.Marshal(batch{
		MessageId: c.uid(),
		SentAt:    c.now(),
		Context:   c.DefaultContext,
	})
	return maxBatchBytes - len(b)
}

// notifySuccess reports every message of the batch to the Callback configured
// on the client, if any.
func (c *client) notifySuccess(msgs []message) {
	if c.Callback != nil {
		for _, m := range msgs {
			c.Callback.Success(m.msg)
		}
	}
}

// notifyFailure reports every message of the batch, along with the error that
// caused the failure, to the Callback configured on the client, if any.
func (c *client) notifyFailure(msgs []message, err error) {
	if c.Callback != nil {
		for _, m := range msgs {
			c.Callback.Failure(m.msg, err)
		}
	}
}
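
// Callbacks receive the Success and Failure notifications emitted above. A
// sketch of a logging implementation, assuming Callback is the two-method
// interface used by notifySuccess and notifyFailure:
//
//	type loggingCallback struct{}
//
//	func (loggingCallback) Success(m analytics.Message) {}
//
//	func (loggingCallback) Failure(m analytics.Message, err error) {
//		log.Printf("message dropped: %v - %s", m, err)
//	}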