use nats for logworker

This commit is contained in:
Cadey Ratio 2017-08-29 14:05:25 -07:00
parent 2d46bec18b
commit 7399245c74
No known key found for this signature in database
GPG Key ID: D607EE27C2E7F89A
31 changed files with 4427 additions and 1440 deletions

View File

@ -9,8 +9,8 @@ import (
"github.com/Xe/ln"
"github.com/bwmarrin/discordgo"
"github.com/drone/mq/stomp"
"github.com/namsral/flag"
nats "github.com/nats-io/go-nats"
opentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
)
@ -19,7 +19,7 @@ var (
token = flag.String("token", "", "discord bot token")
zipkinURL = flag.String("zipkin-url", "", "URL for Zipkin traces")
databaseURL = flag.String("database-url", "http://", "URL for database (rqlite)")
mqURL = flag.String("mq-url", "tcp://mq:9000", "URL for STOMP server")
natsURL = flag.String("nats-url", "nats://localhost:4222", "URL for nats message queue")
)
func main() {
@ -52,24 +52,26 @@ func main() {
ln.FatalErr(ctx, err, ln.F{"action": "migrate logs table"})
}
mq, err := stomp.Dial(*mqURL)
mq, err := nats.Connect(*natsURL)
if err != nil {
ln.FatalErr(ctx, err, ln.F{"url": *mqURL})
ln.FatalErr(ctx, err)
}
mq.Subscribe("/topic/message_create", stomp.HandlerFunc(func(m *stomp.Message) {
sp, ctx := opentracing.StartSpanFromContext(m.Context(), "logworker.topic.message.create")
mq.QueueSubscribe("/message/create", "logworker", func(m *nats.Msg) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sp, ctx := opentracing.StartSpanFromContext(ctx, "message.create")
defer sp.Finish()
msg := &discordgo.Message{}
err := json.Unmarshal(m.Msg, msg)
err := json.Unmarshal(m.Data, msg)
if err != nil {
ln.Error(ctx, err, ln.F{"action": "can't unmarshal message body to a discordgo message"})
return
}
f := ln.F{
"stomp_id": string(m.ID),
"channel_id": msg.ChannelID,
"message_id": msg.ID,
"message_author": msg.Author.ID,
@ -81,7 +83,7 @@ func main() {
if err != nil {
ln.Error(ctx, err, f, ln.F{"action": "can't add discordgo message to the database"})
}
}))
})
for {
select {}

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
@ -13,9 +14,9 @@ import (
"github.com/Xe/ln"
"github.com/bwmarrin/discordgo"
"github.com/drone/mq/stomp"
_ "github.com/joho/godotenv/autoload"
"github.com/namsral/flag"
nats "github.com/nats-io/go-nats"
xkcd "github.com/nishanths/go-xkcd"
opentracing "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
@ -30,7 +31,7 @@ var (
token = flag.String("token", "", "discord bot token")
zipkinURL = flag.String("zipkin-url", "", "URL for Zipkin traces")
databaseURL = flag.String("database-url", "http://", "URL for database (rqlite)")
mqURL = flag.String("mq-url", "tcp://mq:9000", "URL for STOMP server")
natsURL = flag.String("nats-url", "nats://localhost:4222", "URL for Nats message queue")
)
func main() {
@ -80,11 +81,10 @@ func main() {
ctx = context.Background()
mq, err := stomp.Dial(*mqURL)
mq, err := nats.Connect(*natsURL)
if err != nil {
ln.FatalErr(ctx, err, ln.F{"url": *mqURL})
ln.FatalErr(ctx, err)
}
_ = mq
c := cron.New()
@ -156,25 +156,15 @@ func main() {
"message_author_is_bot": m.Author.Bot,
}
err := mq.SendJSON("/topic/message_create", m.Message)
data, err := json.Marshal(m)
if err != nil {
if err.Error() == "EOF" {
mq, err = stomp.Dial(*mqURL)
if err != nil {
ln.Error(ctx, err, f, ln.F{"url": *mqURL, "action": "reconnect to mq"})
return
}
ln.Error(ctx, err, f)
return
}
err = mq.SendJSON("/topic/message_create", m.Message)
if err != nil {
ln.Error(ctx, err, f, ln.F{"action": "retry message_create post to message queue"})
return
}
return
}
ln.Error(ctx, err, f, ln.F{"action": "send created message to queue"})
err = mq.Publish("/message/create", data)
if err != nil {
ln.Error(ctx, err, f, ln.F{"action": "send new message to nats"})
return
}
@ -191,7 +181,12 @@ func main() {
"user_name": m.User.Username,
}
err := mq.SendJSON("/topic/member_add", m.Member)
data, err := json.Marshal(m.Member)
if err != nil {
ln.Error(ctx, err, f, ln.F{"action": "prepare member add to queue"})
}
err = mq.Publish("/member/add", data)
if err != nil {
ln.Error(ctx, err, f, ln.F{"action": "send added member to queue"})
}

View File

@ -11,9 +11,11 @@ services:
ports:
- "9411:9411"
# message queue
mq:
image: drone/mq
# nats message queue
nats:
image: nats
ports:
- "4222:4222"
# database
rqlite:
@ -21,6 +23,8 @@ services:
image: rqlite/rqlite:4.0.2
volumes:
- rqlite:/rqlite/file
ports:
- "4001:4001"
command: -on-disk -http-adv-addr rqlite:4001
# the bot and event sourcing ingress
@ -30,7 +34,7 @@ services:
env_file: ./.env
depends_on:
- zipkin
- mq
- nats
- rqlite
logworker:
@ -39,7 +43,7 @@ services:
env_file: ./.env
depends_on:
- zipkin
- mq
- nats
- rqlite
command: /root/go/bin/logworker

View File

@ -73,7 +73,7 @@ func (l *Logs) Add(ctx context.Context, m *discordgo.Message) error {
me = 1
}
bd := stmt.Bind(m.ID, m.ChannelID, m.Content, ts, me, m.Author.ID, m.Author.Username)
bd := stmt.Bind(m.ID, m.ChannelID, m.Content, ts.Unix(), me, m.Author.ID, m.Author.Username)
l.bdl.Add(bd, len(bd))

View File

@ -50,3 +50,7 @@ f5079bd7f6f74e23c4d65efa0f4ce14cbd6a3c0f golang.org/x/net/websocket
66aacef3dd8a676686c7ae3716979581e8b03c47 golang.org/x/net/context
f52d1811a62927559de87708c8913c1650ce4f26 golang.org/x/sync/semaphore
e0e0e6e500066ff47335c7717e2a090ad127adec google.golang.org/api/support/bundler
acf11e4666ad8ab4680680b45d38cf1509a40fed github.com/nats-io/go-nats
acf11e4666ad8ab4680680b45d38cf1509a40fed github.com/nats-io/go-nats/encoders/builtin
acf11e4666ad8ab4680680b45d38cf1509a40fed github.com/nats-io/go-nats/util
3cf34f9fca4e88afa9da8eabd75e3326c9941b44 github.com/nats-io/nuid

BIN
vendor/github.com/drone/.DS_Store generated vendored Normal file

Binary file not shown.

View File

@ -1,61 +0,0 @@
package logger
var std Logger = new(none)
// Debugf writes a debug message to the standard logger.
func Debugf(format string, args ...interface{}) {
std.Debugf(format, args...)
}
// Verbosef writes a verbose message to the standard logger.
func Verbosef(format string, args ...interface{}) {
std.Verbosef(format, args...)
}
// Noticef writes a notice message to the standard logger.
func Noticef(format string, args ...interface{}) {
std.Noticef(format, args...)
}
// Warningf writes a warning message to the standard logger.
func Warningf(format string, args ...interface{}) {
std.Warningf(format, args...)
}
// Printf writes a default message to the standard logger.
func Printf(format string, args ...interface{}) {
std.Printf(format, args...)
}
// SetLogger sets the standard logger.
func SetLogger(logger Logger) {
std = logger
}
// Logger represents a logger.
type Logger interface {
// Debugf writes a debug message.
Debugf(string, ...interface{})
// Verbosef writes a verbose message.
Verbosef(string, ...interface{})
// Noticef writes a notice message.
Noticef(string, ...interface{})
// Warningf writes a warning message.
Warningf(string, ...interface{})
// Printf writes a default message.
Printf(string, ...interface{})
}
// none is a logger that silently ignores all writes.
type none struct{}
func (*none) Debugf(string, ...interface{}) {}
func (*none) Verbosef(string, ...interface{}) {}
func (*none) Noticef(string, ...interface{}) {}
func (*none) Warningf(string, ...interface{}) {}
func (*none) Printf(string, ...interface{}) {}

View File

@ -1,259 +0,0 @@
package stomp
import (
"bytes"
"encoding/json"
"fmt"
"io"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/drone/mq/logger"
"github.com/drone/mq/stomp/dialer"
)
// Client defines a client connection to a STOMP server.
type Client struct {
mu sync.Mutex
peer Peer
subs map[string]Handler
wait map[string]chan struct{}
done chan error
seq int64
skipVerify bool
readBufferSize int
writeBufferSize int
timeout time.Duration
}
// New returns a new STOMP client using the given connection.
func New(peer Peer) *Client {
return &Client{
peer: peer,
subs: make(map[string]Handler),
wait: make(map[string]chan struct{}),
done: make(chan error, 1),
}
}
// Dial creates a client connection to the given target.
func Dial(target string) (*Client, error) {
conn, err := dialer.Dial(target)
if err != nil {
return nil, err
}
return New(Conn(conn)), nil
}
// Send sends the data to the given destination.
func (c *Client) Send(dest string, data []byte, opts ...MessageOption) error {
m := NewMessage()
m.Method = MethodSend
m.Dest = []byte(dest)
m.Body = data
m.Apply(opts...)
return c.sendMessage(m)
}
// SendJSON sends the JSON encoding of v to the given destination.
func (c *Client) SendJSON(dest string, v interface{}, opts ...MessageOption) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
opts = append(opts,
WithHeader("content-type", "application/json"),
)
return c.Send(dest, data, opts...)
}
// Subscribe subscribes to the given destination.
func (c *Client) Subscribe(dest string, handler Handler, opts ...MessageOption) (id []byte, err error) {
id = c.incr()
m := NewMessage()
m.Method = MethodSubscribe
m.ID = id
m.Dest = []byte(dest)
m.Apply(opts...)
c.mu.Lock()
c.subs[string(id)] = handler
c.mu.Unlock()
err = c.sendMessage(m)
if err != nil {
c.mu.Lock()
delete(c.subs, string(id))
c.mu.Unlock()
return
}
return
}
// Unsubscribe unsubscribes to the destination.
func (c *Client) Unsubscribe(id []byte, opts ...MessageOption) error {
c.mu.Lock()
delete(c.subs, string(id))
c.mu.Unlock()
m := NewMessage()
m.Method = MethodUnsubscribe
m.ID = id
m.Apply(opts...)
return c.sendMessage(m)
}
// Ack acknowledges the messages with the given id.
func (c *Client) Ack(id []byte, opts ...MessageOption) error {
m := NewMessage()
m.Method = MethodAck
m.ID = id
m.Apply(opts...)
return c.sendMessage(m)
}
// Nack negative-acknowledges the messages with the given id.
func (c *Client) Nack(id []byte, opts ...MessageOption) error {
m := NewMessage()
m.Method = MethodNack
m.ID = id
m.Apply(opts...)
return c.peer.Send(m)
}
// Connect opens the connection and establishes the session.
func (c *Client) Connect(opts ...MessageOption) error {
m := NewMessage()
m.Proto = STOMP
m.Method = MethodStomp
m.Apply(opts...)
if err := c.sendMessage(m); err != nil {
return err
}
m, ok := <-c.peer.Receive()
if !ok {
return io.EOF
}
defer m.Release()
if !bytes.Equal(m.Method, MethodConnected) {
return fmt.Errorf("stomp: inbound message: unexpected method, want connected")
}
go c.listen()
return nil
}
// Disconnect terminates the session and closes the connection.
func (c *Client) Disconnect() error {
m := NewMessage()
m.Method = MethodDisconnect
c.sendMessage(m)
return c.peer.Close()
}
// Done returns a channel
func (c *Client) Done() <-chan error {
return c.done
}
func (c *Client) incr() []byte {
c.mu.Lock()
i := c.seq
c.seq++
c.mu.Unlock()
return strconv.AppendInt(nil, i, 10)
}
func (c *Client) listen() {
defer func() {
if r := recover(); r != nil {
logger.Warningf("stomp client: recover panic: %s", r)
err, ok := r.(error)
if !ok {
logger.Warningf("%v: %s", r, debug.Stack())
c.done <- fmt.Errorf("%v", r)
} else {
logger.Warningf("%s", err)
c.done <- err
}
}
}()
for {
m, ok := <-c.peer.Receive()
if !ok {
c.done <- io.EOF
return
}
switch {
case bytes.Equal(m.Method, MethodMessage):
c.handleMessage(m)
case bytes.Equal(m.Method, MethodRecipet):
c.handleReceipt(m)
default:
logger.Noticef("stomp client: unknown message type: %s",
string(m.Method),
)
}
}
}
func (c *Client) handleReceipt(m *Message) {
c.mu.Lock()
receiptc, ok := c.wait[string(m.Receipt)]
c.mu.Unlock()
if !ok {
logger.Noticef("stomp client: unknown read receipt: %s",
string(m.Receipt),
)
return
}
receiptc <- struct{}{}
}
func (c *Client) handleMessage(m *Message) {
c.mu.Lock()
handler, ok := c.subs[string(m.Subs)]
c.mu.Unlock()
if !ok {
logger.Noticef("stomp client: subscription not found: %s",
string(m.Subs),
)
return
}
handler.Handle(m)
}
func (c *Client) sendMessage(m *Message) error {
if len(m.Receipt) == 0 {
return c.peer.Send(m)
}
receiptc := make(chan struct{}, 1)
c.wait[string(m.Receipt)] = receiptc
defer func() {
delete(c.wait, string(m.Receipt))
}()
err := c.peer.Send(m)
if err != nil {
return err
}
select {
case <-receiptc:
return nil
}
}

View File

@ -1,156 +0,0 @@
package stomp
import (
"bufio"
"io"
"net"
"time"
"github.com/drone/mq/logger"
)
const (
bufferSize = 32 << 10 // default buffer size 32KB
bufferLimit = 32 << 15 // default buffer limit 1MB
)
var (
never time.Time
deadline = time.Second * 5
heartbeatTime = time.Second * 30
heartbeatWait = time.Second * 60
)
type connPeer struct {
conn net.Conn
done chan bool
reader *bufio.Reader
writer *bufio.Writer
incoming chan *Message
outgoing chan *Message
}
// Conn creates a network-connected peer that reads and writes
// messages using net.Conn c.
func Conn(c net.Conn) Peer {
p := &connPeer{
reader: bufio.NewReaderSize(c, bufferSize),
writer: bufio.NewWriterSize(c, bufferSize),
incoming: make(chan *Message),
outgoing: make(chan *Message),
done: make(chan bool),
conn: c,
}
go p.readInto(p.incoming)
go p.writeFrom(p.outgoing)
return p
}
func (c *connPeer) Receive() <-chan *Message {
return c.incoming
}
func (c *connPeer) Send(message *Message) error {
select {
case <-c.done:
return io.EOF
default:
c.outgoing <- message
return nil
}
}
func (c *connPeer) Addr() string {
return c.conn.RemoteAddr().String()
}
func (c *connPeer) Close() error {
return c.close()
}
func (c *connPeer) close() error {
select {
case <-c.done:
return io.EOF
default:
close(c.done)
close(c.incoming)
close(c.outgoing)
return nil
}
}
func (c *connPeer) readInto(messages chan<- *Message) {
defer c.close()
for {
// lim := io.LimitReader(c.conn, bufferLimit)
// buf := bufio.NewReaderSize(lim, bufferSize)
buf, err := c.reader.ReadBytes(0)
if err != nil {
break
}
if len(buf) == 1 {
c.conn.SetReadDeadline(time.Now().Add(heartbeatWait))
logger.Verbosef("stomp: received heart-beat")
continue
}
msg := NewMessage()
msg.Parse(buf[:len(buf)-1])
select {
case <-c.done:
break
default:
messages <- msg
}
}
}
func (c *connPeer) writeFrom(messages <-chan *Message) {
tick := time.NewTicker(time.Millisecond * 100).C
heartbeat := time.NewTicker(heartbeatTime).C
loop:
for {
select {
case <-c.done:
break loop
case <-heartbeat:
logger.Verbosef("stomp: send heart-beat.")
c.writer.WriteByte(0)
case <-tick:
c.conn.SetWriteDeadline(time.Now().Add(deadline))
if err := c.writer.Flush(); err != nil {
break loop
}
c.conn.SetWriteDeadline(never)
case msg, ok := <-messages:
if !ok {
break loop
}
writeTo(c.writer, msg)
c.writer.WriteByte(0)
msg.Release()
}
}
c.drain()
}
func (c *connPeer) drain() error {
c.conn.SetWriteDeadline(time.Now().Add(deadline))
for msg := range c.outgoing {
writeTo(c.writer, msg)
c.writer.WriteByte(0)
msg.Release()
}
c.conn.SetWriteDeadline(never)
c.writer.Flush()
return c.conn.Close()
}

View File

@ -1,76 +0,0 @@
package stomp
// STOMP protocol version.
var STOMP = []byte("1.2")
// STOMP protocol methods.
var (
MethodStomp = []byte("STOMP")
MethodConnect = []byte("CONNECT")
MethodConnected = []byte("CONNECTED")
MethodSend = []byte("SEND")
MethodSubscribe = []byte("SUBSCRIBE")
MethodUnsubscribe = []byte("UNSUBSCRIBE")
MethodAck = []byte("ACK")
MethodNack = []byte("NACK")
MethodDisconnect = []byte("DISCONNECT")
MethodMessage = []byte("MESSAGE")
MethodRecipet = []byte("RECEIPT")
MethodError = []byte("ERROR")
)
// STOMP protocol headers.
var (
HeaderAccept = []byte("accept-version")
HeaderAck = []byte("ack")
HeaderExpires = []byte("expires")
HeaderDest = []byte("destination")
HeaderHost = []byte("host")
HeaderLogin = []byte("login")
HeaderPass = []byte("passcode")
HeaderID = []byte("id")
HeaderMessageID = []byte("message-id")
HeaderPersist = []byte("persist")
HeaderPrefetch = []byte("prefetch-count")
HeaderReceipt = []byte("receipt")
HeaderReceiptID = []byte("receipt-id")
HeaderRetain = []byte("retain")
HeaderSelector = []byte("selector")
HeaderServer = []byte("server")
HeaderSession = []byte("session")
HeaderSubscription = []byte("subscription")
HeaderVersion = []byte("version")
)
// Common STOMP header values.
var (
AckAuto = []byte("auto")
AckClient = []byte("client")
PersistTrue = []byte("true")
RetainTrue = []byte("true")
RetainLast = []byte("last")
RetainAll = []byte("all")
RetainRemove = []byte("remove")
)
var headerLookup = map[string]struct{}{
"accept-version": struct{}{},
"ack": struct{}{},
"expires": struct{}{},
"destination": struct{}{},
"host": struct{}{},
"login": struct{}{},
"passcode": struct{}{},
"id": struct{}{},
"message-id": struct{}{},
"persist": struct{}{},
"prefetch-count": struct{}{},
"receipt": struct{}{},
"receipt-id": struct{}{},
"retain": struct{}{},
"selector": struct{}{},
"server": struct{}{},
"session": struct{}{},
"subscription": struct{}{},
"version": struct{}{},
}

View File

@ -1,37 +0,0 @@
package stomp
import "golang.org/x/net/context"
const clientKey = "stomp.client"
// NewContext adds the client to the context.
func (c *Client) NewContext(ctx context.Context, client *Client) context.Context {
// HACK for use with gin and echo
if s, ok := ctx.(setter); ok {
s.Set(clientKey, clientKey)
return ctx
}
return context.WithValue(ctx, clientKey, client)
}
// FromContext retrieves the client from context
func FromContext(ctx context.Context) (*Client, bool) {
client, ok := ctx.Value(clientKey).(*Client)
return client, ok
}
// MustFromContext retrieves the client from context. Panics if not found
func MustFromContext(ctx context.Context) *Client {
client, ok := FromContext(ctx)
if !ok {
panic("stomp.Client not found in context")
}
return client
}
// HACK setter defines a context that enables setting values. This is a
// temporary workaround for use with gin and echo and will eventually
// be removed. DO NOT depend on this.
type setter interface {
Set(string, interface{})
}

View File

@ -1,51 +0,0 @@
package dialer
import (
"net"
"net/url"
"golang.org/x/net/websocket"
)
const (
protoHTTP = "http"
protoHTTPS = "https"
protoWS = "ws"
protoWSS = "wss"
protoTCP = "tcp"
)
// Dial creates a client connection to the given target.
func Dial(target string) (net.Conn, error) {
u, err := url.Parse(target)
if err != nil {
return nil, err
}
switch u.Scheme {
case protoHTTP, protoHTTPS, protoWS, protoWSS:
return dialWebsocket(u)
case protoTCP:
return dialSocket(u)
default:
panic("stomp: invalid protocol")
}
}
func dialWebsocket(target *url.URL) (net.Conn, error) {
origin, err := target.Parse("/")
if err != nil {
return nil, err
}
switch origin.Scheme {
case protoWS:
origin.Scheme = protoHTTP
case protoWSS:
origin.Scheme = protoHTTPS
}
return websocket.Dial(target.String(), "", origin.String())
}
func dialSocket(target *url.URL) (net.Conn, error) {
return net.Dial(protoTCP, target.Host)
}

View File

@ -1,13 +0,0 @@
package stomp
// Handler handles a STOMP message.
type Handler interface {
Handle(*Message)
}
// The HandlerFunc type is an adapter to allow the use of an ordinary
// function as a STOMP message handler.
type HandlerFunc func(*Message)
// Handle calls f(m).
func (f HandlerFunc) Handle(m *Message) { f(m) }

View File

@ -1,109 +0,0 @@
package stomp
import (
"bytes"
"strconv"
)
const defaultHeaderLen = 5
type item struct {
name []byte
data []byte
}
// Header represents the header section of the STOMP message.
type Header struct {
items []item
itemc int
}
func newHeader() *Header {
return &Header{
items: make([]item, defaultHeaderLen),
}
}
// Get returns the named header value.
func (h *Header) Get(name []byte) (b []byte) {
for i := 0; i < h.itemc; i++ {
if v := h.items[i]; bytes.Equal(v.name, name) {
return v.data
}
}
return
}
// GetString returns the named header value.
func (h *Header) GetString(name string) string {
k := []byte(name)
v := h.Get(k)
return string(v)
}
// GetBool returns the named header value.
func (h *Header) GetBool(name string) bool {
s := h.GetString(name)
b, _ := strconv.ParseBool(s)
return b
}
// GetInt returns the named header value.
func (h *Header) GetInt(name string) int {
s := h.GetString(name)
i, _ := strconv.Atoi(s)
return i
}
// GetInt64 returns the named header value.
func (h *Header) GetInt64(name string) int64 {
s := h.GetString(name)
i, _ := strconv.ParseInt(s, 10, 64)
return i
}
// Field returns the named header value in string format. This is used to
// provide compatibility with the SQL expression evaluation package.
func (h *Header) Field(name []byte) []byte {
return h.Get(name)
}
// Add appens the key value pair to the header.
func (h *Header) Add(name, data []byte) {
h.grow()
h.items[h.itemc].name = name
h.items[h.itemc].data = data
h.itemc++
}
// Index returns the keypair at index i.
func (h *Header) Index(i int) (k, v []byte) {
if i > h.itemc {
return
}
k = h.items[i].name
v = h.items[i].data
return
}
// Len returns the header length.
func (h *Header) Len() int {
return h.itemc
}
func (h *Header) grow() {
if h.itemc > defaultHeaderLen-1 {
h.items = append(h.items, item{})
}
}
func (h *Header) reset() {
h.itemc = 0
h.items = h.items[:defaultHeaderLen]
for i := range h.items {
h.items[i].name = zeroBytes
h.items[i].data = zeroBytes
}
}
var zeroBytes []byte

View File

@ -1,146 +0,0 @@
package stomp
import (
"bytes"
"encoding/json"
"math/rand"
"strconv"
"sync"
"golang.org/x/net/context"
)
// Message represents a parsed STOMP message.
type Message struct {
ID []byte // id header
Proto []byte // stomp version
Method []byte // stomp method
User []byte // username header
Pass []byte // password header
Dest []byte // destination header
Subs []byte // subscription id
Ack []byte // ack id
Msg []byte // message-id header
Persist []byte // persist header
Retain []byte // retain header
Prefetch []byte // prefetch count
Expires []byte // expires header
Receipt []byte // receipt header
Selector []byte // selector header
Body []byte
Header *Header // custom headers
ctx context.Context
}
// Copy returns a copy of the Message.
func (m *Message) Copy() *Message {
c := NewMessage()
c.ID = m.ID
c.Proto = m.Proto
c.Method = m.Method
c.User = m.User
c.Pass = m.Pass
c.Dest = m.Dest
c.Subs = m.Subs
c.Ack = m.Ack
c.Prefetch = m.Prefetch
c.Selector = m.Selector
c.Persist = m.Persist
c.Retain = m.Retain
c.Receipt = m.Receipt
c.Expires = m.Expires
c.Body = m.Body
c.ctx = m.ctx
c.Header.itemc = m.Header.itemc
copy(c.Header.items, m.Header.items)
return c
}
// Apply applies the options to the message.
func (m *Message) Apply(opts ...MessageOption) {
for _, opt := range opts {
opt(m)
}
}
// Parse parses the raw bytes into the message.
func (m *Message) Parse(b []byte) error {
return read(b, m)
}
// Bytes returns the Message in raw byte format.
func (m *Message) Bytes() []byte {
var buf bytes.Buffer
writeTo(&buf, m)
return buf.Bytes()
}
// String returns the Message in string format.
func (m *Message) String() string {
return string(m.Bytes())
}
// Release releases the message back to the message pool.
func (m *Message) Release() {
m.Reset()
pool.Put(m)
}
// Reset resets the meesage fields to their zero values.
func (m *Message) Reset() {
m.ID = m.ID[:0]
m.Proto = m.Proto[:0]
m.Method = m.Method[:0]
m.User = m.User[:0]
m.Pass = m.Pass[:0]
m.Dest = m.Dest[:0]
m.Subs = m.Subs[:0]
m.Ack = m.Ack[:0]
m.Prefetch = m.Prefetch[:0]
m.Selector = m.Selector[:0]
m.Persist = m.Persist[:0]
m.Retain = m.Retain[:0]
m.Receipt = m.Receipt[:0]
m.Expires = m.Expires[:0]
m.Body = m.Body[:0]
m.ctx = nil
m.Header.reset()
}
// Context returns the request's context.
func (m *Message) Context() context.Context {
if m.ctx != nil {
return m.ctx
}
return context.Background()
}
// WithContext returns a shallow copy of m with its context changed
// to ctx. The provided ctx must be non-nil.
func (m *Message) WithContext(ctx context.Context) *Message {
c := m.Copy()
c.ctx = ctx
return c
}
// Unmarshal parses the JSON-encoded body of the message and
// stores the result in the value pointed to by v.
func (m *Message) Unmarshal(v interface{}) error {
return json.Unmarshal(m.Body, v)
}
// NewMessage returns an empty message from the message pool.
func NewMessage() *Message {
return pool.Get().(*Message)
}
var pool = sync.Pool{New: func() interface{} {
return &Message{Header: newHeader()}
}}
// Rand returns a random int64 number as a []byte of
// ascii characters.
func Rand() []byte {
return strconv.AppendInt(nil, rand.Int63(), 10)
}

View File

@ -1,96 +0,0 @@
package stomp
import (
"math/rand"
"strconv"
"strings"
)
// MessageOption configures message options.
type MessageOption func(*Message)
// WithCredentials returns a MessageOption which sets credentials.
func WithCredentials(username, password string) MessageOption {
return func(m *Message) {
m.User = []byte(username)
m.Pass = []byte(password)
}
}
// WithHeader returns a MessageOption which sets a header.
func WithHeader(key, value string) MessageOption {
return func(m *Message) {
_, ok := headerLookup[strings.ToLower(key)]
if !ok {
m.Header.Add(
[]byte(key),
[]byte(value),
)
}
}
}
// WithHeaders returns a MessageOption which sets headers.
func WithHeaders(headers map[string]string) MessageOption {
return func(m *Message) {
for key, value := range headers {
_, ok := headerLookup[strings.ToLower(key)]
if !ok {
m.Header.Add(
[]byte(key),
[]byte(value),
)
}
}
}
}
// WithExpires returns a MessageOption configured with an expiration.
func WithExpires(exp int64) MessageOption {
return func(m *Message) {
m.Expires = strconv.AppendInt(nil, exp, 10)
}
}
// WithPrefetch returns a MessageOption configured with a prefetch count.
func WithPrefetch(prefetch int) MessageOption {
return func(m *Message) {
m.Prefetch = strconv.AppendInt(nil, int64(prefetch), 10)
}
}
// WithReceipt returns a MessageOption configured with a receipt request.
func WithReceipt() MessageOption {
return func(m *Message) {
m.Receipt = strconv.AppendInt(nil, rand.Int63(), 10)
}
}
// WithPersistence returns a MessageOption configured to persist.
func WithPersistence() MessageOption {
return func(m *Message) {
m.Persist = PersistTrue
}
}
// WithRetain returns a MessageOption configured to retain the message.
func WithRetain(retain string) MessageOption {
return func(m *Message) {
m.Retain = []byte(retain)
}
}
// WithSelector returns a MessageOption configured to filter messages
// using a sql-like evaluation string.
func WithSelector(selector string) MessageOption {
return func(m *Message) {
m.Selector = []byte(selector)
}
}
// WithAck returns a MessageOption configured with an ack policy.
func WithAck(ack string) MessageOption {
return func(m *Message) {
m.Ack = []byte(ack)
}
}

View File

@ -1,86 +0,0 @@
package stomp
import (
"io"
"net"
"sync"
)
// Peer defines a peer-to-peer connection.
type Peer interface {
// Send sends a message.
Send(*Message) error
// Receive returns a channel of inbound messages.
Receive() <-chan *Message
// Close closes the connection.
Close() error
// Addr returns the peer address.
Addr() string
}
// Pipe creates a synchronous in-memory pipe, where reads on one end are
// matched with writes on the other. This is useful for direct, in-memory
// client-server communication.
func Pipe() (Peer, Peer) {
atob := make(chan *Message, 10)
btoa := make(chan *Message, 10)
a := &localPeer{
incoming: btoa,
outgoing: atob,
finished: make(chan bool),
}
b := &localPeer{
incoming: atob,
outgoing: btoa,
finished: make(chan bool),
}
return a, b
}
type localPeer struct {
finished chan bool
outgoing chan<- *Message
incoming <-chan *Message
}
func (p *localPeer) Receive() <-chan *Message {
return p.incoming
}
func (p *localPeer) Send(m *Message) error {
select {
case <-p.finished:
return io.EOF
default:
p.outgoing <- m
return nil
}
}
func (p *localPeer) Close() error {
close(p.finished)
close(p.outgoing)
return nil
}
func (p *localPeer) Addr() string {
peerAddrOnce.Do(func() {
// get the local address list
addr, _ := net.InterfaceAddrs()
if len(addr) != 0 {
// use the last address in the list
peerAddr = addr[len(addr)-1].String()
}
})
return peerAddr
}
var peerAddrOnce sync.Once
// default address displayed for local pipes
var peerAddr = "127.0.0.1/8"

View File

@ -1,139 +0,0 @@
package stomp
import (
"bytes"
"fmt"
)
func read(input []byte, m *Message) (err error) {
var (
pos int
off int
tot = len(input)
)
// parse the stomp message
for ; ; off++ {
if off == tot {
return fmt.Errorf("stomp: invalid method")
}
if input[off] == '\n' {
m.Method = input[pos:off]
off++
pos = off
break
}
}
// parse the stomp headers
for {
if off == tot {
return fmt.Errorf("stomp: unexpected eof")
}
if input[off] == '\n' {
off++
pos = off
break
}
var (
name []byte
value []byte
)
loop:
// parse each individual header
for ; ; off++ {
if off >= tot {
return fmt.Errorf("stomp: unexpected eof")
}
switch input[off] {
case '\n':
value = input[pos:off]
off++
pos = off
break loop
case ':':
name = input[pos:off]
off++
pos = off
}
}
switch {
case bytes.Equal(name, HeaderAccept):
m.Proto = value
case bytes.Equal(name, HeaderAck):
m.Ack = value
case bytes.Equal(name, HeaderDest):
m.Dest = value
case bytes.Equal(name, HeaderExpires):
m.Expires = value
case bytes.Equal(name, HeaderLogin):
m.User = value
case bytes.Equal(name, HeaderPass):
m.Pass = value
case bytes.Equal(name, HeaderID):
m.ID = value
case bytes.Equal(name, HeaderMessageID):
m.ID = value
case bytes.Equal(name, HeaderPersist):
m.Persist = value
case bytes.Equal(name, HeaderPrefetch):
m.Prefetch = value
case bytes.Equal(name, HeaderReceipt):
m.Receipt = value
case bytes.Equal(name, HeaderReceiptID):
m.Receipt = value
case bytes.Equal(name, HeaderRetain):
m.Retain = value
case bytes.Equal(name, HeaderSelector):
m.Selector = value
case bytes.Equal(name, HeaderSubscription):
m.Subs = value
case bytes.Equal(name, HeaderVersion):
m.Proto = value
default:
m.Header.Add(name, value)
}
}
if tot > pos {
m.Body = input[pos:]
}
return
}
const (
asciiZero = 48
asciiNine = 57
)
// ParseInt returns the ascii integer value.
func ParseInt(d []byte) (n int) {
if len(d) == 0 {
return 0
}
for _, dec := range d {
if dec < asciiZero || dec > asciiNine {
return 0
}
n = n*10 + (int(dec) - asciiZero)
}
return n
}
// ParseInt64 returns the ascii integer value.
func ParseInt64(d []byte) (n int64) {
if len(d) == 0 {
return 0
}
for _, dec := range d {
if dec < asciiZero || dec > asciiNine {
return 0
}
n = n*10 + (int64(dec) - asciiZero)
}
return n
}