
(exporterhelper): Add concurrency sender to send batches concurrently #10478

Closed

Conversation

@moh-osman3 (Contributor) commented Jun 28, 2024

Description

This PR adds a new component to the exporterhelper called the concurrency sender. This component limits the concurrency of RPCs independently of the queue sender's num_consumers. This is desirable because the queue_sender does not apply backpressure, so the concurrency_sender can limit in-flight requests based on a configured concurrency limit. Additionally, the batch_sender can generate multiple batches from a single request, which are currently exported sequentially; the concurrency_sender provides a way to export these batches in parallel.

This PR also changes the requestSender interface:

type requestSender interface {
	component.Component
	send(context.Context, ...Request) error
	setNextSender(nextSender requestSender)
}

This allows us to optionally send multiple requests, so that batch producers can receive the errors associated with each of their exported batches.
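As a rough sketch of how the variadic send can be used, something like the following could implement the concurrency sender. This is illustrative only, not the PR's actual code: the limit and nextSender fields and the use of the standard library's errors.Join are assumptions, and imports of context, errors, and sync are elided.

// Minimal sketch only -- not the PR's implementation. Assumes a concurrency
// sender with a configured limit and a nextSender field.
func (cs *concurrencySender) send(ctx context.Context, reqs ...Request) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs error
	)
	sem := make(chan struct{}, cs.limit) // bounds the number of in-flight exports
	for _, req := range reqs {
		req := req
		sem <- struct{}{} // acquire a slot before spawning a goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this export finishes
			if err := cs.nextSender.send(ctx, req); err != nil {
				mu.Lock()
				errs = errors.Join(errs, err)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	// The caller (e.g. a batch producer) gets back one error covering all requests.
	return errs
}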

Link to tracking issue

Related to #10368

Testing

Added unit tests and performed local testing to ensure concurrency works and errors are propagated back to batch producers

@jmacd (Contributor) left a comment:

This approach looks 💯 to me, thanks @moh-osman3.

exporter/exporterhelper/common.go (Outdated)
@@ -21,7 +21,7 @@ import (
 // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
 type requestSender interface {
 	component.Component
-	send(context.Context, Request) error
+	send(context.Context, ...Request) error
@dmitryax (Member) commented Jul 8, 2024:

This allows us to optionally send multiple requests, so that batch producers can receive the errors associated with each of their exported batches.

Can you please elaborate on why this change is required with some examples? I don't understand why it's not possible without this change

@moh-osman3 (Contributor, Author) replied:

Thanks for the review! The main issue comes from returning multiple errors back to a single batch producer. For example, say one request enters the batch_sender and is split into 50 batches. We can't return from sendMergeSplitBatch until we collect a response from all 50 batch exports. In the current implementation we export these batches sequentially, waiting for the previous batch to return, when ideally the exports should run concurrently, subject to a concurrency limit. To make them concurrent without spawning more goroutines than that limit allows, we need to pass the full slice of requests to the concurrency_sender so it can bound the number of goroutines used for export.
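To make the flow concrete, here is a hypothetical sketch of the producer side; the splitIntoBatches helper name is invented for illustration and is not the batch sender's real API:

// Hypothetical sketch: split one incoming request into batches, then hand them
// all to the next sender in a single variadic call. The call returns only after
// every batch export has completed, so the producer receives a combined error
// for all of its batches, while the downstream concurrency_sender bounds how
// many of them are exported in parallel.
batches, err := bs.splitIntoBatches(ctx, req) // invented helper name
if err != nil {
	return err
}
return bs.nextSender.send(ctx, batches...)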

 if bs.activeBatch.request != nil {
 	var err error
-	req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
+	req, err = bs.mergeFunc(ctx, bs.activeBatch.request, reqs[0])
A Member commented:

Why do we ignore other requests?

@moh-osman3 (Contributor, Author) replied Jul 9, 2024:

Ah, I misunderstood this function and mistakenly thought it would only ever be called with len(reqs) == 1. I now realize it is called when MaxBatchSize is not set, so I am now merging all requests into a single request. Thanks for catching this.
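A sketch of that merge-all behavior, reusing the pairwise mergeFunc signature visible in the diff above (error handling abbreviated, illustrative only):

// Fold every incoming request into a single request via the pairwise mergeFunc.
merged := reqs[0]
for _, r := range reqs[1:] {
	var err error
	merged, err = bs.mergeFunc(ctx, merged, r)
	if err != nil {
		return err
	}
}
// merged is then combined with bs.activeBatch.request as before.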

func WithConcurrency(config ConcurrencySettings) Option {
	return func(o *baseExporter) error {
		if !config.Enabled {
			o.exportFailureMessage += " Try enabling the concurrency config option to send more requests in parallel."
@dmitryax (Member) commented Jul 8, 2024:

This seems misleading. I'm not sure how enabling a concurrency limit can help with export errors; there is no limit on concurrency by default.

@moh-osman3 (Contributor, Author) replied:

Simplified the config, so this message is now removed.

const defaultNumSenders = 10

type ConcurrencySettings struct {
	Enabled bool `mapstructure:"enabled"`
A Member commented:

This option seems excessive; it's equivalent to just checking whether concurrency is set to any number other than 0.

We probably don't even need a configuration grouping here. Just one field in the exporter config similar to timeout, e.g.:

otlp:
  timeout: 10s
  concurrency: 100
  sending_queue:
    enabled: true
  retry_on_failure:
    enabled: true

@moh-osman3 (Contributor, Author) replied:

Thanks, simplified my config as you suggested
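For illustration, a hypothetical sketch of what the simplified configuration could map to in Go; the field and tag names here are illustrative, not the exporter's actual Config:

type Config struct {
	Timeout time.Duration `mapstructure:"timeout"`
	// Concurrency caps the number of parallel exports; 0 (the default) means no limit.
	Concurrency int `mapstructure:"concurrency"`
	// sending_queue, retry_on_failure, etc. omitted for brevity.
}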

@moh-osman3 force-pushed the mohosman/exporterhelper-errors branch 2 times, most recently from 42dfdfc to de3f418 on July 9, 2024
if bs, ok := be.batchSender.(*batchSender); ok {
	// If queue sender is enabled assign to the batch sender the same number of workers.
	if qs, ok := be.queueSender.(*queueSender); ok {
		bs.concurrencyLimit = int64(qs.numConsumers)
@dmitryax (Member) commented Jul 10, 2024:

The biggest problem with this approach is that it ignores the number of queue workers. It means that, typically, every batch would have to sit for the full flush_timeout period before it can be sent further. That's why the batch sender's concurrency limit is tied to the number of queue consumers: whenever all the workers are stuck on one batch, we release them right away by flushing it. That behavior is gone with this PR.
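For context, a simplified sketch of the early-flush behavior being described (not the real batchSender code; field names are illustrative): the active batch is exported as soon as the number of requests blocked on it reaches the limit derived from the queue's consumers, instead of waiting for flush_timeout.

// Simplified illustration only.
batch.requestsBlocked++
if bs.concurrencyLimit > 0 && batch.requestsBlocked >= bs.concurrencyLimit {
	// Every queue worker is now waiting on this batch: export it immediately
	// so the workers are released right away rather than after flush_timeout.
	bs.exportActiveBatch(ctx)
}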

@dmitryax (Member) added Jul 10, 2024:

So, it doesn't resolve the problem stated in #10368. Just adding an independent concurrency limit doesn't help with that

@moh-osman3 (Contributor, Author) replied:

@dmitryax Thanks for the feedback. I've spent a bit of time thinking about options here so that the queue_sender consumers don't block when there are more items we could batch. I'm wondering if we can swap the order of the batch sender and the queue sender, so that we are actually queueing batches instead of queueing the initial request that comes in. Alternatively, the queue_sender could do some prebatching itself, i.e. keep popping items from the queue until it has roughly a batch's worth to hand to the batch_sender.

@moh-osman3 (Contributor, Author) added:

Another possibility I explored was unblocking consumers by calling consumeFunc in a goroutine, so that the batch_sender isn't blocked for the flush timeout and more requests can be consumed. However, this might not work for the persistent queue, which needs to know when consumeFunc returns so it can mark items for deletion from the persistent queue.
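To illustrate the persistent-queue concern, here is a simplified, hypothetical consume loop (names invented; not the collector's actual queue code):

for {
	req, onProcessingFinished, ok := q.getNextItem(ctx)
	if !ok {
		return
	}
	// If consumeFunc were launched in a goroutine here, the loop could not
	// wait for its result, so the item would be acknowledged (and eligible
	// for deletion from the persistent queue) before the export finished.
	err := consumeFunc(ctx, req)
	onProcessingFinished(err)
}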

@moh-osman3 (Contributor, Author) added:

Any thoughts on these options?

A Member replied:

A few problems with putting the queue before the batcher:

  • The queue is what adds asynchronous behavior to exporters. If it's not enabled, the batcher is expected to concurrently block the incoming requests.
  • Persistent queue provides durability. A crashed collector is expected to not lose any data. If we put another memory queue in front of batching, it breaks that promise.

A Contributor replied:

Another possibility I explored was unblocking consumers by calling consumeFunc in a goroutine, so that the batch_sender isn't blocked for the flush timeout and more requests can be consumed. However, this might not work for the persistent queue, which needs to know when consumeFunc returns so it can mark items for deletion from the persistent queue.

I've been coming to this conclusion. I'm imagining it would be something like the following:

  • batcher reads, but does not remove, records from the queue until it has a batch of min size
  • batcher sends the batch to the next consumer in the chain -- this could be the concurrency sender
  • on success, batcher removes the records from the queue

Perhaps there could be an option on the batcher to remove records from the queue optimistically, for users that prefer at most once vs. at least once delivery (default being at least once, for durability).

This is not trivial of course - it would mean a substantial change to the queue API.
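A hypothetical sketch of what such a peek/commit queue API could look like; the interface and method names are invented for illustration and differ from the collector's actual Queue API:

type BatchableQueue[T any] interface {
	// Peek returns up to max items without removing them from the queue.
	Peek(ctx context.Context, max int) (items []T, batchID uint64, err error)
	// Commit removes a previously peeked batch after a successful export
	// (at-least-once delivery).
	Commit(ctx context.Context, batchID uint64) error
	// Rollback makes a peeked batch visible to readers again after a failed export.
	Rollback(ctx context.Context, batchID uint64) error
}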

@moh-osman3 force-pushed the mohosman/exporterhelper-errors branch from ef787ea to 5432c27 on August 5, 2024
This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions bot added the Stale label Aug 20, 2024
@github-actions bot commented Sep 4, 2024:

Closed as inactive. Feel free to reopen if this PR is still being worked on.

@github-actions bot closed this Sep 4, 2024