From 346c47617465ff20db068e3daa1bd813134c4d47 Mon Sep 17 00:00:00 2001
From: Isabelle Erin COWAN-BERGMAN
Date: Thu, 21 Sep 2023 11:25:56 +0200
Subject: [PATCH 1/5] Fix leak of tracked statistic events

---
 cmd/kube-transition-metrics/main.go    | 10 +---
 go.sum                                 |  2 +
 internal/prommetrics/channel.go        |  8 +--
 internal/statistics/pod_collector.go   | 77 ++++++++++++++++----------
 internal/statistics/statistic_event.go | 73 +++++++++++++++++-------
 5 files changed, 109 insertions(+), 61 deletions(-)

diff --git a/cmd/kube-transition-metrics/main.go b/cmd/kube-transition-metrics/main.go
index 4ab33dd..5ef682f 100644
--- a/cmd/kube-transition-metrics/main.go
+++ b/cmd/kube-transition-metrics/main.go
@@ -33,18 +33,12 @@ func main() {
 		BuildConfigFromFlags("", kubeconfigPath)
 	clientset, _ := kubernetes.NewForConfig(config)
 
-	initialSyncBlacklist, resourceVersion, err :=
-		statistics.CollectInitialPods(options, clientset)
-	if err != nil {
-		panic(err)
-	}
-
-	eventHandler := statistics.NewStatisticEventHandler(options, initialSyncBlacklist)
+	eventHandler := statistics.NewStatisticEventHandler(options)
 	go eventHandler.Run()
 
 	podCollector := statistics.NewPodCollector(eventHandler)
-	go podCollector.Run(clientset, resourceVersion)
+	go podCollector.Run(clientset)
 
 	http.Handle("/metrics", promhttp.Handler())
 	handler := zerologhttp.NewHandler(http.DefaultServeMux)
diff --git a/go.sum b/go.sum
index 9aa0071..a65e89c 100644
--- a/go.sum
+++ b/go.sum
@@ -9,6 +9,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
 github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
+github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
 github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
 github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
 github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@@ -73,6 +74,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE=
 github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
diff --git a/internal/prommetrics/channel.go b/internal/prommetrics/channel.go
index e8dc24d..eb43621 100644
--- a/internal/prommetrics/channel.go
+++ b/internal/prommetrics/channel.go
@@ -81,11 +81,9 @@ func (mc MonitoredChannel[T]) Publish(item T) {
 		Add(waitDuration.Seconds())
 }
 
-// Read reads from the underlying channel.
-func (mc MonitoredChannel[T]) Read() (T, bool) {
-	item, ok := <-mc.c
-
-	return item, ok
+// Channel returns the underlying channel. Use it only for channel receives.
+func (mc MonitoredChannel[T]) Channel() chan T {
+	return mc.c
 }
 
 func (mc MonitoredChannel[T]) monitor() {
diff --git a/internal/statistics/pod_collector.go b/internal/statistics/pod_collector.go
index 0e0ac69..86a343b 100644
--- a/internal/statistics/pod_collector.go
+++ b/internal/statistics/pod_collector.go
@@ -3,16 +3,19 @@ package statistics
 import (
 	"context"
 	"fmt"
+	"net/http"
 
-	"github.com/BackMarket-oss/kube-transition-metrics/internal/options"
 	"github.com/BackMarket-oss/kube-transition-metrics/internal/prommetrics"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/rs/zerolog/log"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	watch_tools "k8s.io/client-go/tools/watch"
 )
 
 // PodCollector uses the Kubernetes Watch API to monitor for all changes on Pods
@@ -32,19 +35,18 @@ func NewPodCollector(
 	}
 }
 
-// CollectInitialPods generates a list of Pod UIDs currently existing on the
+// collectInitialPods generates a list of Pod UIDs currently existing on the
 // cluster. This is used to filter pre-existing Pods by the
 // StatisticEventHandler to avoid generating inaccurate or incomplete metrics.
 // It returns the list of Pod UIDs, the resource version for these UIDs, and an
 // error if one occurred.
-func CollectInitialPods(
-	options *options.Options,
+func (w *PodCollector) collectInitialPods(
 	clientset *kubernetes.Clientset,
 ) ([]types.UID, string, error) {
-	timeOut := options.KubeWatchTimeout
+	timeOut := w.eh.options.KubeWatchTimeout
 	listOptions := metav1.ListOptions{
 		TimeoutSeconds: &timeOut,
-		Limit:          options.KubeWatchMaxEvents,
+		Limit:          w.eh.options.KubeWatchMaxEvents,
 	}
 
 	blacklistUids := make([]types.UID, 0)
@@ -170,21 +172,32 @@ func (w *PodCollector) handlePod(
 	return nil
 }
 
+func (w *PodCollector) getWatcher(
+	clientset *kubernetes.Clientset,
+	resourceVersion string,
+) (*watch_tools.RetryWatcher, error) {
+	watcher, err := watch_tools.NewRetryWatcher(resourceVersion, &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			//nolint:wrapcheck
+			return clientset.CoreV1().Pods("").List(context.Background(), options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			//nolint:wrapcheck
+			return clientset.CoreV1().Pods("").Watch(context.Background(), options)
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create watcher: %w", err)
+	}
+
+	return watcher, nil
+}
+
 func (w *PodCollector) watch(
 	clientset *kubernetes.Clientset,
 	resourceVersion string,
 ) {
-	timeOut := w.eh.options.KubeWatchTimeout
-	sendInitialEvents := resourceVersion != ""
-	watchOps := metav1.ListOptions{
-		TimeoutSeconds:    &timeOut,
-		SendInitialEvents: &sendInitialEvents,
-		Watch:             true,
-		ResourceVersion:   resourceVersion,
-		Limit:             w.eh.options.KubeWatchMaxEvents,
-	}
-	watcher, err :=
-		clientset.CoreV1().Pods("").Watch(context.Background(), watchOps)
+	watcher, err := w.getWatcher(clientset, resourceVersion)
 	if err != nil {
 		log.Panic().Err(err).Msg("Error starting watcher.")
 	}
@@ -194,10 +207,19 @@ func (w *PodCollector) watch(
 		var pod *corev1.Pod
 		var isAPod bool
 		if event.Type == watch.Error {
-			log.Error().Msgf("Watch event error: %+v", event)
+			apiStatus, ok := event.Object.(*metav1.Status)
+			if ok && apiStatus.Code == http.StatusGone {
+				// The resource version we were watching is too old.
+				log.Warn().Msg("Resource version too old, cleaning up and resetting watch.")
+
+				return
+			} else {
+				// Handle other watch errors as you see fit
+				log.Error().Msgf("Watch event error: %+v", event)
+			}
 			prommetrics.PodCollectorErrors.Inc()
 
-			break
+			continue // RetryWatcher will handle reconnection, so just continue
 		} else if pod, isAPod = event.Object.(*corev1.Pod); !isAPod {
 			log.Panic().Msgf("Watch event is not a Pod: %+v", event)
 		} else if event := w.handlePod(clientset, event.Type, pod); event != nil {
@@ -214,18 +236,17 @@ func (w *PodCollector) watch(
 // StatisticEventHandler used to initialize the PodCollector. It is blocking and
 // should be run in another goroutine to the StatisticEventHandler and other
 // collectors.
-func (w *PodCollector) Run(
-	clientset *kubernetes.Clientset,
-	resourceVersion string,
-) {
+func (w *PodCollector) Run(clientset *kubernetes.Clientset) {
 	for {
-		w.watch(clientset, resourceVersion)
+		resyncUIDs, resourceVersion, err := w.collectInitialPods(clientset)
+		if err != nil {
+			log.Panic().Err(err).Msg(
+				"Failed to resync after 410 Gone from Kubernetes Watch API")
+		}
 
-		// Some leak in w.blacklistUids and w.statistics could happen, as Deleted
-		// events may be lost. This could be mitigated by performing another full List
-		// and checking for removed pod UIDs.
+		w.eh.resyncChan.Publish(resyncUIDs)
+		w.watch(clientset, resourceVersion)
 		log.Warn().Msg("Watch ended, restarting. Some events may be lost.")
 		prommetrics.PodCollectorRestarts.Inc()
-		resourceVersion = ""
 	}
 }
diff --git a/internal/statistics/statistic_event.go b/internal/statistics/statistic_event.go
index 871930f..adce698 100644
--- a/internal/statistics/statistic_event.go
+++ b/internal/statistics/statistic_event.go
@@ -16,22 +16,23 @@ type statisticEvent interface {
 type StatisticEventHandler struct {
 	options       *options.Options
 	eventChan     prommetrics.MonitoredChannel[statisticEvent]
+	resyncChan    prommetrics.MonitoredChannel[[]types.UID]
 	blacklistUIDs []types.UID
 	statistics    map[types.UID]*podStatistic
 }
 
-// NewStatisticEventHandler creates a new StatisticEventHandler which filters
-// out events for the provided initialSyncBlacklist Pod UIDs.
-func NewStatisticEventHandler(
-	options *options.Options,
-	initialSyncBlacklist []types.UID,
-) *StatisticEventHandler {
+// NewStatisticEventHandler creates a new StatisticEventHandler which filters
+// out events for Pod UIDs blacklisted by the most recent resync.
+func NewStatisticEventHandler(options *options.Options) *StatisticEventHandler {
 	return &StatisticEventHandler{
 		options: options,
 		eventChan: prommetrics.NewMonitoredChannel[statisticEvent](
 			"statistic_events", options.StatisticEventQueueLength),
-		blacklistUIDs: initialSyncBlacklist,
-		statistics:    map[types.UID]*podStatistic{},
+		// Must be unbuffered to ensure that new PodAdded statistic events aren't
+		// generated before the resync is processed.
+		resyncChan: prommetrics.NewMonitoredChannel[[]types.UID](
+			"pod_resync", 0),
+		statistics: map[types.UID]*podStatistic{},
 	}
 }
 
@@ -60,27 +61,59 @@ func (eh *StatisticEventHandler) getPodStatistic(uid types.UID) *podStatistic {
 	}
 }
 
+func (eh *StatisticEventHandler) handleEvent(event statisticEvent) {
+	uid := event.podUID()
+	if eh.isBlacklisted(uid) {
+		return
+	}
+
+	statistic := eh.getPodStatistic(uid)
+	if event.handle(statistic) {
+		delete(eh.statistics, uid)
+	}
+
+	prommetrics.PodsTracked.Set(float64(len(eh.statistics)))
+	prommetrics.EventsHandled.Inc()
+}
+
+func (eh *StatisticEventHandler) handleResync(resyncUIDs []types.UID) {
+	resyncUIDsSet := map[types.UID]interface{}{}
+	for _, resyncUID := range resyncUIDs {
+		resyncUIDsSet[resyncUID] = nil
+	}
+
+	for uid := range eh.statistics {
+		if _, ok := resyncUIDsSet[uid]; !ok {
+			delete(eh.statistics, uid)
+		}
+	}
+
+	eh.blacklistUIDs = []types.UID{}
+	for _, uid := range resyncUIDs {
+		if _, ok := eh.statistics[uid]; !ok {
+			eh.blacklistUIDs = append(eh.blacklistUIDs, uid)
+		}
+	}
+}
+
 // Run launches the statistic event handling loop. It is blocking and should be
 // run in another goroutine to each of the collectors. It provides synchronous
 // and ordered execution of statistic events.
 func (eh *StatisticEventHandler) Run() {
 	for {
-		event, ok := eh.eventChan.Read()
-		if !ok {
-			break
-		}
+		select {
+		case event, ok := <-eh.eventChan.Channel():
+			if !ok {
+				return
+			}
 
-		uid := event.podUID()
-		if eh.isBlacklisted(uid) {
-			continue
-		}
+			eh.handleEvent(event)
+		case resyncUIDs, ok := <-eh.resyncChan.Channel():
+			if !ok {
+				return
+			}
 
-		statistic := eh.getPodStatistic(uid)
-		if event.handle(statistic) {
-			delete(eh.statistics, uid)
+			eh.handleResync(resyncUIDs)
 		}
-
-		prommetrics.PodsTracked.Set(float64(len(eh.statistics)))
-		prommetrics.EventsHandled.Inc()
 	}
 }

From c39fe86ef1f649debcf79fa1754a31c7b60f4ca3 Mon Sep 17 00:00:00 2001
From: Isabelle COWAN-BERGMAN
Date: Thu, 21 Sep 2023 14:05:27 +0200
Subject: [PATCH 2/5] Update pod_collector.go

Signed-off-by: Isabelle COWAN-BERGMAN
---
 internal/statistics/pod_collector.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/statistics/pod_collector.go b/internal/statistics/pod_collector.go
index 86a343b..6d71503 100644
--- a/internal/statistics/pod_collector.go
+++ b/internal/statistics/pod_collector.go
@@ -207,6 +207,7 @@ func (w *PodCollector) watch(
 		var pod *corev1.Pod
 		var isAPod bool
 		if event.Type == watch.Error {
+			prommetrics.PodCollectorErrors.Inc()
 			apiStatus, ok := event.Object.(*metav1.Status)
 			if ok && apiStatus.Code == http.StatusGone {
 				// The resource version we were watching is too old.
@@ -217,7 +218,6 @@ func (w *PodCollector) watch(
 				log.Warn().Msg("Resource version too old, cleaning up and resetting watch.")
 
 				return
 			} else {
 				// Handle other watch errors as you see fit
 				log.Error().Msgf("Watch event error: %+v", event)
 			}
-			prommetrics.PodCollectorErrors.Inc()
 
 			continue // RetryWatcher will handle reconnection, so just continue
 		} else if pod, isAPod = event.Object.(*corev1.Pod); !isAPod {

From eece49411a62ca7ddfcbf8e7ff0cf552b163b80a Mon Sep 17 00:00:00 2001
From: Isabelle Erin COWAN-BERGMAN
Date: Thu, 21 Sep 2023 15:08:33 +0200
Subject: [PATCH 3/5] Use apierrors pattern for http status

---
 internal/statistics/pod_collector.go | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/internal/statistics/pod_collector.go b/internal/statistics/pod_collector.go
index 6d71503..5203845 100644
--- a/internal/statistics/pod_collector.go
+++ b/internal/statistics/pod_collector.go
@@ -9,6 +9,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/rs/zerolog/log"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -208,14 +209,14 @@ func (w *PodCollector) watch(
 		var isAPod bool
 		if event.Type == watch.Error {
 			prommetrics.PodCollectorErrors.Inc()
-			apiStatus, ok := event.Object.(*metav1.Status)
-			if ok && apiStatus.Code == http.StatusGone {
+			// API error will not be wrapped and StatusError doesn't implement the
+			// necessary interface.
+			//nolint:errorlint
+			apiStatus, ok := apierrors.FromObject(event.Object).(*apierrors.StatusError)
+			if ok && apiStatus.ErrStatus.Code == http.StatusGone {
 				// The resource version we were watching is too old.
-				log.Warn().Msg("Resource version too old, cleaning up and resetting watch.")
-
-				return
+				log.Warn().Msg("Resource version too old, resetting watch.")
 			} else {
-				// Handle other watch errors as you see fit
 				log.Error().Msgf("Watch event error: %+v", event)
 			}

From d52e447aa8295444dff02602118cef6a7e573a61 Mon Sep 17 00:00:00 2001
From: Isabelle COWAN-BERGMAN
Date: Thu, 21 Sep 2023 15:31:40 +0200
Subject: [PATCH 4/5] Update internal/statistics/pod_collector.go

Signed-off-by: Isabelle COWAN-BERGMAN
---
 internal/statistics/pod_collector.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/internal/statistics/pod_collector.go b/internal/statistics/pod_collector.go
index 5203845..f6a4165 100644
--- a/internal/statistics/pod_collector.go
+++ b/internal/statistics/pod_collector.go
@@ -216,6 +216,8 @@ func (w *PodCollector) watch(
 			if ok && apiStatus.ErrStatus.Code == http.StatusGone {
 				// The resource version we were watching is too old.
 				log.Warn().Msg("Resource version too old, resetting watch.")
+
+				return
 			} else {
 				log.Error().Msgf("Watch event error: %+v", event)
 			}

From 4802b5003c00bfe971de6d0d7f029896f1541797 Mon Sep 17 00:00:00 2001
From: Isabelle Erin COWAN-BERGMAN
Date: Fri, 22 Sep 2023 10:41:05 +0200
Subject: [PATCH 5/5] fix trailing ws

---
 internal/statistics/pod_collector.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/statistics/pod_collector.go b/internal/statistics/pod_collector.go
index f6a4165..cf7ec09 100644
--- a/internal/statistics/pod_collector.go
+++ b/internal/statistics/pod_collector.go
@@ -216,7 +216,7 @@ func (w *PodCollector) watch(
 			if ok && apiStatus.ErrStatus.Code == http.StatusGone {
 				// The resource version we were watching is too old.
 				log.Warn().Msg("Resource version too old, resetting watch.")
-
+
 				return
 			} else {
 				log.Error().Msgf("Watch event error: %+v", event)
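
To make the final shape of the series easier to see outside the diff context, here is a condensed, free-standing sketch of the list/resync/watch loop these five patches converge on. It is a sketch under assumptions, not code from the repository: runCollector, watchUntilGone, and the bare resyncChan parameter are illustrative stand-ins for PodCollector.Run, PodCollector.watch, and the MonitoredChannel wrapper, and error handling is reduced to panic.

package sketch

import (
	"context"
	"net/http"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// runCollector loops forever: list pods to obtain a resource version, hand the
// current UID set to the event handler over an unbuffered channel, then watch
// until that resource version expires with 410 Gone. Illustrative only; see
// PodCollector.Run for the real implementation.
func runCollector(clientset kubernetes.Interface, resyncChan chan<- []types.UID) {
	for {
		list, err := clientset.CoreV1().Pods("").List(
			context.Background(), metav1.ListOptions{})
		if err != nil {
			panic(err)
		}
		uids := make([]types.UID, 0, len(list.Items))
		for i := range list.Items {
			uids = append(uids, list.Items[i].UID)
		}
		// Blocks until the handler has fully processed the resync.
		resyncChan <- uids

		watchUntilGone(clientset, list.ResourceVersion)
	}
}

// watchUntilGone consumes events from a RetryWatcher and returns once the
// server reports 410 Gone, signalling that the caller must re-list.
func watchUntilGone(clientset kubernetes.Interface, resourceVersion string) {
	watcher, err := watchtools.NewRetryWatcher(resourceVersion, &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return clientset.CoreV1().Pods("").List(context.Background(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return clientset.CoreV1().Pods("").Watch(context.Background(), options)
		},
	})
	if err != nil {
		panic(err)
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		if event.Type == watch.Error {
			// RetryWatcher retries transient errors itself; only 410 is fatal,
			// because the watch can no longer be resumed from this version.
			//nolint:errorlint
			status, ok := apierrors.FromObject(event.Object).(*apierrors.StatusError)
			if ok && status.ErrStatus.Code == http.StatusGone {
				return
			}
			continue
		}
		if pod, ok := event.Object.(*corev1.Pod); ok {
			_ = pod // translate into a statistic event here
		}
	}
}

The unbuffered send on resyncChan is the synchronization point: the collector cannot publish any new pod event until the StatisticEventHandler has pruned stale entries from its statistics map and rebuilt the blacklist, which is what closes the leak described in the first commit.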