mirror of
https://github.com/SSLMate/certspotter.git
synced 2025-06-27 10:15:33 +02:00
sequencer: add Reserve method
This commit is contained in:
parent
bdc589762a
commit
63845b370d
@ -72,6 +72,27 @@ func (seq *Channel[T]) index(seqNbr uint64) int {
|
||||
return int(seqNbr % seq.Cap())
|
||||
}
|
||||
|
||||
// Wait until the channel has capacity for an item with the given sequence number.
|
||||
// After this function returns nil, calling Add with the same sequence number will not block.
|
||||
func (seq *Channel[T]) Reserve(ctx context.Context, sequenceNumber uint64) error {
|
||||
seq.mu.Lock()
|
||||
if sequenceNumber >= seq.next+seq.Cap() {
|
||||
ready := seq.parkWriter(sequenceNumber)
|
||||
seq.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
seq.mu.Lock()
|
||||
seq.forgetWriter(sequenceNumber)
|
||||
seq.mu.Unlock()
|
||||
return ctx.Err()
|
||||
case <-ready:
|
||||
}
|
||||
} else {
|
||||
seq.mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send an item with the given sequence number. Blocks if the channel does not have capacity for the item.
|
||||
// It is undefined behavior to send a sequence number that has previously been sent.
|
||||
func (seq *Channel[T]) Add(ctx context.Context, sequenceNumber uint64, item *T) error {
|
||||
|
@ -147,3 +147,49 @@ func TestSequencerOutOfOrder(t *testing.T) {
|
||||
//t.Logf("seq.Next %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSequencerOutOfOrderReserve(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
seq := New[uint64](0, 10)
|
||||
ch := make(chan uint64)
|
||||
go func() {
|
||||
for i := range uint64(10_000) {
|
||||
ch <- i
|
||||
}
|
||||
}()
|
||||
ch2 := make(chan uint64)
|
||||
for job := range 4 {
|
||||
go func() {
|
||||
for i := range ch {
|
||||
time.Sleep(mathrand.N(11 * time.Duration(job+1) * time.Millisecond))
|
||||
err := seq.Reserve(ctx, i)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("%d: seq.Reserve returned unexpected error %v", i, err))
|
||||
}
|
||||
ch2 <- i
|
||||
}
|
||||
}()
|
||||
}
|
||||
for range 4 {
|
||||
go func() {
|
||||
for i := range ch2 {
|
||||
time.Sleep(mathrand.N(7 * time.Millisecond))
|
||||
t.Logf("seq.Add %d", i)
|
||||
err := seq.Add(ctx, i, &i)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("%d: seq.Add returned unexpected error %v", i, err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := range uint64(10_000) {
|
||||
next, err := seq.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("%d: seq.Next returned unexpected error %v", i, err)
|
||||
}
|
||||
if *next != i {
|
||||
t.Fatalf("%d: got unexpected value %d", i, *next)
|
||||
}
|
||||
t.Logf("seq.Next %d", i)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user