diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..371ca46 --- /dev/null +++ b/queue.go @@ -0,0 +1,39 @@ +package gonads + +import ( + "context" +) + +// Queue is a MPMS (multiple producer, multiple subscriber) queue that is +// thread-safe and does not contain unsafe code. The only catch is that the +// queue has a fixed size. Ince the queue is full, insertions will block +// forever until the queue has room. +type Queue[T any] struct { + data chan T +} + +// Push a value to the queue, maybe blocking forever if the queue is full. +func (q Queue[T]) Push(val T) { + q.data <- val +} + +// Pop a value from the queue, maybe blocking forever if the queue is empty. +func (q Queue[T]) Pop() T { + return <-q.data +} + +func (q Queue[T]) TryPop(ctx context.Context) T { + select { + case val := <-q.data: + return val + case <-ctx.Done(): + panic("context died, rip") + } +} + +// NewQueue creates a new queue instance with the given size. +func NewQueue[T any](size int) Queue[T] { + return Queue[T]{ + data: make(chan T, size), + } +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 0000000..27e8424 --- /dev/null +++ b/queue_test.go @@ -0,0 +1,34 @@ +package gonads + +import ( + "context" + "testing" + "time" +) + +func TestQueue(t *testing.T) { + q := NewQueue[int](5) + for i := range make([]struct{}, 5) { + q.Push(i) + } + + + for range make([]struct{}, 5) { + t.Log(q.Pop()) + } + + defer func() { + if r := recover(); r != nil { + if sr, ok := r.(string); ok { + t.Log(sr) + return + } + panic(r) + } + } () + + ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + q.Push(1) + t.Log(q.TryPop(ctx)) + q.TryPop(ctx) +}