site/vendor/gopkg.in/segmentio/analytics-go.v3/analytics.go

389 lines
9.1 KiB
Go
Raw Normal View History

2018-08-22 03:17:59 +00:00
package analytics
import (
"fmt"
"io"
"io/ioutil"
"sync"
"bytes"
"encoding/json"
"net/http"
"time"
)
// Version of the client.
const Version = "3.0.0"
// This interface is the main API exposed by the analytics package.
// Values that satsify this interface are returned by the client constructors
// provided by the package and provide a way to send messages via the HTTP API.
type Client interface {
io.Closer
// Queues a message to be sent by the client when the conditions for a batch
// upload are met.
// This is the main method you'll be using, a typical flow would look like
// this:
//
// client := analytics.New(writeKey)
// ...
// client.Enqueue(analytics.Track{ ... })
// ...
// client.Close()
//
// The method returns an error if the message queue not be queued, which
// happens if the client was already closed at the time the method was
// called or if the message was malformed.
Enqueue(Message) error
}
type client struct {
Config
key string
// This channel is where the `Enqueue` method writes messages so they can be
// picked up and pushed by the backend goroutine taking care of applying the
// batching rules.
msgs chan Message
// These two channels are used to synchronize the client shutting down when
// `Close` is called.
// The first channel is closed to signal the backend goroutine that it has
// to stop, then the second one is closed by the backend goroutine to signal
// that it has finished flushing all queued messages.
quit chan struct{}
shutdown chan struct{}
// This HTTP client is used to send requests to the backend, it uses the
// HTTP transport provided in the configuration.
http http.Client
}
// Instantiate a new client that uses the write key passed as first argument to
// send messages to the backend.
// The client is created with the default configuration.
func New(writeKey string) Client {
// Here we can ignore the error because the default config is always valid.
c, _ := NewWithConfig(writeKey, Config{})
return c
}
// Instantiate a new client that uses the write key and configuration passed as
// arguments to send messages to the backend.
// The function will return an error if the configuration contained impossible
// values (like a negative flush interval for example).
// When the function returns an error the returned client will always be nil.
func NewWithConfig(writeKey string, config Config) (cli Client, err error) {
if err = config.validate(); err != nil {
return
}
c := &client{
Config: makeConfig(config),
key: writeKey,
msgs: make(chan Message, 100),
quit: make(chan struct{}),
shutdown: make(chan struct{}),
http: makeHttpClient(config.Transport),
}
go c.loop()
cli = c
return
}
func makeHttpClient(transport http.RoundTripper) http.Client {
httpClient := http.Client{
Transport: transport,
}
if supportsTimeout(transport) {
httpClient.Timeout = 10 * time.Second
}
return httpClient
}
func (c *client) Enqueue(msg Message) (err error) {
if err = msg.validate(); err != nil {
return
}
var id = c.uid()
var ts = c.now()
switch m := msg.(type) {
case Alias:
m.Type = "alias"
m.MessageId = makeMessageId(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
case Group:
m.Type = "group"
m.MessageId = makeMessageId(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
case Identify:
m.Type = "identify"
m.MessageId = makeMessageId(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
case Page:
m.Type = "page"
m.MessageId = makeMessageId(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
case Screen:
m.Type = "screen"
m.MessageId = makeMessageId(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
case Track:
m.Type = "track"
m.MessageId = makeMessageId(m.MessageId, id)
m.Timestamp = makeTimestamp(m.Timestamp, ts)
msg = m
}
defer func() {
// When the `msgs` channel is closed writing to it will trigger a panic.
// To avoid letting the panic propagate to the caller we recover from it
// and instead report that the client has been closed and shouldn't be
// used anymore.
if recover() != nil {
err = ErrClosed
}
}()
c.msgs <- msg
return
}
// Close and flush metrics.
func (c *client) Close() (err error) {
defer func() {
// Always recover, a panic could be raised if `c`.quit was closed which
// means the method was called more than once.
if recover() != nil {
err = ErrClosed
}
}()
close(c.quit)
<-c.shutdown
return
}
// Asychronously send a batched requests.
func (c *client) sendAsync(msgs []message, wg *sync.WaitGroup, ex *executor) {
wg.Add(1)
if !ex.do(func() {
defer wg.Done()
defer func() {
// In case a bug is introduced in the send function that triggers
// a panic, we don't want this to ever crash the application so we
// catch it here and log it instead.
if err := recover(); err != nil {
c.errorf("panic - %s", err)
}
}()
c.send(msgs)
}) {
wg.Done()
c.errorf("sending messages failed - %s", ErrTooManyRequests)
c.notifyFailure(msgs, ErrTooManyRequests)
}
}
// Send batch request.
func (c *client) send(msgs []message) {
const attempts = 10
b, err := json.Marshal(batch{
MessageId: c.uid(),
SentAt: c.now(),
Messages: msgs,
Context: c.DefaultContext,
})
if err != nil {
c.errorf("marshalling messages - %s", err)
c.notifyFailure(msgs, err)
return
}
for i := 0; i != attempts; i++ {
if err = c.upload(b); err == nil {
c.notifySuccess(msgs)
return
}
// Wait for either a retry timeout or the client to be closed.
select {
case <-time.After(c.RetryAfter(i)):
case <-c.quit:
c.errorf("%d messages dropped because they failed to be sent and the client was closed", len(msgs))
c.notifyFailure(msgs, err)
return
}
}
c.errorf("%d messages dropped because they failed to be sent after %d attempts", len(msgs), attempts)
c.notifyFailure(msgs, err)
}
// Upload serialized batch message.
func (c *client) upload(b []byte) error {
url := c.Endpoint + "/v1/batch"
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
c.errorf("creating request - %s", err)
return err
}
req.Header.Add("User-Agent", "analytics-go (version: "+Version+")")
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Content-Length", string(len(b)))
req.SetBasicAuth(c.key, "")
res, err := c.http.Do(req)
if err != nil {
c.errorf("sending request - %s", err)
return err
}
defer res.Body.Close()
return c.report(res)
}
// Report on response body.
func (c *client) report(res *http.Response) (err error) {
var body []byte
if res.StatusCode < 300 {
c.debugf("response %s", res.Status)
return
}
if body, err = ioutil.ReadAll(res.Body); err != nil {
c.errorf("response %d %s - %s", res.StatusCode, res.Status, err)
return
}
c.logf("response %d %s %s", res.StatusCode, res.Status, string(body))
return fmt.Errorf("%d %s", res.StatusCode, res.Status)
}
// Batch loop.
func (c *client) loop() {
defer close(c.shutdown)
wg := &sync.WaitGroup{}
defer wg.Wait()
tick := time.NewTicker(c.Interval)
defer tick.Stop()
ex := newExecutor(c.maxConcurrentRequests)
defer ex.close()
mq := messageQueue{
maxBatchSize: c.BatchSize,
maxBatchBytes: c.maxBatchBytes(),
}
for {
select {
case msg := <-c.msgs:
c.push(&mq, msg, wg, ex)
case <-tick.C:
c.flush(&mq, wg, ex)
case <-c.quit:
c.debugf("exit requested draining messages")
// Drain the msg channel, we have to close it first so no more
// messages can be pushed and otherwise the loop would never end.
close(c.msgs)
for msg := range c.msgs {
c.push(&mq, msg, wg, ex)
}
c.flush(&mq, wg, ex)
c.debugf("exit")
return
}
}
}
func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *executor) {
var msg message
var err error
if msg, err = makeMessage(m, maxMessageBytes); err != nil {
c.errorf("%s - %v", err, m)
c.notifyFailure([]message{{m, nil}}, err)
return
}
c.debugf("buffer (%d/%d) %v", len(q.pending), c.BatchSize, m)
if msgs := q.push(msg); msgs != nil {
c.debugf("exceeded messages batch limit with batch of %d messages flushing", len(msgs))
c.sendAsync(msgs, wg, ex)
}
}
func (c *client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) {
if msgs := q.flush(); msgs != nil {
c.debugf("flushing %d messages", len(msgs))
c.sendAsync(msgs, wg, ex)
}
}
func (c *client) debugf(format string, args ...interface{}) {
if c.Verbose {
c.logf(format, args...)
}
}
func (c *client) logf(format string, args ...interface{}) {
c.Logger.Logf(format, args...)
}
func (c *client) errorf(format string, args ...interface{}) {
c.Logger.Errorf(format, args...)
}
func (c *client) maxBatchBytes() int {
b, _ := json.Marshal(batch{
MessageId: c.uid(),
SentAt: c.now(),
Context: c.DefaultContext,
})
return maxBatchBytes - len(b)
}
func (c *client) notifySuccess(msgs []message) {
if c.Callback != nil {
for _, m := range msgs {
c.Callback.Success(m.msg)
}
}
}
func (c *client) notifyFailure(msgs []message, err error) {
if c.Callback != nil {
for _, m := range msgs {
c.Callback.Failure(m.msg, err)
}
}
}