certspotter/sequencer/sequencer.go

129 lines
3.2 KiB
Go

// Copyright (C) 2025 Opsmate, Inc.
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License, v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// This software is distributed WITHOUT A WARRANTY OF ANY KIND.
// See the Mozilla Public License for details.
package sequencer
import (
"context"
"slices"
"sync"
)
type seqWriter struct {
seqNbr uint64
ready chan<- struct{}
}
// Channel[T] is a multi-producer, single-consumer channel of items with monotonicaly-increasing sequence numbers.
// Items can be sent in any order, but they are always received in order of their sequence number.
type Channel[T any] struct {
mu sync.Mutex
next uint64
buf []*T
writers []seqWriter
readWaiting bool
readReady chan struct{}
}
func New[T any](initialSequenceNumber uint64, capacity uint64) *Channel[T] {
return &Channel[T]{
buf: make([]*T, capacity),
next: initialSequenceNumber,
readReady: make(chan struct{}, 1),
}
}
func (seq *Channel[T]) parkWriter(seqNbr uint64) <-chan struct{} {
ready := make(chan struct{})
seq.writers = append(seq.writers, seqWriter{seqNbr: seqNbr, ready: ready})
return ready
}
func (seq *Channel[T]) signalWriter(seqNbr uint64) {
for i := range seq.writers {
if seq.writers[i].seqNbr == seqNbr {
close(seq.writers[i].ready)
seq.writers = slices.Delete(seq.writers, i, i+1)
return
}
}
}
func (seq *Channel[T]) forgetWriter(seqNbr uint64) {
for i := range seq.writers {
if seq.writers[i].seqNbr == seqNbr {
seq.writers = slices.Delete(seq.writers, i, i+1)
return
}
}
}
func (seq *Channel[T]) Cap() uint64 {
return uint64(len(seq.buf))
}
func (seq *Channel[T]) index(seqNbr uint64) int {
return int(seqNbr % seq.Cap())
}
// Send an item with the given sequence number. Blocks if the channel does not have capacity for the item.
// It is undefined behavior to send a sequence number that has previously been sent.
func (seq *Channel[T]) Add(ctx context.Context, sequenceNumber uint64, item *T) error {
seq.mu.Lock()
if sequenceNumber >= seq.next+seq.Cap() {
ready := seq.parkWriter(sequenceNumber)
seq.mu.Unlock()
select {
case <-ctx.Done():
seq.mu.Lock()
seq.forgetWriter(sequenceNumber)
seq.mu.Unlock()
return ctx.Err()
case <-ready:
}
seq.mu.Lock()
}
seq.buf[seq.index(sequenceNumber)] = item
if sequenceNumber == seq.next && seq.readWaiting {
seq.readReady <- struct{}{}
}
seq.mu.Unlock()
return nil
}
// Return the item with the next sequence number, blocking if necessary.
// Not safe to call concurrently with other Next calls.
func (seq *Channel[T]) Next(ctx context.Context) (*T, error) {
seq.mu.Lock()
if seq.buf[seq.index(seq.next)] == nil {
seq.readWaiting = true
seq.mu.Unlock()
select {
case <-ctx.Done():
seq.mu.Lock()
select {
case <-seq.readReady:
default:
}
seq.readWaiting = false
seq.mu.Unlock()
return nil, ctx.Err()
case <-seq.readReady:
}
seq.mu.Lock()
seq.readWaiting = false
}
item := seq.buf[seq.index(seq.next)]
seq.buf[seq.index(seq.next)] = nil
seq.signalWriter(seq.next + seq.Cap())
seq.next++
seq.mu.Unlock()
return item, nil
}