Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport release-1.31] Applier manager improvements #5171

Open
wants to merge 7 commits into
base: release-1.31
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 70 additions & 61 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package applier

import (
"context"
"errors"
"fmt"
"path"
"time"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/k0sproject/k0s/pkg/constant"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
Expand All @@ -38,20 +41,18 @@ type Manager struct {
K0sVars *config.CfgVars
KubeClientFactory kubeutil.ClientFactoryInterface

// client kubernetes.Interface
applier Applier
bundlePath string
cancelWatcher context.CancelFunc
log *logrus.Entry
stacks map[string]stack
bundleDir string
stop func(reason string)
log *logrus.Entry

LeaderElector leaderelector.Interface
}

var _ manager.Component = (*Manager)(nil)

type stack = struct {
context.CancelFunc
cancel context.CancelCauseFunc
stopped <-chan struct{}
*StackApplier
}

Expand All @@ -62,21 +63,21 @@ func (m *Manager) Init(ctx context.Context) error {
return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err)
}
m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
m.stacks = make(map[string]stack)
m.bundlePath = m.K0sVars.ManifestsDir

m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
m.bundleDir = m.K0sVars.ManifestsDir

m.LeaderElector.AddAcquiredLeaseCallback(func() {
watcherCtx, cancel := context.WithCancel(ctx)
m.cancelWatcher = cancel
ctx, cancel := context.WithCancelCause(ctx)
stopped := make(chan struct{})

m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
go func() {
_ = m.runWatchers(watcherCtx)
defer close(stopped)
wait.UntilWithContext(ctx, m.runWatchers, 1*time.Minute)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
if m.cancelWatcher != nil {
m.cancelWatcher()
if m.stop != nil {
m.stop("lost leadership")
}
})

Expand All @@ -90,107 +91,115 @@ func (m *Manager) Start(_ context.Context) error {

// Stop stops the Manager
func (m *Manager) Stop() error {
if m.cancelWatcher != nil {
m.cancelWatcher()
if m.stop != nil {
m.stop("applier manager is stopping")
}
return nil
}

func (m *Manager) runWatchers(ctx context.Context) error {
log := logrus.WithField("component", constant.ApplierManagerComponentName)

func (m *Manager) runWatchers(ctx context.Context) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.WithError(err).Error("failed to create watcher")
return err
m.log.WithError(err).Error("Failed to create watcher")
return
}
defer watcher.Close()
defer func() {
if err := watcher.Close(); err != nil {
m.log.WithError(err).Error("Failed to close watcher")
}
}()

err = watcher.Add(m.bundlePath)
err = watcher.Add(m.bundleDir)
if err != nil {
log.Warnf("Failed to start watcher: %s", err.Error())
m.log.WithError(err).Error("Failed to watch bundle directory")
return
}

m.log.Info("Starting watch loop")

// Add all directories after the bundle dir has been added to the watcher.
// Doing it the other way round introduces a race condition when directories
// get created after the initial listing but before the watch starts.

dirs, err := dir.GetAll(m.bundlePath)
dirs, err := dir.GetAll(m.bundleDir)
if err != nil {
return err
m.log.WithError(err).Error("Failed to read bundle directory")
return
}

ctx, cancel := context.WithCancelCause(ctx)
stacks := make(map[string]stack, len(dirs))

for _, dir := range dirs {
m.createStack(ctx, path.Join(m.bundlePath, dir))
m.createStack(ctx, stacks, path.Join(m.bundleDir, dir))
}

for {
select {
case err, ok := <-watcher.Errors:
if !ok {
return err
}
case err := <-watcher.Errors:
m.log.WithError(err).Error("Watch error")
cancel(err)

log.Warnf("watch error: %s", err.Error())
case event, ok := <-watcher.Events:
if !ok {
return nil
}
case event := <-watcher.Events:
switch event.Op {
case fsnotify.Create:
if dir.IsDirectory(event.Name) {
m.createStack(ctx, event.Name)
m.createStack(ctx, stacks, event.Name)
}
case fsnotify.Remove:
m.removeStack(ctx, event.Name)
m.removeStack(ctx, stacks, event.Name)
}

case <-ctx.Done():
log.Info("manifest watcher done")
return nil
m.log.Infof("Watch loop done (%v)", context.Cause(ctx))
for _, stack := range stacks {
<-stack.stopped
}

return
}
}
}

func (m *Manager) createStack(ctx context.Context, name string) {
func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name string) {
// safeguard in case the fswatcher would trigger an event for an already existing stack
if _, ok := m.stacks[name]; ok {
if _, ok := stacks[name]; ok {
return
}

stackCtx, cancelStack := context.WithCancel(ctx)
stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
m.stacks[name] = stack
ctx, cancel := context.WithCancelCause(ctx)
stopped := make(chan struct{})

stack := stack{cancel, stopped, NewStackApplier(name, m.KubeClientFactory)}
stacks[name] = stack

go func() {
defer close(stopped)
log := m.log.WithField("stack", name)
for {

wait.UntilWithContext(ctx, func(ctx context.Context) {
log.Info("Running stack")
if err := stack.Run(stackCtx); err != nil {
if err := stack.Run(ctx); err != nil {
log.WithError(err).Error("Failed to run stack")
}
}, 1*time.Minute)

select {
case <-time.After(10 * time.Second):
continue
case <-stackCtx.Done():
log.Info("Stack done")
return
}
}
log.Infof("Stack done (%v)", context.Cause(ctx))
}()
}

func (m *Manager) removeStack(ctx context.Context, name string) {
stack, ok := m.stacks[name]
func (m *Manager) removeStack(ctx context.Context, stacks map[string]stack, name string) {
stack, ok := stacks[name]
if !ok {
m.log.
WithField("path", name).
Debug("attempted to remove non-existent stack, probably not a directory")
return
}

delete(m.stacks, name)
stack.CancelFunc()
delete(stacks, name)
stack.cancel(errors.New("stack removed"))
<-stack.stopped

log := m.log.WithField("stack", name)
if err := stack.DeleteStack(ctx); err != nil {
Expand Down
Loading
Loading