Skip to content

Commit

Permalink
fix issue 971 (#973)
Browse files Browse the repository at this point in the history
* fix issue 971

* fix int => int32

* fix TestMakeBrokersOneMissing

* update documentation
  • Loading branch information
Achille authored Aug 29, 2022
1 parent fcb5875 commit ab4d8da
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
14 changes: 10 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,11 +1017,17 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
}

func makeBrokers(brokers map[int32]Broker, ids ...int32) []Broker {
b := make([]Broker, 0, len(ids))
for _, id := range ids {
if br, ok := brokers[id]; ok {
b = append(b, br)
b := make([]Broker, len(ids))
for i, id := range ids {
br, ok := brokers[id]
if !ok {
// When the broker id isn't found in the current list of known
// brokers, use a placeholder to report that the cluster has
// logical knowledge of the broker but no information about the
// physical host where it is running.
br.ID = int(id)
}
b[i] = br
}
return b
}
Expand Down
13 changes: 8 additions & 5 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,16 +1353,19 @@ func TestMakeBrokersAllPresent(t *testing.T) {
func TestMakeBrokersOneMissing(t *testing.T) {
brokers := make(map[int32]Broker)
brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092}
brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092}
brokers[3] = Broker{ID: 3, Host: "203.0.113.103", Port: 9092}

b := makeBrokers(brokers, 1, 2, 3)
if len(b) != 2 {
t.Errorf("Expected 2 brokers, got %d", len(b))
if len(b) != 3 {
t.Errorf("Expected 3 brokers, got %d", len(b))
}
if b[0] != brokers[1] {
t.Errorf("Expected broker 1 at index 0, got %d", b[0].ID)
}
if b[1] != brokers[3] {
t.Errorf("Expected broker 3 at index 1, got %d", b[1].ID)
if b[1] != (Broker{ID: 2}) {
t.Errorf("Expected broker 2 at index 1, got %d", b[1].ID)
}
if b[2] != brokers[3] {
t.Errorf("Expected broker 3 at index 1, got %d", b[2].ID)
}
}
5 changes: 5 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type Partition struct {
ID int

// Leader, replicas, and ISR for the partition.
//
// When no physical host is known to be running a broker, the Host and Port
// fields will be set to the zero values. The logical broker ID is always
// set to the value known to the kafka cluster, even if the broker is not
// currently backed by a physical host.
Leader Broker
Replicas []Broker
Isr []Broker
Expand Down

0 comments on commit ab4d8da

Please sign in to comment.