-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
⚠Only allow sources to start once #2686
base: main
Are you sure you want to change the base?
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: inteon The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
0cebcae
to
bde0a5e
Compare
bde0a5e
to
6fcaa42
Compare
|
||
cs.once.Do(func() { | ||
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source | ||
go cs.syncLoop(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main issue that this PR solves: here, the context passed to the first call of Start is used to manage the goroutine that is used for all future calls to Start.
6fcaa42
to
a603db1
Compare
Signed-off-by: Tim Ramlot <[email protected]>
a603db1
to
01b8e5b
Compare
Signed-off-by: Tim Ramlot <[email protected]>
@inteon: The following test failed, say
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
I'm really having some trouble understanding what the goal for users of controller-runtime is with this PR. What use case are we trying to solve? |
@sbueringer Below is a small example test to illustrate the issue at hand. On master (second controller does NOT receive any events): func TestMultiStart(t *testing.T) {
rootCtx := context.Background()
watchChan := make(chan event.GenericEvent, 10)
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
m, err := manager.New(&rest.Config{}, manager.Options{})
require.NoError(t, err)
// Controller 1 is started and stopped directly
{
c1, err := controller.New("controller-1", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
t.Fail() // Reconcilder should not get called
return reconcile.Result{}, nil
})})
require.NoError(t, err)
require.NoError(t, c1.Watch(watch))
ctx1, cancel1 := context.WithCancel(rootCtx)
cancel1()
require.NoError(t, c1.Start(ctx1))
}
// Controller 2 is listening on the same channel
callCount := int32(0)
c2, err := controller.New("controller-2", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, o reconcile.Request) (reconcile.Result, error) {
t.Log("got event!", atomic.AddInt32(&callCount, 1))
return reconcile.Result{}, nil
})})
require.NoError(t, err)
require.NoError(t, c2.Watch(watch))
ctx2, stop := context.WithTimeout(rootCtx, time.Second*5)
defer stop()
done := make(chan struct{})
go func() {
defer close(done)
require.NoError(t, c2.Start(ctx2))
}()
// Start sending messages on the channel
watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test1"}}}
watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test2"}}}
watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test3"}}}
<-done
// We waited 5 seconds, which should be enough to have at least received
// one of the events on the channel
require.GreaterOrEqual(t, callCount, int32(1))
} $ go test -count=5 -timeout 30s -run ^TestMultiStart$ sigs.k8s.io/controller-runtime
--- FAIL: TestMultiStart (5.00s)
aa_test.go:71:
Error Trace: /home/tramlot/projects/controller-runtime/aa_test.go:71
Error: "0" is not greater than or equal to "1"
Test: TestMultiStart
--- FAIL: TestMultiStart (5.00s)
aa_test.go:71:
Error Trace: /home/tramlot/projects/controller-runtime/aa_test.go:71
Error: "0" is not greater than or equal to "1"
Test: TestMultiStart
--- FAIL: TestMultiStart (5.00s)
aa_test.go:71:
Error Trace: /home/tramlot/projects/controller-runtime/aa_test.go:71
Error: "0" is not greater than or equal to "1"
Test: TestMultiStart
--- FAIL: TestMultiStart (5.00s)
aa_test.go:71:
Error Trace: /home/tramlot/projects/controller-runtime/aa_test.go:71
Error: "0" is not greater than or equal to "1"
Test: TestMultiStart
--- FAIL: TestMultiStart (5.00s)
aa_test.go:71:
Error Trace: /home/tramlot/projects/controller-runtime/aa_test.go:71
Error: "0" is not greater than or equal to "1"
Test: TestMultiStart
FAIL
FAIL sigs.k8s.io/controller-runtime 25.019s
FAIL With this PR (second controller DOES receive any events): func TestMultiStart(t *testing.T) {
rootCtx := context.Background()
watchChan := make(chan event.GenericEvent, 10)
broadcaster := source.NewChannelBroadcaster(watchChan)
watch1 := source.Channel(broadcaster, &handler.EnqueueRequestForObject{})
watch2 := source.Channel(broadcaster, &handler.EnqueueRequestForObject{})
m, err := manager.New(&rest.Config{}, manager.Options{})
require.NoError(t, err)
// Controller 1 is started and stopped directly
{
c1, err := controller.New("controller-1", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
t.Fail() // Reconcilder should not get called
return reconcile.Result{}, nil
})})
require.NoError(t, err)
require.NoError(t, c1.Watch(watch1))
ctx1, cancel1 := context.WithCancel(rootCtx)
cancel1()
require.NoError(t, c1.Start(ctx1))
}
// Controller 2 is listening on the same channel
callCount := int32(0)
c2, err := controller.New("controller-2", m, controller.Options{Reconciler: reconcile.Func(func(ctx context.Context, o reconcile.Request) (reconcile.Result, error) {
t.Log("got event!", atomic.AddInt32(&callCount, 1))
return reconcile.Result{}, nil
})})
require.NoError(t, err)
require.NoError(t, c2.Watch(watch2))
ctx2, stop := context.WithTimeout(rootCtx, time.Second*5)
defer stop()
done := make(chan struct{})
go func() {
defer close(done)
require.NoError(t, c2.Start(ctx2))
}()
// Start sending messages on the channel
watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test1"}}}
watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test2"}}}
watchChan <- event.GenericEvent{Object: &corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: "Test3"}}}
<-done
// We waited 5 seconds, which should be enough to have at least received
// one of the events on the channel
require.GreaterOrEqual(t, callCount, int32(1))
} $ go test -count=5 -timeout 30s -run ^TestMultiStart$ sigs.k8s.io/controller-runtime
ok sigs.k8s.io/controller-runtime 25.017s |
I get the point that calling Start twice on the same Channel is problematic because of the strange behavior on cancel. I'm not sure if that necessarily means that we should have the channelBroadcaster implementation within controller-runtime to support having multiple Sources with the same chan. |
We can also say that 1 channel source = 1 channel. |
I'm not sure if that is/was an intended feature or it just mostly worked (minus the cancel behavior of the context). Let's see what @alvaroaleman thinks. |
I don't know either if this was intentional, but the cancel behavior means that this never worked correctly. This change means that the UX of |
PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
The Kubernetes project currently lacks enough contributors to adequately respond to all PRs. This bot triages PRs according to the following rules:
You can:
Please send feedback to sig-contributor-experience at kubernetes/community. /lifecycle stale |
The Kubernetes project currently lacks enough active contributors to adequately respond to all PRs. This bot triages PRs according to the following rules:
You can:
Please send feedback to sig-contributor-experience at kubernetes/community. /lifecycle rotten |
Currently, only the Informer and Channel source can be started more than once (Start function can be called twice on same object).
I do believe however that this pattern is flawed, each source should only be started once, so that the context passed to that Start function is only used to stop that one instance from running and all the properties in the source belong to that single instance. The Channel source, for example, currently stops all sources that were started from the same instance when the first context passed to Start gets canceled (see #2686 (comment)).
For the Channel source, allowing the source to start only once, requires the API to be altered, since we want to make it possible to listen to the same channel using multiple Channel sources (matching the current behavior). This requires a new Channel Broadcaster to be introduced.