From 174188e6872cba89043c70b2d54e9b913c3ffacc Mon Sep 17 00:00:00 2001 From: Vamsee Lakamsani Date: Sat, 30 Jul 2022 10:57:01 -0700 Subject: [PATCH] Updated Writer and Reader docs to show use of multiple brokers. (#959) Based on the discussion [here](https://github.com/segmentio/kafka-go/issues/894#issuecomment-1199788686) --- .gitignore | 3 +++ README.md | 63 +++++++++++++++++++++++++++--------------------------- writer.go | 4 ++-- 3 files changed, 37 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 8ade866c..f8b4085e 100644 --- a/.gitignore +++ b/.gitignore @@ -33,5 +33,8 @@ _testmain.go # Goland .idea +#IntelliJ +*.iml + # govendor /vendor/*/ diff --git a/README.md b/README.md index eaff0fdb..20f20e68 100644 --- a/README.md +++ b/README.md @@ -222,7 +222,7 @@ process shutdown. ```go // make a new reader that consumes from topic-A, partition 0, at offset 42 r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, + Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, Topic: "topic-A", Partition: 0, MinBytes: 10e3, // 10KB @@ -253,7 +253,7 @@ ReadMessage automatically commits offsets when using consumer groups. ```go // make a new reader that consumes from topic-A r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, + Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MinBytes: 10e3, // 10KB @@ -317,7 +317,7 @@ by setting CommitInterval on the ReaderConfig. ```go // make a new reader that consumes from topic-A r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, + Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MinBytes: 10e3, // 10KB @@ -342,7 +342,7 @@ to use in most cases as it provides additional features: ```go // make a writer that produces to topic-A, using the least-bytes distribution w := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.LeastBytes{}, } @@ -376,7 +376,7 @@ if err := w.Close(); err != nil { // Make a writer that publishes messages to topic-A. // The topic will be created if it is missing. w := &Writer{ - Addr: TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", AllowAutoTopicCreation: true, } @@ -427,7 +427,7 @@ the topic on a per-message basis by setting `Message.Topic`. ```go w := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), // NOTE: When Topic is not defined here, each Message must define it instead. Balancer: &kafka.LeastBytes{}, } @@ -478,7 +478,7 @@ aforementioned Sarama partitioners would route them to. ```go w := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, } @@ -491,7 +491,7 @@ default ```consistent_random``` partition strategy. ```go w := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: kafka.CRC32Balancer{}, } @@ -505,7 +505,7 @@ the partition which is not permitted. ```go w := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: kafka.Murmur2Balancer{}, } @@ -517,7 +517,7 @@ Compression can be enabled on the `Writer` by setting the `Compression` field: ```go w := &kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Compression: kafka.Snappy, } @@ -559,7 +559,7 @@ dialer := &kafka.Dialer{ } r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9093"}, + Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, @@ -568,6 +568,20 @@ r := kafka.NewReader(kafka.ReaderConfig{ ### Writer + +Direct Writer creation + +```go +w := kafka.Writer{ + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), + Topic: "topic-A", + Balancer: &kafka.Hash{}, + Transport: &kafka.Transport{ + TLS: &tls.Config{}, + }, + } +``` + Using `kafka.NewWriter` ```go @@ -578,26 +592,13 @@ dialer := &kafka.Dialer{ } w := kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{"localhost:9093"}, + Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "topic-A", Balancer: &kafka.Hash{}, Dialer: dialer, }) ``` - -Direct Writer creation - -```go -w := kafka.Writer{ - Addr: kafka.TCP("localhost:9093"), - Topic: "topic-A", - Balancer: &kafka.Hash{}, - Transport: &kafka.Transport{ - TLS: &tls.Config{}, - }, - } - -``` +Note that `kafka.NewWriter` and `kafka.WriterConfig` are deprecated and will be removed in a future release. ## SASL Support @@ -654,7 +655,7 @@ dialer := &kafka.Dialer{ } r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9093"}, + Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, @@ -677,7 +678,7 @@ sharedTransport := &kafka.Transport{ } w := kafka.Writer{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: sharedTransport, @@ -700,7 +701,7 @@ sharedTransport := &kafka.Transport{ } client := &kafka.Client{ - Addr: kafka.TCP("localhost:9092"), + Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Timeout: 10 * time.Second, Transport: sharedTransport, } @@ -714,7 +715,7 @@ endTime := time.Now() batchSize := int(10e6) // 10MB r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, + Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, MinBytes: batchSize, @@ -756,7 +757,7 @@ func logf(msg string, a ...interface{}) { } r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, + Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, Logger: kafka.LoggerFunc(logf), diff --git a/writer.go b/writer.go index 02e3103e..8d48e95c 100644 --- a/writer.go +++ b/writer.go @@ -29,7 +29,7 @@ import ( // // // Construct a synchronous writer (the default mode). // w := &kafka.Writer{ -// Addr: kafka.TCP("localhost:9092"), +// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), // Topic: "topic-A", // RequiredAcks: kafka.RequireAll, // } @@ -55,7 +55,7 @@ import ( // writer to receive notifications of messages being written to kafka: // // w := &kafka.Writer{ -// Addr: kafka.TCP("localhost:9092"), +// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), // Topic: "topic-A", // RequiredAcks: kafka.RequireAll, // Async: true, // make the writer asynchronous