-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[exporter] Flip on queue batcher #11637
base: main
Are you sure you want to change the base?
Conversation
2900101
to
55aae5c
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #11637 +/- ##
==========================================
- Coverage 91.62% 91.35% -0.27%
==========================================
Files 442 442
Lines 23776 23804 +28
==========================================
- Hits 21785 21747 -38
- Misses 1619 1682 +63
- Partials 372 375 +3 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
exporter/internal/queue/batcher.go
Outdated
// Shutdown ensures that queue and all Batcher are stopped. | ||
func (qb *BaseBatcher) Shutdown(_ context.Context) error { | ||
qb.stopWG.Wait() | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the other comment.
qb.currentBatchMu.Lock() | ||
if qb.currentBatch == nil || qb.currentBatch.req == nil { | ||
qb.currentBatchMu.Unlock() | ||
continue | ||
} | ||
batchToFlush := *qb.currentBatch | ||
qb.currentBatch = nil | ||
qb.currentBatchMu.Unlock() | ||
|
||
// flushAsync() blocks until successfully started a goroutine for flushing. | ||
qb.flushAsync(batchToFlush) | ||
qb.resetTimer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand this, can we do this in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Here it is: #11666
batch_sender_test
helped me detect that the original implementation is missing a flush on shutdown.
Given the impact of this change (every collector user with sending_queue enabled, which is the default), I suggest we introduce it with a feature gate, e.g. |
555baa2
to
6bb9b7f
Compare
@dmitryax Hi Dimitrii, I wonder if you know what would be a better way to make sure existing tests pass with both feature gate on and off. Manually enabling then disabling in every single exporter test could work but I wonder if there's other option |
3bf9d0b
to
844cfc6
Compare
844cfc6
to
d158ac8
Compare
"go.opentelemetry.io/collector/pipeline" | ||
) | ||
|
||
var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister( | ||
"telemetry.UsePullingBasedExporterQueueBatcher", | ||
featuregate.StageBeta, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why starting with Beta? That sounds too aggressive. Let's start with Alpha
Description
This PR solves #10368.
Previously we use a pushing model between the queue and the batch, resulting the batch size to be constrained by the
sending_queue.num_consumers
, because the batch cannot accumulate more thansending_queue.num_consumers
blocked goroutines provide.This PR changes it to a pulling model. We read from the queue until threshold is met or timeout, then allocate a worker to asynchronously send out the request.
Link to tracking issue
Fixes #10368
#8122
Testing
This PR swaps out
batch_sender
directly and still passes all the existing tests.Documentation