parent
7b766d82b6
commit
aa58cbb9df
|
@ -0,0 +1,39 @@
|
|||
package gonads
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// Queue is a MPMS (multiple producer, multiple subscriber) queue that is
|
||||
// thread-safe and does not contain unsafe code. The only catch is that the
|
||||
// queue has a fixed size. Ince the queue is full, insertions will block
|
||||
// forever until the queue has room.
|
||||
type Queue[T any] struct {
|
||||
data chan T
|
||||
}
|
||||
|
||||
// Push a value to the queue, maybe blocking forever if the queue is full.
|
||||
func (q Queue[T]) Push(val T) {
|
||||
q.data <- val
|
||||
}
|
||||
|
||||
// Pop a value from the queue, maybe blocking forever if the queue is empty.
|
||||
func (q Queue[T]) Pop() T {
|
||||
return <-q.data
|
||||
}
|
||||
|
||||
func (q Queue[T]) TryPop(ctx context.Context) T {
|
||||
select {
|
||||
case val := <-q.data:
|
||||
return val
|
||||
case <-ctx.Done():
|
||||
panic("context died, rip")
|
||||
}
|
||||
}
|
||||
|
||||
// NewQueue creates a new queue instance with the given size.
|
||||
func NewQueue[T any](size int) Queue[T] {
|
||||
return Queue[T]{
|
||||
data: make(chan T, size),
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package gonads
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
q := NewQueue[int](5)
|
||||
for i := range make([]struct{}, 5) {
|
||||
q.Push(i)
|
||||
}
|
||||
|
||||
|
||||
for range make([]struct{}, 5) {
|
||||
t.Log(q.Pop())
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if sr, ok := r.(string); ok {
|
||||
t.Log(sr)
|
||||
return
|
||||
}
|
||||
panic(r)
|
||||
}
|
||||
} ()
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
q.Push(1)
|
||||
t.Log(q.TryPop(ctx))
|
||||
q.TryPop(ctx)
|
||||
}
|
Loading…
Reference in New Issue