From 8f063ce560a7dc30cf6e8e6e3cdb9b1b235a7698 Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Fri, 9 Sep 2022 10:50:01 -0700 Subject: [PATCH] feat(reader): handle io.ErrNoProgress more gracefully (#941) --- reader.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/reader.go b/reader.go index c83c2389..facaf709 100644 --- a/reader.go +++ b/reader.go @@ -1321,6 +1321,14 @@ func (r *reader) run(ctx context.Context, offset int64) { errcount = 0 continue + case errors.Is(err, io.ErrNoProgress): + // This error is returned by the Conn when it believes the connection + // has been corrupted, so we need to explicitly close it. Since we are + // explicitly handling it and a retry will pick up, we can suppress the + // error metrics and logs for this case. + conn.Close() + break readLoop + case errors.Is(err, UnknownTopicOrPartition): r.withErrorLogger(func(log Logger) { log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)