kafka replay speed: add alert for when we miss records in Kafka #9921

Open · wants to merge 7 commits into base: main
2 changes: 0 additions & 2 deletions development/mimir-ingest-storage/config/mimir.yaml
@@ -16,9 +16,7 @@ ingest_storage:
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
startup_fetch_concurrency: 15
-startup_records_per_fetch: 2400
ongoing_fetch_concurrency: 2
-ongoing_records_per_fetch: 30

ingester:
track_ingester_owned_series: true
4 changes: 2 additions & 2 deletions development/mimir-ingest-storage/docker-compose.jsonnet
@@ -54,10 +54,10 @@ std.manifestYamlDoc({
'-ingester.ring.prefix=exclusive-prefix',
'-ingest-storage.kafka.consume-from-position-at-startup=end',
'-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
-'-ingest-storage.kafka.ingestion-concurrency=2',
-'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
'-ingest-storage.kafka.startup-fetch-concurrency=15',
'-ingest-storage.kafka.ongoing-fetch-concurrency=2',
+'-ingest-storage.kafka.ingestion-concurrency-max=2',
+'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
],
extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'],
}),
2 changes: 1 addition & 1 deletion development/mimir-ingest-storage/docker-compose.yml
@@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150"
"depends_on":
"kafka_1":
"condition": "service_healthy"
18 changes: 18 additions & 0 deletions docs/sources/mimir/manage/mimir-runbooks/_index.md
@@ -1529,6 +1529,24 @@ How to **investigate**:
- If the call exists and it's waiting on a lock then there may be a deadlock.
- If the call doesn't exist then it could either mean processing is not stuck (false positive) or the `pushToStorage` wasn't called at all, and so you should investigate the callers in the code.

### MimirIngesterMissedRecordsFromKafka

This alert fires when an ingester has missed processing some records from Kafka. In other words, there has been a gap in offsets.

How it **works**:

- The ingester reads records from Kafka and processes them sequentially. It keeps track of the offset of the last record it's processed.
- Upon fetching the next batch of records, it checks if the first available record has an offset of one greater than the last processed offset. If the first available offset is larger than that, then the ingester has missed some records, as illustrated in the sketch after this list.
- Kafka doesn't guarantee sequential offsets. If a record has been manually deleted from Kafka or if the records have been produced in a transaction and the transaction was aborted, then there may be a gap.
- Mimir doesn't produce records in transactions and doesn't delete records.
- When the ingester starts, it attempts to resume from the last offset it processed. If the ingester has been unavailable for long enough that the next record is already removed due to retention, then the ingester misses some records.
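A minimal Go sketch of the gap check (illustrative only; the real logic is `findGapsInRecords` in `pkg/storage/ingest/fetcher.go`):

```go
// missedOffsets returns how many offsets were skipped between the last
// processed offset and the first offset of the next fetched batch.
// For example, missedOffsets(41, 45) == 3: offsets 42, 43, and 44 were never consumed.
func missedOffsets(lastProcessed, firstFetched int64) int64 {
	if firstFetched > lastProcessed+1 {
		return firstFetched - lastProcessed - 1
	}
	return 0
}
```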


How to **investigate**:

- Verify that there have been no deleted records in your Kafka cluster.
- Verify that the ingester hasn't been down for longer than the retention on the Kafka partition.
- Report a bug.
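For example, to see which ingesters missed records and how many offsets were skipped, you can run a query like the following (an illustrative query over the same metric the alert uses; adjust the range as needed):

```promql
sum by (cluster, namespace, pod) (increase(cortex_ingest_storage_reader_missed_records_total[1h])) > 0
```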

### MimirStrongConsistencyEnforcementFailed

This alert fires when too many read requests with strong consistency are failing.
@@ -1133,6 +1133,15 @@ spec:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
  annotations:
    message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
    runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
  expr: |
    # Alert if the ingester missed some records from Kafka.
    increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
  labels:
    severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
9 changes: 9 additions & 0 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
@@ -1107,6 +1107,15 @@ groups:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
  annotations:
    message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
    runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
  expr: |
    # Alert if the ingester missed some records from Kafka.
    increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
  labels:
    severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
9 changes: 9 additions & 0 deletions operations/mimir-mixin-compiled/alerts.yaml
@@ -1121,6 +1121,15 @@ groups:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
  annotations:
    message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
    runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
  expr: |
    # Alert if the ingester missed some records from Kafka.
    increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
  labels:
    severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
15 changes: 15 additions & 0 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
@@ -161,6 +161,21 @@
},
},

// Alert firing if an ingester missed processing some records from Kafka, i.e. there was a gap in the consumed offsets.
{
  alert: $.alertName('IngesterMissedRecordsFromKafka'),
  expr: |||
    # Alert if the ingester missed some records from Kafka.
    increase(cortex_ingest_storage_reader_missed_records_total[%s]) > 0
  ||| % $.alertRangeInterval(10),
  labels: {
    severity: 'critical',
  },
  annotations: {
    message: '%(product)s {{ $labels.%(per_instance_label)s }} in %(alert_aggregation_variables)s missed processing records from Kafka. There may be data loss.' % $._config,
  },
},

// Alert firing if Mimir is failing to enforce strong read consistency.
{
alert: $.alertName('StrongConsistencyEnforcementFailed'),
49 changes: 43 additions & 6 deletions pkg/storage/ingest/fetcher.go
@@ -16,6 +16,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
@@ -227,7 +228,7 @@ type concurrentFetchers struct {
// ordering.
orderedFetches chan fetchResult

-lastReturnedRecord int64
+lastReturnedOffset int64
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
@@ -283,7 +284,7 @@ func newConcurrentFetchers(
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
-lastReturnedRecord: startOffset - 1,
+lastReturnedOffset: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
maxBufferedBytesLimit: maxBufferedBytesLimit,
@@ -343,7 +344,7 @@ func (r *concurrentFetchers) Stop() {
r.bufferedFetchedRecords.Store(0)
r.bufferedFetchedBytes.Store(0)

-level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
+level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset)
}

// Update implements fetcher
@@ -352,7 +353,7 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) {
r.done = make(chan struct{})

r.wg.Add(1)
-go r.start(ctx, r.lastReturnedRecord+1, concurrency)
+go r.start(ctx, r.lastReturnedOffset+1, concurrency)
}

// PollFetches implements fetcher
@@ -369,12 +370,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
// PollFetches() calls).
r.bufferedFetchedRecords.Sub(int64(len(f.Records)))

-firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
+firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedOffset)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

f.Records = f.Records[firstUnreturnedRecordIdx:]
if len(f.Records) > 0 {
-r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
+instrumentGaps(findGapsInRecords(f.Records, r.lastReturnedOffset), r.metrics.missedRecords, r.logger)
+r.lastReturnedOffset = f.Records[len(f.Records)-1].Offset
}

return kgo.Fetches{{
@@ -388,6 +390,41 @@
}
}

func instrumentGaps(gaps []offsetRange, missedRecords prometheus.Counter, logger log.Logger) {
	for _, gap := range gaps {
		level.Error(logger).Log(
			"msg", "there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka",
			"gap_start_inclusive", gap.start,
			"gap_end_exclusive", gap.end,
		)
		missedRecords.Add(float64(gap.numOffsets()))
	}
}

type offsetRange struct {
	// start is inclusive
	start int64

	// end is exclusive
	end int64
}

func (g offsetRange) numOffsets() int64 {
	return g.end - g.start
}

func findGapsInRecords(records []*kgo.Record, lastReturnedOffset int64) []offsetRange {
	var gaps []offsetRange
	for _, r := range records {
		if r.Offset != lastReturnedOffset+1 {
			gaps = append(gaps, offsetRange{start: lastReturnedOffset + 1, end: r.Offset})
		}
		lastReturnedOffset = r.Offset
	}
	return gaps
}
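// Illustration (hypothetical offsets): with lastReturnedOffset=41 and fetched
// records at offsets [45, 46], findGapsInRecords returns
// []offsetRange{{start: 42, end: 45}}: offsets 42, 43, and 44 were missed, so
// numOffsets() == 3 and the missed-records counter increases by 3.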

func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
for i, r := range records {
if r.Offset > offset {
11 changes: 11 additions & 0 deletions pkg/storage/ingest/fetcher_test.go
@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
"strings"
"sync"
"testing"
"time"
@@ -17,6 +18,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kerr"
@@ -1149,6 +1151,15 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
reg := prometheus.NewPedanticRegistry()
metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{})

t.Cleanup(func() {
// Assuming none of the tests intentionally create gaps in offsets, there should be no missed records.
assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_missed_records_total The number of offsets that were never consumed by the reader because they weren't fetched.
# TYPE cortex_ingest_storage_reader_missed_records_total counter
cortex_ingest_storage_reader_missed_records_total 0
`), "cortex_ingest_storage_reader_missed_records_total"))
})

// This instantiates the fields of kprom.
// This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves.
metrics.kprom.OnNewClient(client)
5 changes: 5 additions & 0 deletions pkg/storage/ingest/reader.go
@@ -1007,6 +1007,7 @@ type readerMetrics struct {
lastConsumedOffset prometheus.Gauge
consumeLatency prometheus.Histogram
kprom *kprom.Metrics
missedRecords prometheus.Counter
}

type readerMetricsSource interface {
@@ -1083,6 +1084,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
lastConsumedOffset: lastConsumedOffset,
kprom: NewKafkaReaderClientMetrics(component, reg),
missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_missed_records_total",
Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
}),
}

m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {