diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go
index f476f7ec26..5ca1e3a6cb 100644
--- a/dataclients/kubernetes/clusterclient.go
+++ b/dataclients/kubernetes/clusterclient.go
@@ -519,16 +519,13 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
 				// we should delete it, because of eventual consistency
 				// it is actually terminating
 				delete(resEps, address)
-			} else if ep.Conditions == nil {
+			} else if ep.Conditions == nil || ep.isReady() {
 				// if conditions are nil then we need to treat is as ready
 				resEps[address] = &skipperEndpoint{
-					Address: address,
-					Zone:    ep.Zone,
-				}
-			} else if ep.isReady() {
-				resEps[address] = &skipperEndpoint{
-					Address: address,
-					Zone:    ep.Zone,
+					Address:   address,
+					Zone:      ep.Zone,
+					NodeName:  ep.NodeName,
+					TargetRef: ep.TargetRef,
 				}
 			}
 		}
diff --git a/dataclients/kubernetes/clusterstate_test.go b/dataclients/kubernetes/clusterstate_test.go
index f7c55e96a8..69cc8bae34 100644
--- a/dataclients/kubernetes/clusterstate_test.go
+++ b/dataclients/kubernetes/clusterstate_test.go
@@ -23,17 +23,17 @@ func benchmarkCachedEndpoints(b *testing.B, n int) {
 		Subsets: []*subset{
 			{
 				Addresses: []*address{
-					{"192.168.0.1", "node1"},
-					{"192.168.0.2", "node2"},
-					{"192.168.0.3", "node3"},
-					{"192.168.0.4", "node4"},
-					{"192.168.0.5", "node5"},
-					{"192.168.0.6", "node6"},
-					{"192.168.0.7", "node7"},
-					{"192.168.0.8", "node8"},
-					{"192.168.0.9", "node9"},
-					{"192.168.0.10", "node10"},
-					{"192.168.0.11", "node11"},
+					{IP: "192.168.0.1", NodeName: "node1"},
+					{IP: "192.168.0.2", NodeName: "node2"},
+					{IP: "192.168.0.3", NodeName: "node3"},
+					{IP: "192.168.0.4", NodeName: "node4"},
+					{IP: "192.168.0.5", NodeName: "node5"},
+					{IP: "192.168.0.6", NodeName: "node6"},
+					{IP: "192.168.0.7", NodeName: "node7"},
+					{IP: "192.168.0.8", NodeName: "node8"},
+					{IP: "192.168.0.9", NodeName: "node9"},
+					{IP: "192.168.0.10", NodeName: "node10"},
+					{IP: "192.168.0.11", NodeName: "node11"},
 				},
 				Ports: []*port{
 					{"ssh", 22, "TCP"},
diff --git a/dataclients/kubernetes/endpoints.go b/dataclients/kubernetes/endpoints.go
index b293b41fed..ed64870b5f 100644
--- a/dataclients/kubernetes/endpoints.go
+++ b/dataclients/kubernetes/endpoints.go
@@ -98,8 +98,9 @@ type subset struct {
 }
 
 type address struct {
-	IP   string `json:"ip"`
-	Node string `json:"nodeName"`
+	IP        string           `json:"ip"`
+	NodeName  string           `json:"nodeName"`
+	TargetRef *objectReference `json:"targetRef"`
 }
 
 type port struct {
diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go
index 18e460fb87..6e2d41c2af 100644
--- a/dataclients/kubernetes/endpointslices.go
+++ b/dataclients/kubernetes/endpointslices.go
@@ -16,8 +16,10 @@ type skipperEndpointSlice struct {
 
 // Conditions have to be evaluated before creation
 type skipperEndpoint struct {
-	Address string
-	Zone    string
+	Address   string
+	Zone      string
+	NodeName  string
+	TargetRef *objectReference
 }
 
 func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int {
@@ -125,6 +127,10 @@ type EndpointSliceEndpoints struct {
 	// https://kubernetes.io/docs/concepts/services-networking/topology-aware-routing/#safeguards
 	// Zone aware routing will be available if https://github.com/zalando/skipper/issues/1446 is closed.
 	Zone string `json:"zone"` // "eu-central-1c"
+	// Node hosting this endpoint. This can be used to determine endpoints local to a node.
+	NodeName string `json:"nodeName"`
+	// TargetRef is a reference to a Kubernetes object that represents this endpoint.
+	TargetRef *objectReference `json:"targetRef"`
 }
 
 type endpointsliceCondition struct {
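For illustration, the json tags added above map the optional `nodeName` and `targetRef` fields that the Kubernetes API returns for Endpoints addresses and EndpointSlice endpoints. The following standalone sketch (struct copies trimmed to the new fields; all sample values invented) shows how such an endpoint deserializes:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copies of the types above, kept to the fields relevant here.
type objectReference struct {
	Kind      string `json:"kind"`
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
	Uid       string `json:"uid"`
}

type endpointSliceEndpoint struct {
	Zone      string           `json:"zone"`
	NodeName  string           `json:"nodeName"`
	TargetRef *objectReference `json:"targetRef"`
}

func main() {
	// Invented sample of what the kube-apiserver could serve for one endpoint.
	raw := `{
		"zone": "eu-central-1c",
		"nodeName": "node1",
		"targetRef": {"kind": "Pod", "name": "app-5d4b9", "namespace": "default", "uid": "1234"}
	}`

	var ep endpointSliceEndpoint
	if err := json.Unmarshal([]byte(raw), &ep); err != nil {
		panic(err)
	}
	fmt.Println(ep.Zone, ep.NodeName, ep.TargetRef.Name) // eu-central-1c node1 app-5d4b9
}
```

TargetRef is a pointer because `targetRef` is optional in the API: an address without it unmarshals to nil, which the nil-safe getPodName helper in resources.go (further down) relies on.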
diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go
index 3500c71de0..0e3db7e16d 100644
--- a/dataclients/kubernetes/kube.go
+++ b/dataclients/kubernetes/kube.go
@@ -18,7 +18,11 @@ import (
 	"github.com/zalando/skipper/secrets/certregistry"
 )
 
-const DefaultLoadBalancerAlgorithm = "roundRobin"
+const (
+	DefaultLoadBalancerAlgorithm = "roundRobin"
+	MetadataRouteID              = "kube__metadata"
+	EnableMetadataRoute          = true // TODO: flag
+)
 
 const (
 	defaultIngressClass = "skipper"
@@ -437,6 +441,10 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
 		r = append(r, globalRedirectRoute(c.httpsRedirectCode))
 	}
 
+	if EnableMetadataRoute {
+		r = append(r, metadataRoute(state))
+	}
+
 	return r, nil
 }
 
diff --git a/dataclients/kubernetes/kube_test.go b/dataclients/kubernetes/kube_test.go
index c11a8d15a2..4aba035448 100644
--- a/dataclients/kubernetes/kube_test.go
+++ b/dataclients/kubernetes/kube_test.go
@@ -86,8 +86,8 @@ func testEndpoints(namespace, name string, labels map[string]string, base string
 
 	for i := 0; i < n; i++ {
 		adr := &address{
-			IP:   fmt.Sprintf("%s.%d", base, i),
-			Node: fmt.Sprintf("node-%d", i),
+			IP:       fmt.Sprintf("%s.%d", base, i),
+			NodeName: fmt.Sprintf("node-%d", i),
 		}
 		s.Addresses = append(s.Addresses, adr)
 	}
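The kube.go hunks above make the data client emit one extra synthetic route: loadAndConvert appends a route whose id is MetadataRouteID and whose only job is to carry endpoint metadata to the routing pre-processor defined in metadataroute.go below. Guarded by the `False()` predicate, it never matches a request. Roughly, with the base64 payload elided, the generated route should look like:

```
kube__metadata: False()
  -> "data:application/json;base64,...";
```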
diff --git a/dataclients/kubernetes/metadataroute.go b/dataclients/kubernetes/metadataroute.go
new file mode 100644
index 0000000000..cf2f0f0eb4
--- /dev/null
+++ b/dataclients/kubernetes/metadataroute.go
@@ -0,0 +1,187 @@
+package kubernetes
+
+import (
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"net"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/zalando/skipper/eskip"
+	"github.com/zalando/skipper/predicates"
+	"github.com/zalando/skipper/routing"
+
+	snet "github.com/zalando/skipper/net"
+)
+
+type MetadataPreProcessorOptions struct {
+	EndpointRegistry *routing.EndpointRegistry
+}
+
+type metadataPreProcessor struct {
+	options MetadataPreProcessorOptions
+}
+
+type kubeRouteMetadata struct {
+	Addresses map[string]*kubeRouteMetadataAddress `json:"addresses"`
+}
+
+type kubeRouteMetadataAddress struct {
+	Zone     string `json:"zone,omitempty"`
+	NodeName string `json:"nodeName,omitempty"`
+	PodName  string `json:"podName,omitempty"`
+}
+
+// NewMetadataPreProcessor creates a pre-processor for the metadata route.
+func NewMetadataPreProcessor(options MetadataPreProcessorOptions) routing.PreProcessor {
+	return &metadataPreProcessor{options: options}
+}
+
+func (pp *metadataPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
+	var metadataRoute *eskip.Route
+	filtered := make([]*eskip.Route, 0, len(routes))
+
+	for _, r := range routes {
+		if r.Id == MetadataRouteID {
+			if metadataRoute == nil {
+				metadataRoute = r
+			} else {
+				log.Errorf("Found multiple metadata routes, using the first one")
+			}
+		} else {
+			filtered = append(filtered, r)
+		}
+	}
+
+	if metadataRoute == nil {
+		log.Errorf("Metadata route not found")
+		return routes
+	}
+
+	metadata, err := decodeMetadata(metadataRoute)
+	if err != nil {
+		log.Errorf("Failed to decode metadata route: %v", err)
+		return filtered
+	}
+
+	for _, r := range filtered {
+		if r.BackendType == eskip.NetworkBackend {
+			pp.addMetadata(metadata, r.Backend)
+		} else if r.BackendType == eskip.LBBackend {
+			for _, ep := range r.LBEndpoints {
+				pp.addMetadata(metadata, ep)
+			}
+		}
+	}
+	return filtered
+}
+
+// metadataRoute creates a route with [MetadataRouteID] id that matches no requests and
+// contains metadata for each endpoint address used by Ingresses and RouteGroups.
+func metadataRoute(s *clusterState) *eskip.Route {
+	metadata := kubeRouteMetadata{
+		Addresses: make(map[string]*kubeRouteMetadataAddress),
+	}
+
+	for id := range s.cachedEndpoints {
+		if s.enableEndpointSlices {
+			if eps, ok := s.endpointSlices[id.ResourceID]; ok {
+				for _, ep := range eps.Endpoints {
+					metadata.Addresses[ep.Address] = &kubeRouteMetadataAddress{
+						Zone:     ep.Zone,
+						NodeName: ep.NodeName,
+						PodName:  ep.TargetRef.getPodName(),
+					}
+				}
+			}
+		} else {
+			if ep, ok := s.endpoints[id.ResourceID]; ok {
+				for _, subset := range ep.Subsets {
+					for _, addr := range subset.Addresses {
+						metadata.Addresses[addr.IP] = &kubeRouteMetadataAddress{
+							// Endpoints do not provide zone
+							NodeName: addr.NodeName,
+							PodName:  addr.TargetRef.getPodName(),
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return &eskip.Route{
+		Id:          MetadataRouteID,
+		Predicates:  []*eskip.Predicate{{Name: predicates.FalseName}},
+		BackendType: eskip.NetworkBackend,
+		Backend:     encodeDataURI(&metadata),
+	}
+}
+
+func decodeMetadata(r *eskip.Route) (map[string]*kubeRouteMetadataAddress, error) {
+	metadata, err := decodeDataURI(r.Backend)
+	if err != nil {
+		return nil, err
+	}
+	return metadata.Addresses, nil
+}
+
+const dataUriPrefix = "data:application/json;base64,"
+
+// encodeDataURI encodes metadata into a data URI.
+// Note that map keys are sorted and used as JSON object keys,
+// therefore encodeDataURI produces the same output for the same input.
+// See https://datatracker.ietf.org/doc/html/rfc2397
+func encodeDataURI(metadata *kubeRouteMetadata) string {
+	data, _ := json.Marshal(&metadata)
+
+	buf := make([]byte, len(dataUriPrefix)+base64.StdEncoding.EncodedLen(len(data)))
+
+	copy(buf, dataUriPrefix)
+	base64.StdEncoding.Encode(buf[len(dataUriPrefix):], data)
+
+	return string(buf)
+}
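+
+// For example, with invented sample values, encoding
+//
+//	kubeRouteMetadata{Addresses: map[string]*kubeRouteMetadataAddress{
+//		"10.2.3.4": {Zone: "eu-central-1c", NodeName: "node1", PodName: "app-1"},
+//	}}
+//
+// yields "data:application/json;base64," followed by the base64 encoding of
+// {"addresses":{"10.2.3.4":{"zone":"eu-central-1c","nodeName":"node1","podName":"app-1"}}}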
+
+// decodeDataURI decodes metadata from a data URI.
+// See https://datatracker.ietf.org/doc/html/rfc2397
+func decodeDataURI(uri string) (*kubeRouteMetadata, error) {
+	var metadata kubeRouteMetadata
+
+	data, err := base64.StdEncoding.DecodeString(uri[len(dataUriPrefix):])
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode base64: %w", err)
+	}
+
+	if err := json.Unmarshal(data, &metadata); err != nil {
+		return nil, fmt.Errorf("failed to decode json: %w", err)
+	}
+	return &metadata, nil
+}
+
+func (pp *metadataPreProcessor) addMetadata(metadata map[string]*kubeRouteMetadataAddress, endpoint string) {
+	_, hostPort, err := snet.SchemeHost(endpoint)
+	if err != nil {
+		return
+	}
+
+	host, _, err := net.SplitHostPort(hostPort)
+	if err != nil {
+		host = hostPort
+	}
+
+	addr, ok := metadata[host]
+	if !ok {
+		return
+	}
+
+	metrics := pp.options.EndpointRegistry.GetMetrics(hostPort)
+	setTag := func(name, value string) {
+		if value != "" {
+			metrics.SetTag(name, value)
+		}
+	}
+
+	setTag("zone", addr.Zone)
+	setTag("nodeName", addr.NodeName)
+	setTag("podName", addr.PodName)
+}
diff --git a/dataclients/kubernetes/resources.go b/dataclients/kubernetes/resources.go
index 3f35b27885..d509c2d5e7 100644
--- a/dataclients/kubernetes/resources.go
+++ b/dataclients/kubernetes/resources.go
@@ -24,3 +24,17 @@ type secret struct {
 type secretList struct {
 	Items []*secret `json:"items"`
 }
+
+type objectReference struct {
+	Kind      string `json:"kind"`
+	Name      string `json:"name"`
+	Namespace string `json:"namespace"`
+	Uid       string `json:"uid"`
+}
+
+func (r *objectReference) getPodName() string {
+	if r != nil && r.Kind == "Pod" {
+		return r.Name
+	}
+	return ""
+}
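One subtlety worth calling out: metadataRoute above calls `ep.TargetRef.getPodName()` and `addr.TargetRef.getPodName()` without nil checks, and `targetRef` is optional, so the helper's nil-safe receiver is load-bearing. A standalone sketch of that behavior (type copied from resources.go with json tags dropped; sample values invented):

```go
package main

import "fmt"

type objectReference struct {
	Kind, Name, Namespace, Uid string
}

// getPodName as defined in resources.go above: nil-safe and Pod-only.
func (r *objectReference) getPodName() string {
	if r != nil && r.Kind == "Pod" {
		return r.Name
	}
	return ""
}

func main() {
	fmt.Println((&objectReference{Kind: "Pod", Name: "app-5d4b9"}).getPodName()) // app-5d4b9
	fmt.Println((&objectReference{Kind: "Node", Name: "node1"}).getPodName())   // "" (not a Pod)

	var missing *objectReference // e.g. an address without targetRef
	fmt.Println(missing.getPodName() == "") // true: calling on a nil receiver is safe
}
```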
diff --git a/proxy/proxy.go b/proxy/proxy.go
index 45b19812be..0db7cf7045 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -45,10 +45,10 @@ import (
 )
 
 const (
-	proxyBufferSize         = 8192
-	unknownRouteID          = "_unknownroute_"
-	unknownRouteBackendType = ""
-	unknownRouteBackend     = ""
+	proxyBufferSize     = 8192
+	unknownRouteID      = "_unknownroute_"
+	unknownRouteBackend = ""
+	endpointMetricsKey  = "proxy:endpointMetricsKey" // export for filters?
 
 	// Number of loops allowed by default.
 	DefaultMaxLoopbacks = 9
@@ -852,6 +852,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
 	if endpointMetrics != nil {
 		endpointMetrics.IncInflightRequest()
 		defer endpointMetrics.DecInflightRequest()
+
+		ctx.stateBag[endpointMetricsKey] = endpointMetrics
 	}
 
 	if p.experimentalUpgrade && isUpgradeRequest(req) {
@@ -881,6 +883,12 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
 	setTag(ctx.proxySpan, HTTPUrlTag, u.String())
 	p.setCommonSpanInfo(u, req, ctx.proxySpan)
 
+	if endpointMetrics != nil {
+		if podName := endpointMetrics.Tag("podName"); podName != "" {
+			p.tracing.setTag(ctx.proxySpan, ServiceInstanceIdTag, podName)
+		}
+	}
+
 	carrier := ot.HTTPHeadersCarrier(req.Header)
 	_ = p.tracing.tracer.Inject(ctx.proxySpan.Context(), ot.HTTPHeaders, carrier)
 
@@ -1260,12 +1268,19 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
 		flowIdLog = fmt.Sprintf(", flow id %s", flowId)
 	}
 	id := unknownRouteID
-	backendType := unknownRouteBackendType
 	backend := unknownRouteBackend
 	if ctx.route != nil {
 		id = ctx.route.Id
-		backendType = ctx.route.BackendType.String()
-		backend = fmt.Sprintf("%s://%s", ctx.request.URL.Scheme, ctx.request.URL.Host)
+
+		backend = fmt.Sprintf("%s %s://%s", ctx.route.BackendType.String(), ctx.request.URL.Scheme, ctx.request.URL.Host)
+		if m, ok := ctx.stateBag[endpointMetricsKey].(routing.Metrics); ok {
+			if podName := m.Tag("podName"); podName != "" {
+				backend += " " + podName
+			}
+			if nodeName := m.Tag("nodeName"); nodeName != "" {
+				backend += " " + nodeName
+			}
+		}
 	}
 
 	if err == errRouteLookupFailed {
@@ -1308,11 +1323,10 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
 		uri = uri[:i]
 	}
 	logFunc(
-		`%s after %v, route %s with backend %s %s%s, status code %d: %v, remote host: %s, request: "%s %s %s", host: %s, user agent: "%s"`,
+		`%s after %v, route %s with backend %s%s, status code %d: %v, remote host: %s, request: "%s %s %s", host: %s, user agent: "%s"`,
 		msgPrefix,
 		time.Since(ctx.startServe),
 		id,
-		backendType,
 		backend,
 		flowIdLog,
 		ctx.response.StatusCode,
diff --git a/proxy/tracing.go b/proxy/tracing.go
index 66b481c8f3..cb610733fb 100644
--- a/proxy/tracing.go
+++ b/proxy/tracing.go
@@ -21,6 +21,7 @@ const (
 	HTTPStatusCodeTag    = "http.status_code"
 	SkipperRouteIDTag    = "skipper.route_id"
 	SpanKindTag          = "span.kind"
+	ServiceInstanceIdTag = "service.instance.id"
 
 	ClientRequestCanceled = "canceled"
 	SpanKindClient        = "client"
diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go
index f7b94315b1..dbfcf2d5b4 100644
--- a/routing/endpointregistry.go
+++ b/routing/endpointregistry.go
@@ -22,12 +22,16 @@ type Metrics interface {
 	InflightRequests() int64
 	IncInflightRequest()
 	DecInflightRequest()
+
+	SetTag(name, value string)
+	Tag(name string) string
 }
 
 type entry struct {
 	detected         atomic.Value // time.Time
 	lastSeen         atomic.Value // time.Time
 	inflightRequests atomic.Int64
+	tags             sync.Map // map[string]string
 }
 
 var _ Metrics = &entry{}
@@ -60,6 +64,17 @@ func (e *entry) SetLastSeen(ts time.Time) {
 	e.lastSeen.Store(ts)
 }
 
+func (e *entry) SetTag(name string, value string) {
+	e.tags.Store(name, value)
+}
+
+func (e *entry) Tag(name string) string {
+	if value, ok := e.tags.Load(name); ok {
+		return value.(string)
+	}
+	return ""
+}
+
 func newEntry() *entry {
 	result := &entry{}
 	result.SetDetected(time.Time{})
@@ -90,7 +105,6 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
 			if epi.Metrics.DetectedTime().IsZero() {
 				epi.Metrics.SetDetected(now)
 			}
-
 			epi.Metrics.SetLastSeen(now)
 		}
 	} else if route.BackendType == eskip.NetworkBackend {
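On the registry side, entry stores tags in a sync.Map so SetTag and Tag are safe to call concurrently from the pre-processor and from the proxy's hot path. A usage sketch, assuming the existing routing.NewEndpointRegistry constructor and the GetMetrics lookup that addMetadata uses above (the address is invented):

```go
package main

import (
	"fmt"

	"github.com/zalando/skipper/routing"
)

func main() {
	registry := routing.NewEndpointRegistry(routing.RegistryOptions{})

	m := registry.GetMetrics("10.2.3.4:8080")
	m.SetTag("podName", "app-5d4b9") // normally done by the metadata pre-processor

	fmt.Println(m.Tag("podName")) // app-5d4b9
	fmt.Println(m.Tag("zone"))    // empty: unset tags read as ""
}
```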
diff --git a/skipper.go b/skipper.go
index 3a6dc0344c..0c4a164cab 100644
--- a/skipper.go
+++ b/skipper.go
@@ -1922,6 +1922,11 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
 		ro.PostProcessors = append(ro.PostProcessors, failClosedRatelimitPostProcessor)
 	}
 
+	if kubernetes.EnableMetadataRoute {
+		opts := kubernetes.MetadataPreProcessorOptions{EndpointRegistry: endpointRegistry}
+		ro.PreProcessors = append(ro.PreProcessors, kubernetes.NewMetadataPreProcessor(opts))
+	}
+
 	if o.DefaultFilters != nil {
 		ro.PreProcessors = append(ro.PreProcessors, o.DefaultFilters)
 	}