[chore] [exporterqueue] Add splitting to the experimental pull-based batcher (open-telemetry#11580)

#### Description

This PR follows open-telemetry#11546 and adds support for splitting (i.e., support for setting a maximum request size).

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing
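For context, the new knob is `MaxSizeConfig.MaxSizeItems` on the batcher config. A minimal sketch of the configuration exercised by the new test below (values illustrative, not prescriptive):

```go
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = true
cfg.MinSizeConfig.MinSizeItems = 100 // flush once a pending batch reaches 100 items
cfg.MaxSizeConfig.MaxSizeItems = 100 // split any merged request larger than 100 items
```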

#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368
sfc-gh-sili authored and djaglowski committed Nov 21, 2024
1 parent d5e27f4 commit d6bb30b
Showing 4 changed files with 179 additions and 48 deletions.
5 changes: 0 additions & 5 deletions exporter/internal/queue/batcher.go
@@ -5,7 +5,6 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
 
 import (
     "context"
-    "errors"
     "sync"
 
     "go.opentelemetry.io/collector/component"
@@ -44,10 +43,6 @@ func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
         }, nil
     }
 
-    if batchCfg.MaxSizeConfig.MaxSizeItems != 0 {
-        return nil, errors.ErrUnsupported
-    }
-
     return &DefaultBatcher{
         BaseBatcher: BaseBatcher{
             batchCfg: batchCfg,
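With the `errors.ErrUnsupported` guard above removed, `NewBatcher` now accepts a non-zero maximum size. A hedged sketch of the effect (assuming a queue `q` constructed as in the tests below):

```go
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = true
cfg.MaxSizeConfig.MaxSizeItems = 100
ba, err := NewBatcher(cfg, q, 1) // before this commit: err == errors.ErrUnsupported; now: a DefaultBatcher and a nil error
```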
87 changes: 63 additions & 24 deletions exporter/internal/queue/default_batcher.go
@@ -10,6 +10,7 @@ import (
     "time"
 
     "go.opentelemetry.io/collector/component"
+    "go.opentelemetry.io/collector/exporter/internal"
 )
 
 // DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
@@ -41,35 +42,73 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
             }
 
             qb.currentBatchMu.Lock()
-            if qb.currentBatch == nil || qb.currentBatch.req == nil {
-                qb.resetTimer()
-                qb.currentBatch = &batch{
-                    req:     req,
-                    ctx:     ctx,
-                    idxList: []uint64{idx}}
-            } else {
-                mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
-                if mergeErr != nil {
-                    qb.queue.OnProcessingFinished(idx, mergeErr)
-                    qb.currentBatchMu.Unlock()
-                    continue
-                }
-                qb.currentBatch = &batch{
-                    req:     mergedReq,
-                    ctx:     qb.currentBatch.ctx,
-                    idxList: append(qb.currentBatch.idxList, idx)}
-            }
-
-            if qb.currentBatch.req.ItemsCount() > qb.batchCfg.MinSizeItems {
-                batchToFlush := *qb.currentBatch
-                qb.currentBatch = nil
-                qb.currentBatchMu.Unlock()
-
-                // flushAsync() blocks until successfully started a goroutine for flushing.
-                qb.flushAsync(batchToFlush)
-                qb.resetTimer()
-            } else {
-                qb.currentBatchMu.Unlock()
-            }
+
+            if qb.batchCfg.MaxSizeItems > 0 {
+                var reqList []internal.Request
+                var mergeSplitErr error
+                if qb.currentBatch == nil || qb.currentBatch.req == nil {
+                    qb.resetTimer()
+                    reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
+                } else {
+                    reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
+                }
+
+                if mergeSplitErr != nil || reqList == nil {
+                    qb.queue.OnProcessingFinished(idx, mergeSplitErr)
+                    qb.currentBatchMu.Unlock()
+                    continue
+                }
+
+                // If there was a split, we flush everything immediately.
+                if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
+                    qb.currentBatch = nil
+                    qb.currentBatchMu.Unlock()
+                    for i := 0; i < len(reqList); i++ {
+                        qb.flushAsync(batch{
+                            req:     reqList[i],
+                            ctx:     ctx,
+                            idxList: []uint64{idx}})
+                        // TODO: handle partial failure
+                    }
+                    qb.resetTimer()
+                } else {
+                    qb.currentBatch = &batch{
+                        req:     reqList[0],
+                        ctx:     ctx,
+                        idxList: []uint64{idx}}
+                    qb.currentBatchMu.Unlock()
+                }
+            } else {
+                if qb.currentBatch == nil || qb.currentBatch.req == nil {
+                    qb.resetTimer()
+                    qb.currentBatch = &batch{
+                        req:     req,
+                        ctx:     ctx,
+                        idxList: []uint64{idx}}
+                } else {
+                    mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
+                    if mergeErr != nil {
+                        qb.queue.OnProcessingFinished(idx, mergeErr)
+                        qb.currentBatchMu.Unlock()
+                        continue
+                    }
+                    qb.currentBatch = &batch{
+                        req:     mergedReq,
+                        ctx:     qb.currentBatch.ctx,
+                        idxList: append(qb.currentBatch.idxList, idx)}
+                }
+
+                if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
+                    batchToFlush := *qb.currentBatch
+                    qb.currentBatch = nil
+                    qb.currentBatchMu.Unlock()
+
+                    // flushAsync() blocks until successfully started a goroutine for flushing.
+                    qb.flushAsync(batchToFlush)
+                    qb.resetTimer()
+                } else {
+                    qb.currentBatchMu.Unlock()
+                }
+            }
         }
     }()
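To make the new flush condition concrete, a worked example under the values used in the test below (`MinSizeItems = 100`, `MaxSizeItems = 100`): merging an incoming 30-item request into a 75-item batch yields `reqList = [100, 5]`; since `len(reqList) > 1`, both partial requests are flushed immediately and `currentBatch` is cleared. Offering a 900-item request against an empty batch yields nine 100-item requests, and `reqList[0].ItemsCount() >= MinSizeItems` likewise triggers an immediate flush of all of them.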
84 changes: 69 additions & 15 deletions exporter/internal/queue/default_batcher_test.go
@@ -17,7 +17,7 @@ import (
     "go.opentelemetry.io/collector/exporter/internal"
 )
 
-func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) {
+func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
     tests := []struct {
         name       string
         maxWorkers int
@@ -75,7 +75,7 @@ func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) {
     }
 }
 
-func TestDefaultBatcher_TimeoutDisabled(t *testing.T) {
+func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
     tests := []struct {
         name       string
         maxWorkers int
@@ -139,7 +139,7 @@ func TestDefaultBatcher_TimeoutDisabled(t *testing.T) {
     }
 }
 
-func TestDefaultBatcher_WithTimeout(t *testing.T) {
+func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
     tests := []struct {
         name       string
         maxWorkers int
@@ -200,18 +200,72 @@
     }
 }
 
-func TestDisabledBatcher_SplitNotImplemented(t *testing.T) {
-    cfg := exporterbatcher.NewDefaultConfig()
-    cfg.Enabled = true
-    maxWorkers := 0
-    cfg.MaxSizeConfig.MaxSizeItems = 1
-
-    q := NewBoundedMemoryQueue[internal.Request](
-        MemoryQueueSettings[internal.Request]{
-            Sizer:    &RequestSizer[internal.Request]{},
-            Capacity: 10,
-        })
-
-    _, err := NewBatcher(cfg, q, maxWorkers)
-    require.Error(t, err)
-}
+func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
+    tests := []struct {
+        name       string
+        maxWorkers int
+    }{
+        {
+            name:       "infinite_workers",
+            maxWorkers: 0,
+        },
+        {
+            name:       "one_worker",
+            maxWorkers: 1,
+        },
+        {
+            name:       "three_workers",
+            maxWorkers: 3,
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            cfg := exporterbatcher.NewDefaultConfig()
+            cfg.Enabled = true
+            cfg.FlushTimeout = 0
+            cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{
+                MinSizeItems: 100,
+            }
+            cfg.MaxSizeConfig = exporterbatcher.MaxSizeConfig{
+                MaxSizeItems: 100,
+            }
+
+            q := NewBoundedMemoryQueue[internal.Request](
+                MemoryQueueSettings[internal.Request]{
+                    Sizer:    &RequestSizer[internal.Request]{},
+                    Capacity: 10,
+                })
+
+            ba, err := NewBatcher(cfg, q, tt.maxWorkers)
+            require.NoError(t, err)
+
+            require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
+            require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+            t.Cleanup(func() {
+                require.NoError(t, q.Shutdown(context.Background()))
+                require.NoError(t, ba.Shutdown(context.Background()))
+            })
+
+            sink := newFakeRequestSink()
+
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
+
+            // This request will be dropped because of a merge error.
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink}))
+
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink}))
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 30, sink: sink}))
+            assert.Eventually(t, func() bool {
+                return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 105
+            }, 100*time.Millisecond, 10*time.Millisecond)
+
+            require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 900, sink: sink}))
+            assert.Eventually(t, func() bool {
+                return sink.requestsCount.Load() == 11 && sink.itemsCount.Load() == 1005
+            }, 100*time.Millisecond, 10*time.Millisecond)
+        })
+    }
+}
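The asserted counts follow from the split logic: the merge-error request is dropped, the other offers accumulate 8+17+13+35+2 = 75 items without reaching `MinSizeItems`, and the final 30-item offer brings the batch to 105 items, which `MergeSplit` partitions into [100, 5] and flushes at once (2 requests, 105 items). The later 900-item offer is split into nine 100-item requests and flushed immediately, for a cumulative 11 requests and 1005 items.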
51 changes: 47 additions & 4 deletions exporter/internal/queue/fake_request.go
@@ -5,7 +5,6 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
 
 import (
     "context"
-    "errors"
     "sync/atomic"
     "time"
 
@@ -68,7 +67,51 @@ func (r *fakeRequest) Merge(_ context.Context,
     }, nil
 }
 
-func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
-    _ internal.Request) ([]internal.Request, error) {
-    return nil, errors.New("not implemented")
+func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig,
+    r2 internal.Request) ([]internal.Request, error) {
+    if r.mergeErr != nil {
+        return nil, r.mergeErr
+    }
+
+    maxItems := cfg.MaxSizeItems
+    if maxItems == 0 {
+        r, err := r.Merge(ctx, r2)
+        return []internal.Request{r}, err
+    }
+
+    var fr2 *fakeRequest
+    if r2 == nil {
+        fr2 = &fakeRequest{sink: r.sink, exportErr: r.exportErr, delay: r.delay}
+    } else {
+        if r2.(*fakeRequest).mergeErr != nil {
+            return nil, r2.(*fakeRequest).mergeErr
+        }
+        fr2 = r2.(*fakeRequest)
+        fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}
+    }
+    var res []internal.Request
+    r = &fakeRequest{items: r.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}
+    if fr2.items <= maxItems-r.items {
+        r.items += fr2.items
+        if fr2.exportErr != nil {
+            r.exportErr = fr2.exportErr
+        }
+        return []internal.Request{r}, nil
+    }
+    // If a split is needed, don't propagate exportErr from fr2 to r, so that more cases are exercised.
+    fr2.items -= maxItems - r.items
+    r.items = maxItems
+    res = append(res, r)
+
+    // Split the remainder of fr2 into chunks of at most maxItems items.
+    for {
+        if fr2.items <= maxItems {
+            res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay})
+            break
+        }
+        res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay})
+        fr2.items -= maxItems
+    }
+
+    return res, nil
 }
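A quick behavior sketch of this fake `MergeSplit`, traced by hand (hypothetical values; assumes a `sink` from `newFakeRequestSink()` and a `ctx` in scope):

```go
r := &fakeRequest{items: 25, sink: sink}
res, _ := r.MergeSplit(ctx, exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, nil)
// res holds three requests of 10, 10, and 5 items

r = &fakeRequest{items: 8, sink: sink}
r2 := &fakeRequest{items: 17, sink: sink}
res, _ = r.MergeSplit(ctx, exporterbatcher.MaxSizeConfig{MaxSizeItems: 100}, r2)
// res holds a single merged 25-item request, since 8+17 fits under the cap
_ = res
```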
