diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 3c42442c68..3ee5249df7 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -281,6 +281,51 @@ func TestCancelTwoReceives(t *testing.T) { } } +type secondReceiveBlockedDriverSub struct { + driver.Subscription + waitDuration time.Duration + receiveCounter int +} + +func (s *secondReceiveBlockedDriverSub) ReceiveBatch(_ context.Context, _ int) ([]*driver.Message, error) { + s.receiveCounter++ + if s.receiveCounter > 1 { + // wait after 1st request for the specified duration before returning the batch result + <-time.After(s.waitDuration) + } + msg := &driver.Message{Body: []byte(fmt.Sprintf("message #%d", s.receiveCounter))} + return []*driver.Message{msg}, nil +} +func (*secondReceiveBlockedDriverSub) CanNack() bool { return false } +func (*secondReceiveBlockedDriverSub) IsRetryable(error) bool { return false } +func (*secondReceiveBlockedDriverSub) Close() error { return nil } + +func TestIndependentBatchReturn(t *testing.T) { + // We want to test the scenario when multiple batch requests are sent, as long as one of them succeeds, it should + // not block the Subscription.Receive result + receiveWaitDuration := 200 * time.Millisecond + s := NewSubscription( + &secondReceiveBlockedDriverSub{waitDuration: receiveWaitDuration}, + &batcher.Options{MaxBatchSize: 1, MaxHandlers: 2}, // force 2 batches, by allowing 2 handlers and 1 msg per batch + nil, + ) + // set the false calculated subscription batch size to force 2 batches to be called + s.runningBatchSize = 2 + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + defer s.Shutdown(ctx) + start := time.Now() + _, err := s.Receive(ctx) + if err != nil { + t.Fatal("Receive should not fail", err) + return + } + receiveDuration := time.Since(start) + if receiveDuration > receiveWaitDuration { + t.Error("Receive should not be blocked by hanging batch request") + } +} + func TestRetryTopic(t *testing.T) { // Test that Send is retried if the driver returns a retryable error. ctx := context.Background()