diff --git a/deleteacls.go b/deleteacls.go new file mode 100644 index 00000000..64cbd26d --- /dev/null +++ b/deleteacls.go @@ -0,0 +1,114 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/deleteacls" +) + +// DeleteACLsRequest represents a request sent to a kafka broker to delete +// ACLs. +type DeleteACLsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // List of ACL filters to use for deletion. + Filters []DeleteACLsFilter +} + +type DeleteACLsFilter struct { + ResourceTypeFilter ResourceType + ResourceNameFilter string + ResourcePatternTypeFilter PatternType + PrincipalFilter string + HostFilter string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// DeleteACLsResponse represents a response from a kafka broker to an ACL +// deletion request. +type DeleteACLsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // List of the results from the deletion request. + Results []DeleteACLsResult +} + +type DeleteACLsResult struct { + Error error + MatchingACLs []DeleteACLsMatchingACLs +} + +type DeleteACLsMatchingACLs struct { + Error error + ResourceType ResourceType + ResourceName string + ResourcePatternType PatternType + Principal string + Host string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// DeleteACLs sends ACLs deletion request to a kafka broker and returns the +// response. +func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) { + filters := make([]deleteacls.RequestFilter, 0, len(req.Filters)) + + for _, filter := range req.Filters { + filters = append(filters, deleteacls.RequestFilter{ + ResourceTypeFilter: int8(filter.ResourceTypeFilter), + ResourceNameFilter: filter.ResourceNameFilter, + ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter), + PrincipalFilter: filter.PrincipalFilter, + HostFilter: filter.HostFilter, + Operation: int8(filter.Operation), + PermissionType: int8(filter.PermissionType), + }) + } + + m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{ + Filters: filters, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err) + } + + res := m.(*deleteacls.Response) + + results := make([]DeleteACLsResult, 0, len(res.FilterResults)) + + for _, result := range res.FilterResults { + matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs)) + + for _, matchingACL := range result.MatchingACLs { + matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{ + Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage), + ResourceType: ResourceType(matchingACL.ResourceType), + ResourceName: matchingACL.ResourceName, + ResourcePatternType: PatternType(matchingACL.ResourcePatternType), + Principal: matchingACL.Principal, + Host: matchingACL.Host, + Operation: ACLOperationType(matchingACL.Operation), + PermissionType: ACLPermissionType(matchingACL.PermissionType), + }) + } + + results = append(results, DeleteACLsResult{ + Error: makeError(result.ErrorCode, result.ErrorMessage), + MatchingACLs: matchingACLs, + }) + } + + ret := &DeleteACLsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Results: results, + } + + return ret, nil +} diff --git a/deleteacls_test.go b/deleteacls_test.go new file mode 100644 index 00000000..299cede2 --- /dev/null +++ b/deleteacls_test.go @@ -0,0 +1,112 @@ +package kafka + +import ( + "context" + "testing" + + ktesting "github.com/segmentio/kafka-go/testing" + "github.com/stretchr/testify/assert" +) + +func TestClientDeleteACLs(t *testing.T) { + if !ktesting.KafkaIsAtLeast("2.0.1") { + return + } + + client, shutdown := newLocalClient() + defer shutdown() + + topic := makeTopic() + group := makeGroupID() + + createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ + ACLs: []ACLEntry{ + { + Principal: "User:alice", + PermissionType: ACLPermissionTypeAllow, + Operation: ACLOperationTypeRead, + ResourceType: ResourceTypeTopic, + ResourcePatternType: PatternTypeLiteral, + ResourceName: topic, + Host: "*", + }, + { + Principal: "User:bob", + PermissionType: ACLPermissionTypeAllow, + Operation: ACLOperationTypeRead, + ResourceType: ResourceTypeGroup, + ResourcePatternType: PatternTypeLiteral, + ResourceName: group, + Host: "*", + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + for _, err := range createRes.Errors { + if err != nil { + t.Error(err) + } + } + + deleteResp, err := client.DeleteACLs(context.Background(), &DeleteACLsRequest{ + Filters: []DeleteACLsFilter{ + { + ResourceTypeFilter: ResourceTypeTopic, + ResourceNameFilter: topic, + ResourcePatternTypeFilter: PatternTypeLiteral, + Operation: ACLOperationTypeRead, + PermissionType: ACLPermissionTypeAllow, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + expectedDeleteResp := DeleteACLsResponse{ + Throttle: 0, + Results: []DeleteACLsResult{ + { + Error: makeError(0, ""), + MatchingACLs: []DeleteACLsMatchingACLs{ + { + Error: makeError(0, ""), + ResourceType: ResourceTypeTopic, + ResourceName: topic, + ResourcePatternType: PatternTypeLiteral, + Principal: "User:alice", + Host: "*", + Operation: ACLOperationTypeRead, + PermissionType: ACLPermissionTypeAllow, + }, + }, + }, + }, + } + + assert.Equal(t, expectedDeleteResp, *deleteResp) + + describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{ + Filter: ACLFilter{ + ResourceTypeFilter: ResourceTypeTopic, + ResourceNameFilter: topic, + ResourcePatternTypeFilter: PatternTypeLiteral, + Operation: ACLOperationTypeRead, + PermissionType: ACLPermissionTypeAllow, + }, + }) + if err != nil { + t.Fatal(err) + } + + expectedDescribeResp := DescribeACLsResponse{ + Throttle: 0, + Error: makeError(0, ""), + Resources: []ACLResource{}, + } + + assert.Equal(t, expectedDescribeResp, *describeResp) +} diff --git a/protocol/deleteacls/deleteacls.go b/protocol/deleteacls/deleteacls.go new file mode 100644 index 00000000..7f0f002f --- /dev/null +++ b/protocol/deleteacls/deleteacls.go @@ -0,0 +1,74 @@ +package deleteacls + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + Filters []RequestFilter `kafka:"min=v0,max=v3"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteAcls } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type RequestFilter struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceTypeFilter int8 `kafka:"min=v0,max=v3"` + ResourceNameFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + ResourcePatternTypeFilter int8 `kafka:"min=v1,max=v3"` + PrincipalFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + HostFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + FilterResults []FilterResult `kafka:"min=v0,max=v3"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteAcls } + +type FilterResult struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + MatchingACLs []MatchingACL `kafka:"min=v0,max=v3"` +} + +type MatchingACL struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + ResourceType int8 `kafka:"min=v0,max=v3"` + ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + ResourcePatternType int8 `kafka:"min=v1,max=v3"` + Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/protocol/deleteacls/deleteacls_test.go b/protocol/deleteacls/deleteacls_test.go new file mode 100644 index 00000000..51f8dd20 --- /dev/null +++ b/protocol/deleteacls/deleteacls_test.go @@ -0,0 +1,165 @@ +package deleteacls_test + +import ( + "testing" + + "github.com/segmentio/kafka-go/protocol/deleteacls" + "github.com/segmentio/kafka-go/protocol/prototest" +) + +const ( + v0 = 0 + v1 = 1 + v2 = 2 + v3 = 3 +) + +func TestDeleteACLsRequest(t *testing.T) { + prototest.TestRequest(t, v0, &deleteacls.Request{ + Filters: []deleteacls.RequestFilter{ + { + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }) + + prototest.TestRequest(t, v1, &deleteacls.Request{ + Filters: []deleteacls.RequestFilter{ + { + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + ResourcePatternTypeFilter: 0, + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }) + + prototest.TestRequest(t, v2, &deleteacls.Request{ + Filters: []deleteacls.RequestFilter{ + { + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + ResourcePatternTypeFilter: 0, + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }) + + prototest.TestRequest(t, v3, &deleteacls.Request{ + Filters: []deleteacls.RequestFilter{ + { + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + ResourcePatternTypeFilter: 0, + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }) +} + +func TestDeleteACLsResponse(t *testing.T) { + prototest.TestResponse(t, v0, &deleteacls.Response{ + ThrottleTimeMs: 1, + FilterResults: []deleteacls.FilterResult{ + { + ErrorCode: 1, + ErrorMessage: "foo", + MatchingACLs: []deleteacls.MatchingACL{ + { + ErrorCode: 1, + ErrorMessage: "bar", + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) + + prototest.TestResponse(t, v1, &deleteacls.Response{ + ThrottleTimeMs: 1, + FilterResults: []deleteacls.FilterResult{ + { + ErrorCode: 1, + ErrorMessage: "foo", + MatchingACLs: []deleteacls.MatchingACL{ + { + ErrorCode: 1, + ErrorMessage: "bar", + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + ResourcePatternType: 0, + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) + + prototest.TestResponse(t, v2, &deleteacls.Response{ + ThrottleTimeMs: 1, + FilterResults: []deleteacls.FilterResult{ + { + ErrorCode: 1, + ErrorMessage: "foo", + MatchingACLs: []deleteacls.MatchingACL{ + { + ErrorCode: 1, + ErrorMessage: "bar", + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + ResourcePatternType: 0, + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) + + prototest.TestResponse(t, v3, &deleteacls.Response{ + ThrottleTimeMs: 1, + FilterResults: []deleteacls.FilterResult{ + { + ErrorCode: 1, + ErrorMessage: "foo", + MatchingACLs: []deleteacls.MatchingACL{ + { + ErrorCode: 1, + ErrorMessage: "bar", + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + ResourcePatternType: 0, + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) +}