Commit fd7d9fe

Merge pull request #10 from backmarket-oss/fix-statistics-tracked-leak

Fix leak of tracked statistic events

Izzette authored Sep 25, 2023
2 parents d7bda9d + 4802b50 commit fd7d9fe

Showing 5 changed files with 113 additions and 62 deletions.
10 changes: 2 additions & 8 deletions cmd/kube-transition-metrics/main.go

@@ -72,18 +72,12 @@ func main() {
 		log.Panic().Err(err).Msg("Failed to build kubernetes client")
 	}
 
-	initialSyncBlacklist, resourceVersion, err :=
-		statistics.CollectInitialPods(options, clientset)
-	if err != nil {
-		panic(err)
-	}
-
-	eventHandler := statistics.NewStatisticEventHandler(options, initialSyncBlacklist)
+	eventHandler := statistics.NewStatisticEventHandler(options)
 
 	go eventHandler.Run()
 
 	podCollector := statistics.NewPodCollector(eventHandler)
-	go podCollector.Run(clientset, resourceVersion)
+	go podCollector.Run(clientset)
 
 	http.Handle("/metrics", promhttp.Handler())
 	handler := zerologhttp.NewHandler(http.DefaultServeMux)
2 changes: 2 additions & 0 deletions go.sum

@@ -9,6 +9,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
 github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
+github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
 github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
 github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
 github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@@ -73,6 +74,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE=
 github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
8 changes: 3 additions & 5 deletions internal/prommetrics/channel.go

@@ -81,11 +81,9 @@ func (mc MonitoredChannel[T]) Publish(item T) {
 		Add(waitDuration.Seconds())
 }
 
-// Read reads from the underlying channel.
-func (mc MonitoredChannel[T]) Read() (T, bool) {
-	item, ok := <-mc.c
-
-	return item, ok
+// Channel returns the underlying channel; use it only for channel receives.
+func (mc MonitoredChannel[T]) Channel() chan T {
+	return mc.c
 }
 
 func (mc MonitoredChannel[T]) monitor() {
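The switch from Read() to Channel() matters because Go's select statement can only receive from channel expressions, not from a method call that wraps a receive; exposing the raw channel is what lets the StatisticEventHandler below multiplex its event and resync channels in one loop. A minimal sketch of the pattern, using a stripped-down stand-in for MonitoredChannel with the Prometheus instrumentation omitted:

package main

import "fmt"

// monitored is a stripped-down stand-in for prommetrics.MonitoredChannel.
type monitored[T any] struct{ c chan T }

func (m monitored[T]) Channel() chan T { return m.c }

func main() {
	events := monitored[string]{c: make(chan string, 8)}
	resyncs := monitored[[]string]{c: make(chan []string)} // unbuffered, like resyncChan

	go func() { events.c <- "pod-added" }()
	go func() { resyncs.c <- []string{"uid-1"} }()

	// A select can multiplex raw channels, which a Read() method cannot offer.
	for i := 0; i < 2; i++ {
		select {
		case e := <-events.Channel():
			fmt.Println("event:", e)
		case uids := <-resyncs.Channel():
			fmt.Println("resync:", uids)
		}
	}
}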
82 changes: 53 additions & 29 deletions internal/statistics/pod_collector.go

@@ -3,16 +3,20 @@ package statistics
 import (
 	"context"
 	"fmt"
+	"net/http"
 
 	"github.com/BackMarket-oss/kube-transition-metrics/internal/options"
 	"github.com/BackMarket-oss/kube-transition-metrics/internal/prommetrics"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/rs/zerolog/log"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	watch_tools "k8s.io/client-go/tools/watch"
 )
 
 // PodCollector uses the Kubernetes Watch API to monitor for all changes on Pods
@@ -32,19 +36,18 @@ func NewPodCollector(
 	}
 }
 
-// CollectInitialPods generates a list of Pod UIDs currently existing on the
+// collectInitialPods generates a list of Pod UIDs currently existing on the
 // cluster. This is used to filter pre-existing Pods by the
 // StatisticEventHandler to avoid generating inaccurate or incomplete metrics.
 // It returns the list of Pod UIDs, the resource version for these UIDs, and an
 // error if one occurred.
-func CollectInitialPods(
-	options *options.Options,
+func (w *PodCollector) collectInitialPods(
 	clientset *kubernetes.Clientset,
 ) ([]types.UID, string, error) {
-	timeOut := options.KubeWatchTimeout
+	timeOut := w.eh.options.KubeWatchTimeout
 	listOptions := metav1.ListOptions{
 		TimeoutSeconds: &timeOut,
-		Limit:          options.KubeWatchMaxEvents,
+		Limit:          w.eh.options.KubeWatchMaxEvents,
 	}
 
 	blacklistUids := make([]types.UID, 0)
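The rest of collectInitialPods is collapsed above. For orientation, here is an illustrative sketch of the paginated List it performs; the function name and parameters are invented for the example, not taken from the repository. It gathers every Pod UID in the cluster along with the final page's resourceVersion, which the subsequent Watch resumes from:

package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// listAllPodUIDs pages through every Pod in the cluster, returning the UIDs
// and the resourceVersion a Watch can start from.
func listAllPodUIDs(
	ctx context.Context,
	clientset kubernetes.Interface,
	pageLimit int64,
) ([]types.UID, string, error) {
	uids := make([]types.UID, 0)
	resourceVersion := ""
	continueToken := ""

	for {
		list, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
			Limit:    pageLimit,
			Continue: continueToken,
		})
		if err != nil {
			return nil, "", fmt.Errorf("failed to list pods: %w", err)
		}

		for i := range list.Items {
			uids = append(uids, list.Items[i].UID)
		}

		// The resourceVersion of the last page marks the point in time the
		// subsequent Watch should resume from.
		resourceVersion = list.ResourceVersion
		if continueToken = list.Continue; continueToken == "" {
			return uids, resourceVersion, nil
		}
	}
}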
@@ -170,21 +173,32 @@ func (w *PodCollector) handlePod(
 	return nil
 }
 
+func (w *PodCollector) getWatcher(
+	clientset *kubernetes.Clientset,
+	resourceVersion string,
+) (*watch_tools.RetryWatcher, error) {
+	watcher, err := watch_tools.NewRetryWatcher(resourceVersion, &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			//nolint:wrapcheck
+			return clientset.CoreV1().Pods("").List(context.Background(), options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			//nolint:wrapcheck
+			return clientset.CoreV1().Pods("").Watch(context.Background(), options)
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create watcher: %w", err)
+	}
+
+	return watcher, nil
+}
+
 func (w *PodCollector) watch(
 	clientset *kubernetes.Clientset,
 	resourceVersion string,
 ) {
-	timeOut := w.eh.options.KubeWatchTimeout
-	sendInitialEvents := resourceVersion != ""
-	watchOps := metav1.ListOptions{
-		TimeoutSeconds:    &timeOut,
-		SendInitialEvents: &sendInitialEvents,
-		Watch:             true,
-		ResourceVersion:   resourceVersion,
-		Limit:             w.eh.options.KubeWatchMaxEvents,
-	}
-	watcher, err :=
-		clientset.CoreV1().Pods("").Watch(context.Background(), watchOps)
+	watcher, err := w.getWatcher(clientset, resourceVersion)
 	if err != nil {
 		log.Panic().Err(err).Msg("Error starting watcher.")
 	}
@@ -194,10 +208,21 @@ func (w *PodCollector) watch(
 		var pod *corev1.Pod
 		var isAPod bool
 		if event.Type == watch.Error {
-			log.Error().Msgf("Watch event error: %+v", event)
 			prommetrics.PodCollectorErrors.Inc()
-
-			break
+			// The API error will not be wrapped, and StatusError doesn't implement
+			// the necessary interface.
+			//nolint:errorlint
+			apiStatus, ok := apierrors.FromObject(event.Object).(*apierrors.StatusError)
+			if ok && apiStatus.ErrStatus.Code == http.StatusGone {
+				// The resource version we were watching is too old.
+				log.Warn().Msg("Resource version too old, resetting watch.")
+
+				return
+			} else {
+				log.Error().Msgf("Watch event error: %+v", event)
+			}
+
+			continue // RetryWatcher will handle reconnection.
 		} else if pod, isAPod = event.Object.(*corev1.Pod); !isAPod {
 			log.Panic().Msgf("Watch event is not a Pod: %+v", event)
 		} else if event := w.handlePod(clientset, event.Type, pod); event != nil {
@@ -214,18 +239,17 @@
 // StatisticEventHandler used to initialize the PodCollector. It is blocking and
 // should be run in another goroutine to the StatisticEventHandler and other
 // collectors.
-func (w *PodCollector) Run(
-	clientset *kubernetes.Clientset,
-	resourceVersion string,
-) {
+func (w *PodCollector) Run(clientset *kubernetes.Clientset) {
 	for {
-		w.watch(clientset, resourceVersion)
+		resyncUIDs, resourceVersion, err := w.collectInitialPods(clientset)
+		if err != nil {
+			log.Panic().Err(err).Msg(
+				"Failed to resync after 410 Gone from kubernetes Watch API")
+		}
 
-		// Some leak in w.blacklistUids and w.statistics could happen, as Deleted
-		// events may be lost. This could be mitigated by performing another full List
-		// and checking for removed pod UIDs.
+		w.eh.resyncChan.Publish(resyncUIDs)
+		w.watch(clientset, resourceVersion)
 		log.Warn().Msg("Watch ended, restarting. Some events may be lost.")
 		prommetrics.PodCollectorRestarts.Inc()
-		resourceVersion = ""
 	}
 }
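The error branch in the watch loop is needed because a RetryWatcher surfaces an expired resourceVersion as a watch.Error event rather than as an error return value. A self-contained sketch of the same detection, feeding a synthetic 410 Gone Status through apierrors.FromObject the way the collector does; the construction of the fake event is an assumption for illustration, and no cluster is required:

package main

import (
	"fmt"
	"net/http"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	// Synthesize the event a watcher delivers once its resourceVersion has
	// been compacted away: a watch.Error wrapping a 410 Gone Status.
	status := metav1.Status{
		Code:   http.StatusGone,
		Reason: metav1.StatusReasonExpired,
	}
	event := watch.Event{Type: watch.Error, Object: &status}

	// Recover the Status from the event object and compare the HTTP code.
	//nolint:errorlint
	apiStatus, ok := apierrors.FromObject(event.Object).(*apierrors.StatusError)
	fmt.Println(ok && apiStatus.ErrStatus.Code == http.StatusGone) // true
}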
73 changes: 53 additions & 20 deletions internal/statistics/statistic_event.go

@@ -16,22 +16,23 @@ type statisticEvent interface {
 type StatisticEventHandler struct {
 	options       *options.Options
 	eventChan     prommetrics.MonitoredChannel[statisticEvent]
+	resyncChan    prommetrics.MonitoredChannel[[]types.UID]
 	blacklistUIDs []types.UID
 	statistics    map[types.UID]*podStatistic
 }
 
 // NewStatisticEventHandler creates a new StatisticEventHandler which filters
-// out events for the provided initialSyncBlacklist Pod UIDs.
-func NewStatisticEventHandler(
-	options *options.Options,
-	initialSyncBlacklist []types.UID,
-) *StatisticEventHandler {
+func NewStatisticEventHandler(options *options.Options) *StatisticEventHandler {
 	return &StatisticEventHandler{
 		options: options,
 		eventChan: prommetrics.NewMonitoredChannel[statisticEvent](
 			"statistic_events", options.StatisticEventQueueLength),
-		blacklistUIDs: initialSyncBlacklist,
-		statistics:    map[types.UID]*podStatistic{},
+		// Must be unbuffered to ensure that new PodAdded statistic events aren't
+		// generated before a resync is processed.
+		resyncChan: prommetrics.NewMonitoredChannel[[]types.UID](
+			"pod_resync", 0),
+		statistics: map[types.UID]*podStatistic{},
 	}
 }
 
@@ -60,27 +61,59 @@ func (eh *StatisticEventHandler) getPodStatistic(uid types.UID) *podStatistic {
 	}
 }
 
+func (eh *StatisticEventHandler) handleEvent(event statisticEvent) {
+	uid := event.podUID()
+	if eh.isBlacklisted(uid) {
+		return
+	}
+
+	statistic := eh.getPodStatistic(uid)
+	if event.handle(statistic) {
+		delete(eh.statistics, uid)
+	}
+
+	prommetrics.PodsTracked.Set(float64(len(eh.statistics)))
+	prommetrics.EventsHandled.Inc()
+}
+
+func (eh *StatisticEventHandler) handleResync(resyncUIDs []types.UID) {
+	resyncUIDsSet := map[types.UID]interface{}{}
+	for _, resyncUID := range resyncUIDs {
+		resyncUIDsSet[resyncUID] = nil
+	}
+
+	for uid := range eh.statistics {
+		if _, ok := resyncUIDsSet[uid]; !ok {
+			delete(eh.statistics, uid)
+		}
+	}
+
+	eh.blacklistUIDs = []types.UID{}
+	for _, uid := range resyncUIDs {
+		if _, ok := eh.statistics[uid]; !ok {
+			eh.blacklistUIDs = append(eh.blacklistUIDs, uid)
+		}
+	}
+}
+
 // Run launches the statistic event handling loop. It is blocking and should be
 // run in another goroutine to each of the collectors. It provides synchronous
 // and ordered execution of statistic events.
 func (eh *StatisticEventHandler) Run() {
 	for {
-		event, ok := eh.eventChan.Read()
-		if !ok {
-			break
-		}
-
-		uid := event.podUID()
-		if eh.isBlacklisted(uid) {
-			continue
-		}
-
-		statistic := eh.getPodStatistic(uid)
-		if event.handle(statistic) {
-			delete(eh.statistics, uid)
-		}
-
-		prommetrics.PodsTracked.Set(float64(len(eh.statistics)))
-		prommetrics.EventsHandled.Inc()
+		select {
+		case event, ok := <-eh.eventChan.Channel():
+			if !ok {
+				break
+			}

+			eh.handleEvent(event)
+		case resyncUIDs, ok := <-eh.resyncChan.Channel():
+			if !ok {
+				break
+			}
+
+			eh.handleResync(resyncUIDs)
+		}
 	}
 }
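The zero queue length passed to NewMonitoredChannel for resyncChan is doing real work here: an unbuffered channel makes Publish block until Run has received the slice, so the PodCollector cannot race ahead and emit statistic events that would be handled before the resync. A minimal demonstration of that ordering guarantee, with illustrative names only:

package main

import "fmt"

func main() {
	resync := make(chan []string) // unbuffered: the send blocks until received
	events := make(chan string, 16)
	done := make(chan struct{})

	go func() { // stands in for StatisticEventHandler.Run
		for {
			select {
			case uids := <-resync:
				fmt.Println("resync accepted:", uids)
			case e := <-events:
				fmt.Println("event:", e)
				close(done)

				return
			}
		}
	}()

	// Stands in for PodCollector.Run: the send below cannot complete until
	// the handler has accepted the resync, so the event is always second.
	resync <- []string{"uid-1", "uid-2"}
	events <- "pod-added"
	<-done
}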
