129 lines
3.2 KiB
Go
129 lines
3.2 KiB
Go
// Copyright (C) 2025 Opsmate, Inc.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla
|
|
// Public License, v. 2.0. If a copy of the MPL was not distributed
|
|
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
//
|
|
// This software is distributed WITHOUT A WARRANTY OF ANY KIND.
|
|
// See the Mozilla Public License for details.
|
|
|
|
package sequencer
|
|
|
|
import (
|
|
"context"
|
|
"slices"
|
|
"sync"
|
|
)
|
|
|
|
// seqWriter records one producer that is blocked in Add because its sequence
// number does not fit in the buffer yet.
type seqWriter struct {
	// seqNbr is the sequence number the parked producer wants to store.
	seqNbr uint64
	// ready is closed by signalWriter to release the parked producer.
	ready chan<- struct{}
}
|
|
|
|
// Channel[T] is a multi-producer, single-consumer channel of items with monotonicaly-increasing sequence numbers.
|
|
// Items can be sent in any order, but they are always received in order of their sequence number.
|
|
type Channel[T any] struct {
|
|
mu sync.Mutex
|
|
next uint64
|
|
buf []*T
|
|
writers []seqWriter
|
|
readWaiting bool
|
|
readReady chan struct{}
|
|
}
|
|
|
|
func New[T any](initialSequenceNumber uint64, capacity uint64) *Channel[T] {
|
|
return &Channel[T]{
|
|
buf: make([]*T, capacity),
|
|
next: initialSequenceNumber,
|
|
readReady: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
func (seq *Channel[T]) parkWriter(seqNbr uint64) <-chan struct{} {
|
|
ready := make(chan struct{})
|
|
seq.writers = append(seq.writers, seqWriter{seqNbr: seqNbr, ready: ready})
|
|
return ready
|
|
}
|
|
|
|
func (seq *Channel[T]) signalWriter(seqNbr uint64) {
|
|
for i := range seq.writers {
|
|
if seq.writers[i].seqNbr == seqNbr {
|
|
close(seq.writers[i].ready)
|
|
seq.writers = slices.Delete(seq.writers, i, i+1)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (seq *Channel[T]) forgetWriter(seqNbr uint64) {
|
|
for i := range seq.writers {
|
|
if seq.writers[i].seqNbr == seqNbr {
|
|
seq.writers = slices.Delete(seq.writers, i, i+1)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (seq *Channel[T]) Cap() uint64 {
|
|
return uint64(len(seq.buf))
|
|
}
|
|
|
|
func (seq *Channel[T]) index(seqNbr uint64) int {
|
|
return int(seqNbr % seq.Cap())
|
|
}
|
|
|
|
// Send an item with the given sequence number. Blocks if the channel does not have capacity for the item.
|
|
// It is undefined behavior to send a sequence number that has previously been sent.
|
|
func (seq *Channel[T]) Add(ctx context.Context, sequenceNumber uint64, item *T) error {
|
|
seq.mu.Lock()
|
|
if sequenceNumber >= seq.next+seq.Cap() {
|
|
ready := seq.parkWriter(sequenceNumber)
|
|
seq.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
seq.mu.Lock()
|
|
seq.forgetWriter(sequenceNumber)
|
|
seq.mu.Unlock()
|
|
return ctx.Err()
|
|
case <-ready:
|
|
}
|
|
seq.mu.Lock()
|
|
}
|
|
seq.buf[seq.index(sequenceNumber)] = item
|
|
if sequenceNumber == seq.next && seq.readWaiting {
|
|
seq.readReady <- struct{}{}
|
|
}
|
|
seq.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Return the item with the next sequence number, blocking if necessary.
|
|
// Not safe to call concurrently with other Next calls.
|
|
func (seq *Channel[T]) Next(ctx context.Context) (*T, error) {
|
|
seq.mu.Lock()
|
|
if seq.buf[seq.index(seq.next)] == nil {
|
|
seq.readWaiting = true
|
|
seq.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
seq.mu.Lock()
|
|
select {
|
|
case <-seq.readReady:
|
|
default:
|
|
}
|
|
seq.readWaiting = false
|
|
seq.mu.Unlock()
|
|
return nil, ctx.Err()
|
|
case <-seq.readReady:
|
|
}
|
|
seq.mu.Lock()
|
|
seq.readWaiting = false
|
|
}
|
|
item := seq.buf[seq.index(seq.next)]
|
|
seq.buf[seq.index(seq.next)] = nil
|
|
seq.signalWriter(seq.next + seq.Cap())
|
|
seq.next++
|
|
seq.mu.Unlock()
|
|
return item, nil
|
|
}
|