[exporter] Flip on queue batcher #11637

Open: wants to merge 2 commits into base: main
28 changes: 28 additions & 0 deletions .chloggen/11637-exporter-queue-batcher.yaml
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterqueue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change exporter queue batching to use a pulling model.

# One or more tracking issues or pull requests related to the change
issues: [8122, 10368]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  If both queuing and batching are enabled for an exporter, the exporter now uses a pulling model instead of a
  pushing model. `num_consumers` in the queue configuration now specifies the maximum number of
  concurrent workers sending requests.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
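
The pulling model described in this changelog entry can be illustrated with a minimal, standard-library-only sketch. It is not the collector's implementation: `request`, `runPullingBatcher`, `exportAll`, and the `minItems` threshold are hypothetical names chosen for illustration. The sketch assumes a single batcher loop that pulls requests from the queue, merges them until a size threshold is reached, and hands each batch to at most `numConsumers` concurrent export workers.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for an exporter request carrying some items.
type request struct{ items int }

// runPullingBatcher pulls requests from queue, merges them until at least
// minItems are buffered, and hands each batch to export. At most numConsumers
// exports run concurrently. It returns once the queue is closed and every
// export has finished.
func runPullingBatcher(queue <-chan request, minItems, numConsumers int, export func(batch request)) {
	sem := make(chan struct{}, numConsumers) // bounds the number of in-flight exports
	var wg sync.WaitGroup

	flush := func(batch request) {
		sem <- struct{}{} // blocks while numConsumers exports are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			export(batch)
		}()
	}

	var current request
	for req := range queue { // the batcher pulls; producers only enqueue
		current.items += req.items
		if current.items >= minItems {
			flush(current)
			current = request{}
		}
	}
	if current.items > 0 {
		flush(current) // send the leftover partial batch when the queue closes
	}
	wg.Wait()
}

// exportAll enqueues the given request sizes, runs the batcher, and reports
// how many batches and items were exported.
func exportAll(sizes []int, minItems, numConsumers int) (batches, items int) {
	queue := make(chan request, len(sizes))
	for _, s := range sizes {
		queue <- request{items: s}
	}
	close(queue)

	var mu sync.Mutex
	runPullingBatcher(queue, minItems, numConsumers, func(b request) {
		mu.Lock()
		defer mu.Unlock()
		batches++
		items += b.items
	})
	return batches, items
}

func main() {
	batches, items := exportAll([]int{2, 2, 2, 5, 6}, 4, 2)
	fmt.Println(batches, items)
}
```

In this sketch the concurrency limit applies to outgoing batches rather than to queue readers, which is the role the changelog entry assigns to the consumer count once batching is enabled.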
2 changes: 2 additions & 0 deletions exporter/debugexporter/go.mod
@@ -29,6 +29,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
@@ -45,6 +46,7 @@ require (
go.opentelemetry.io/collector/consumer/consumertest v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.19.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
4 changes: 4 additions & 0 deletions exporter/debugexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions exporter/exporterhelper/exporterhelperprofiles/go.mod
@@ -31,13 +31,15 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.19.0 // indirect
go.opentelemetry.io/collector/pdata v1.20.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
4 changes: 4 additions & 0 deletions exporter/exporterhelper/exporterhelperprofiles/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions exporter/exporterhelper/internal/base_exporter.go
@@ -21,9 +21,17 @@ import (
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue" // BaseExporter contains common fields between different exporter types.
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pipeline"
)

var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister(
"telemetry.UsePullingBasedExporterQueueBatcher",
featuregate.StageBeta,
Review comment (Member): Why start with Beta? That seems too aggressive. Let's start with Alpha.

featuregate.WithRegisterFromVersion("v0.114.0"),
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue batcher"),
)
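
The review question about Beta versus Alpha comes down to defaults: in the collector's `featuregate` package, an Alpha gate is disabled unless the user opts in, while a Beta gate is enabled unless the user opts out. The sketch below models only that default behavior with standard-library code. The `stage` and `gate` types are hypothetical stand-ins, not the real `featuregate` API, and stages other than Alpha and Beta are omitted.

```go
package main

import "fmt"

// stage is a hypothetical stand-in for the collector's featuregate.Stage.
type stage int

const (
	stageAlpha stage = iota // disabled unless the user opts in
	stageBeta               // enabled unless the user opts out
)

// gate is a hypothetical, simplified feature gate: a stage plus an optional
// user override.
type gate struct {
	stage    stage
	override *bool // nil means the user did not set the gate
}

// enabled reports the effective state: the override if present, otherwise
// the default implied by the stage.
func (g gate) enabled() bool {
	if g.override != nil {
		return *g.override
	}
	return g.stage == stageBeta
}

func main() {
	off := false
	fmt.Println(gate{stage: stageAlpha}.enabled())
	fmt.Println(gate{stage: stageBeta}.enabled())
	fmt.Println(gate{stage: stageBeta, override: &off}.enabled())
}
```

Registering the gate at Beta therefore changes behavior for every user who has both queuing and batching enabled, unless they explicitly turn the gate off, for example through the collector's `--feature-gates` flag with the gate ID prefixed by `-`.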

type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender

// Option apply changes to BaseExporter.
@@ -94,13 +102,14 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if be.BatcherCfg.Enabled {
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
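
The two-line condition guarding `NewBatchSender` appears to reduce to "batching is enabled, and not both the gate and the queue are enabled". The following stand-alone check is a sketch that enumerates all eight input combinations to confirm the two forms agree. The function names are hypothetical, and the boolean parameters stand in for `usePullingBasedExporterQueueBatcher.IsEnabled()`, `be.BatcherCfg.Enabled`, and `be.queueCfg.Enabled`.

```go
package main

import "fmt"

// original mirrors the condition as written in the diff.
func original(gate, batcher, queue bool) bool {
	return !gate && batcher || gate && batcher && !queue
}

// simplified is a candidate shorter form of the same condition.
func simplified(gate, batcher, queue bool) bool {
	return batcher && !(gate && queue)
}

// equivalent reports whether both forms agree on every input combination.
func equivalent() bool {
	bools := []bool{false, true}
	for _, g := range bools {
		for _, b := range bools {
			for _, q := range bools {
				if original(g, b, q) != simplified(g, b, q) {
					return false
				}
			}
		}
	}
	return true
}

func main() {
	fmt.Println(equivalent())
}
```

Read this way, the legacy `BatchSender` is skipped only when the gate is on and a queue is configured, which is the case where the queue sender owns the batcher.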
100 changes: 0 additions & 100 deletions exporter/exporterhelper/internal/batch_sender_test.go
@@ -141,7 +141,6 @@ func TestBatchSender_BatchExportError(t *testing.T) {
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests &&
sink.itemsCount.Load() == tt.expectedItems &&
be.BatchSender.(*BatchSender).activeRequests.Load() == 0 &&
be.QueueSender.(*QueueSender).queue.Size() == 0
}, 100*time.Millisecond, 10*time.Millisecond)
})
@@ -272,105 +271,6 @@ func TestBatchSender_PostShutdown(t *testing.T) {
assert.Equal(t, int64(8), sink.itemsCount.Load())
}

func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810")
}
tests := []struct {
name string
batcherCfg exporterbatcher.Config
expectedRequests int64
expectedItems int64
}{
{
name: "merge_only",
batcherCfg: func() exporterbatcher.Config {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 20 * time.Millisecond
return cfg
}(),
expectedRequests: 6,
expectedItems: 51,
},
{
name: "merge_without_split_triggered",
batcherCfg: func() exporterbatcher.Config {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 20 * time.Millisecond
cfg.MaxSizeItems = 200
return cfg
}(),
expectedRequests: 6,
expectedItems: 51,
},
{
name: "merge_with_split_triggered",
batcherCfg: func() exporterbatcher.Config {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 50 * time.Millisecond
cfg.MaxSizeItems = 10
return cfg
}(),
expectedRequests: 8,
expectedItems: 51,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(tt.batcherCfg),
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.NotNil(t, be)
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

sink := newFakeRequestSink()
// the 1st and 2nd request should be flushed in the same batched request by max concurrency limit.
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4
}, 100*time.Millisecond, 10*time.Millisecond)

// the 3rd request should be flushed by itself due to flush interval
require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6
}, 100*time.Millisecond, 10*time.Millisecond)

// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10
}, 100*time.Millisecond, 10*time.Millisecond)

// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 5, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink}))
if tt.batcherCfg.MaxSizeItems == 10 {
// in case of MaxSizeItems=10, wait for the leftover request to send
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21
}, 50*time.Millisecond, 10*time.Millisecond)
}

assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 20, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems
}, 100*time.Millisecond, 10*time.Millisecond)
})
}
}

func TestBatchSender_BatchBlocking(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 3
52 changes: 41 additions & 11 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -15,6 +15,7 @@

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/queue"
@@ -71,30 +72,49 @@
queue exporterqueue.Queue[internal.Request]
numConsumers int
traceAttribute attribute.KeyValue
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]

obsrep *ObsReport
exporterID component.ID
}

func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int,
exportFailureMessage string, obsrep *ObsReport) *QueueSender {
func NewQueueSender(
q exporterqueue.Queue[internal.Request],
set exporter.Settings,
numConsumers int,
exportFailureMessage string,
obsrep *ObsReport,
batcherCfg exporterbatcher.Config) *QueueSender {
qs := &QueueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))

if usePullingBasedExporterQueueBatcher.IsEnabled() {
exportFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
return err
qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers)
} else {
consumeFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err

Codecov / codecov/patch warning: added lines #L107-L114 in exporter/exporterhelper/internal/queue_sender.go were not covered by tests.
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)

Codecov / codecov/patch warning: added line #L116 in exporter/exporterhelper/internal/queue_sender.go was not covered by tests.
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)
return qs
}

@@ -103,8 +123,15 @@
if err := qs.queue.Start(ctx, host); err != nil {
return err
}
if err := qs.consumers.Start(ctx, host); err != nil {
return err

if usePullingBasedExporterQueueBatcher.IsEnabled() {
if err := qs.batcher.Start(ctx, host); err != nil {
return err
}
} else {
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}

Codecov / codecov/patch warning: added lines #L129-L134 in exporter/exporterhelper/internal/queue_sender.go were not covered by tests.
}

dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String())
@@ -123,6 +150,9 @@
if err := qs.queue.Shutdown(ctx); err != nil {
return err
}
if usePullingBasedExporterQueueBatcher.IsEnabled() {
return qs.batcher.Shutdown(ctx)
}
return qs.consumers.Shutdown(ctx)
}
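
The constructor, `Start`, and `Shutdown` each read the feature gate separately, so if the gate value were ever to differ between those calls, a nil `batcher` or `consumers` field would be dereferenced. One possible alternative, sketched here under simplifying assumptions, is to choose the worker once at construction and store it behind a small interface. All names below (`lifecycle`, `recorder`, `sender`, `runLifecycle`) are hypothetical, and the real `Start` methods also take a `component.Host`, which is omitted.

```go
package main

import (
	"context"
	"fmt"
)

// lifecycle is the hypothetical subset of behavior shared by the batcher and
// the consumers in this sketch.
type lifecycle interface {
	Start(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

// recorder is a fake worker that records which lifecycle calls it received.
type recorder struct {
	name   string
	events *[]string
}

func (r recorder) Start(context.Context) error {
	*r.events = append(*r.events, "start:"+r.name)
	return nil
}

func (r recorder) Shutdown(context.Context) error {
	*r.events = append(*r.events, "shutdown:"+r.name)
	return nil
}

// sender holds exactly one worker, chosen when it is constructed.
type sender struct{ worker lifecycle }

// newSender reads the gate value once and never consults it again.
func newSender(usePulling bool, events *[]string) *sender {
	name := "consumers"
	if usePulling {
		name = "batcher"
	}
	return &sender{worker: recorder{name: name, events: events}}
}

func (s *sender) Start(ctx context.Context) error    { return s.worker.Start(ctx) }
func (s *sender) Shutdown(ctx context.Context) error { return s.worker.Shutdown(ctx) }

// runLifecycle starts and shuts down a sender and returns the recorded calls.
func runLifecycle(usePulling bool) []string {
	var events []string
	s := newSender(usePulling, &events)
	ctx := context.Background()
	if err := s.Start(ctx); err != nil {
		return events
	}
	_ = s.Shutdown(ctx)
	return events
}

func main() {
	fmt.Println(runLifecycle(true))
	fmt.Println(runLifecycle(false))
}
```

With this shape, `Start` and `Shutdown` need no gate checks, which would also remove the branches that Codecov reports as uncovered.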

5 changes: 3 additions & 2 deletions exporter/exporterhelper/internal/queue_sender_test.go
@@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
@@ -212,7 +213,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
require.NoError(t, err)

qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
qCfg.NumConsumers = -1 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := NewBaseExporter(set, dataType, newObservabilityConsumerSender,
@@ -437,6 +438,6 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
ExporterCreateSettings: exportertest.NewNopSettings(),
})
require.NoError(t, err)
qs := NewQueueSender(queue, set, 1, "", obsrep)
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig())
assert.NoError(t, qs.Shutdown(context.Background()))
}
2 changes: 2 additions & 0 deletions exporter/exportertest/go.mod
@@ -27,6 +27,7 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@@ -35,6 +36,7 @@ require (
go.opentelemetry.io/collector/consumer/consumerprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.19.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.114.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
4 changes: 4 additions & 0 deletions exporter/exportertest/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
