Skip to content

Commit

Permalink
feat(reader): handle io.ErrNoProgress more gracefully (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes authored Sep 9, 2022
1 parent ba6f442 commit 8f063ce
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,14 @@ func (r *reader) run(ctx context.Context, offset int64) {
errcount = 0
continue

case errors.Is(err, io.ErrNoProgress):
// This error is returned by the Conn when it believes the connection
// has been corrupted, so we need to explicitly close it. Since we are
// explicitly handling it and a retry will pick up, we can suppress the
// error metrics and logs for this case.
conn.Close()
break readLoop

case errors.Is(err, UnknownTopicOrPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
Expand Down

0 comments on commit 8f063ce

Please sign in to comment.