From 63845b370d2fb1601d030873052bce37e5dd1d18 Mon Sep 17 00:00:00 2001 From: Andrew Ayer Date: Wed, 14 May 2025 18:44:16 -0400 Subject: [PATCH] sequencer: add Reserve method --- sequencer/sequencer.go | 21 +++++++++++++++++ sequencer/sequencer_test.go | 46 +++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index f4c7cc6..8039ece 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -72,6 +72,27 @@ func (seq *Channel[T]) index(seqNbr uint64) int { return int(seqNbr % seq.Cap()) } +// Wait until the channel has capacity for an item with the given sequence number. +// After this function returns nil, calling Add with the same sequence number will not block. +func (seq *Channel[T]) Reserve(ctx context.Context, sequenceNumber uint64) error { + seq.mu.Lock() + if sequenceNumber >= seq.next+seq.Cap() { + ready := seq.parkWriter(sequenceNumber) + seq.mu.Unlock() + select { + case <-ctx.Done(): + seq.mu.Lock() + seq.forgetWriter(sequenceNumber) + seq.mu.Unlock() + return ctx.Err() + case <-ready: + } + } else { + seq.mu.Unlock() + } + return nil +} + // Send an item with the given sequence number. Blocks if the channel does not have capacity for the item. // It is undefined behavior to send a sequence number that has previously been sent. func (seq *Channel[T]) Add(ctx context.Context, sequenceNumber uint64, item *T) error { diff --git a/sequencer/sequencer_test.go b/sequencer/sequencer_test.go index 5f95450..27a9211 100644 --- a/sequencer/sequencer_test.go +++ b/sequencer/sequencer_test.go @@ -147,3 +147,49 @@ func TestSequencerOutOfOrder(t *testing.T) { //t.Logf("seq.Next %d", i) } } + +func TestSequencerOutOfOrderReserve(t *testing.T) { + ctx := context.Background() + seq := New[uint64](0, 10) + ch := make(chan uint64) + go func() { + for i := range uint64(10_000) { + ch <- i + } + }() + ch2 := make(chan uint64) + for job := range 4 { + go func() { + for i := range ch { + time.Sleep(mathrand.N(11 * time.Duration(job+1) * time.Millisecond)) + err := seq.Reserve(ctx, i) + if err != nil { + panic(fmt.Sprintf("%d: seq.Reserve returned unexpected error %v", i, err)) + } + ch2 <- i + } + }() + } + for range 4 { + go func() { + for i := range ch2 { + time.Sleep(mathrand.N(7 * time.Millisecond)) + t.Logf("seq.Add %d", i) + err := seq.Add(ctx, i, &i) + if err != nil { + panic(fmt.Sprintf("%d: seq.Add returned unexpected error %v", i, err)) + } + } + }() + } + for i := range uint64(10_000) { + next, err := seq.Next(ctx) + if err != nil { + t.Fatalf("%d: seq.Next returned unexpected error %v", i, err) + } + if *next != i { + t.Fatalf("%d: got unexpected value %d", i, *next) + } + t.Logf("seq.Next %d", i) + } +}