Skip to content

Commit

Permalink
Merge pull request #1682 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…1678-to-release-0.10

 🌱 Source should retry to get informers until timeout expires
  • Loading branch information
k8s-ci-robot authored Oct 5, 2021
2 parents a4c56b0 + de32618 commit 78ce10e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
33 changes: 26 additions & 7 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"errors"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
var (
i cache.Informer
lastErr error
)

// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(lastErr, &kindMatchErr) {
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
if lastErr != nil {
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
return
}
ks.started <- err
return
}

i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
Expand Down
6 changes: 5 additions & 1 deletion pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package source_test
import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -218,13 +219,16 @@ var _ = Describe("Source", func() {
ic.Error = fmt.Errorf("test error")
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

instance := &source.Kind{
Type: &corev1.Pod{},
}
Expect(instance.InjectCache(ic)).To(Succeed())
err := instance.Start(ctx, handler.Funcs{}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())
Eventually(instance.WaitForSync(context.Background())).Should(HaveOccurred())
})
})
})
Expand Down

0 comments on commit 78ce10e

Please sign in to comment.