Skip to content

Commit

Permalink
test(pubsub): Add Test case for batch independent reception
Browse files Browse the repository at this point in the history
  • Loading branch information
mitsos1os committed Aug 8, 2024
1 parent 25c3def commit 765f8d5
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,51 @@ func TestCancelTwoReceives(t *testing.T) {
}
}

type secondReceiveBlockedDriverSub struct {
driver.Subscription
waitDuration time.Duration
receiveCounter int
}

func (s *secondReceiveBlockedDriverSub) ReceiveBatch(_ context.Context, _ int) ([]*driver.Message, error) {
s.receiveCounter++
if s.receiveCounter > 1 {
// wait after 1st request for the specified duration before returning the batch result
<-time.After(s.waitDuration)
}
msg := &driver.Message{Body: []byte(fmt.Sprintf("message #%d", s.receiveCounter))}
return []*driver.Message{msg}, nil
}
func (*secondReceiveBlockedDriverSub) CanNack() bool { return false }
func (*secondReceiveBlockedDriverSub) IsRetryable(error) bool { return false }
func (*secondReceiveBlockedDriverSub) Close() error { return nil }

func TestIndependentBatchReturn(t *testing.T) {
// We want to test the scenario when multiple batch requests are sent, as long as one of them succeeds, it should
// not block the Subscription.Receive result
receiveWaitDuration := 200 * time.Millisecond
s := NewSubscription(
&secondReceiveBlockedDriverSub{waitDuration: receiveWaitDuration},
&batcher.Options{MaxBatchSize: 1, MaxHandlers: 2}, // force 2 batches, by allowing 2 handlers and 1 msg per batch
nil,
)
// set the false calculated subscription batch size to force 2 batches to be called
s.runningBatchSize = 2
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()
defer s.Shutdown(ctx)
start := time.Now()
_, err := s.Receive(ctx)
if err != nil {
t.Fatal("Receive should not fail", err)
return
}
receiveDuration := time.Since(start)
if receiveDuration > receiveWaitDuration {
t.Error("Receive should not be blocked by hanging batch request")
}
}

func TestRetryTopic(t *testing.T) {
// Test that Send is retried if the driver returns a retryable error.
ctx := context.Background()
Expand Down

0 comments on commit 765f8d5

Please sign in to comment.