Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure proper controller shutdown if also starting #16

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 7 additions & 30 deletions pkg/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/obot-platform/nah/pkg/log"
Expand Down Expand Up @@ -52,7 +50,7 @@ func NewElectionConfig(ttl time.Duration, namespace, name, lockType string, cfg
}
}

func (ec *ElectionConfig) Run(ctx context.Context, id string, onLeader OnLeader, onSwitchLeader OnNewLeader, signalDone chan struct{}) error {
func (ec *ElectionConfig) Run(ctx context.Context, id string, onLeader OnLeader, onSwitchLeader OnNewLeader, signalDone func()) error {
if ec == nil {
// Don't start leader election if there is no config.
return onLeader(ctx)
Expand All @@ -69,7 +67,7 @@ func (ec *ElectionConfig) Run(ctx context.Context, id string, onLeader OnLeader,
return nil
}

func (ec *ElectionConfig) run(ctx context.Context, id string, cb OnLeader, onSwitchLeader OnNewLeader, signalDone chan struct{}) error {
func (ec *ElectionConfig) run(ctx context.Context, id string, cb OnLeader, onSwitchLeader OnNewLeader, signalDone func()) error {
rl, err := resourcelock.NewFromKubeconfig(
ec.ResourceLockType,
ec.Namespace,
Expand All @@ -84,15 +82,6 @@ func (ec *ElectionConfig) run(ctx context.Context, id string, cb OnLeader, onSwi
return fmt.Errorf("error creating leader lock for %s: %v", ec.Name, err)
}

// Catch these signals to ensure a graceful shutdown and leader election release.
sigCtx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
defer func() {
if err != nil {
// If we encountered an error, cancel the context because we won't be using it.
cancel()
}
}()

le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: ec.TTL,
Expand All @@ -107,23 +96,11 @@ func (ec *ElectionConfig) run(ctx context.Context, id string, cb OnLeader, onSwi
OnNewLeader: onSwitchLeader,
OnStoppedLeading: func() {
select {
case <-sigCtx.Done():
// Must cancel so that the registered signals are no longer caught.
cancel()

// The context has been canceled or is otherwise complete.
// This is a request to terminate. Exit 0.
// Exiting cleanly is useful when the context is canceled
// so that Kubernetes doesn't record it exiting in error
// when the exit was requested. For example, the wrangler-cli
// package sets up a context that cancels when SIGTERM is
// sent in. If a node is shut down this is the type of signal
// sent. In that case you want the 0 exit code to mark it as
// complete so that everything comes back up correctly after
// a restart.
// The pattern found here can be found inside the kube-scheduler.
case <-ctx.Done():
log.Infof("requested to terminate, exiting")
close(signalDone)
if signalDone != nil {
signalDone()
}
default:
log.Fatalf("leader election lost for %s", ec.Name)
}
Expand All @@ -136,7 +113,7 @@ func (ec *ElectionConfig) run(ctx context.Context, id string, cb OnLeader, onSwi
}

go func() {
le.Run(sigCtx)
le.Run(ctx)
}()
return nil
}
26 changes: 25 additions & 1 deletion pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func New(handlerSet *HandlerSet, electionConfig *leader.ElectionConfig, healthzP
}

// Stopped returns the channel the router uses to signal that it has stopped.
// When no such channel exists yet, an already-closed channel is returned, so a
// receive on the result never blocks in that case.
func (r *Router) Stopped() <-chan struct{} {
	// Take the start lock so this cannot interleave with a start or stop in progress.
	r.startLock.Lock()
	defer r.startLock.Unlock()

	if stopped := r.signalStopped; stopped != nil {
		return stopped
	}

	// No stop channel has been created; hand back one that is already closed.
	closed := make(chan struct{})
	close(closed)
	return closed
}

Expand Down Expand Up @@ -220,14 +229,29 @@ func (r *Router) Start(ctx context.Context) error {
// Failed to preload caches, panic
log.Fatalf("failed to preload caches: %v", err)
}
}, r.signalStopped)
}, r.done)
}

// done is a callback used by leader election to signal that the controllers are shut down.
//
// It closes r.signalStopped if that channel exists and is still open. The call
// is idempotent: closing an already-closed channel panics in Go, and the channel
// is not reset after being closed, so a repeated invocation must be a no-op.
func (r *Router) done() {
	// Hold the start lock to ensure we aren't starting and stopping at the same time.
	r.startLock.Lock()
	defer r.startLock.Unlock()

	if r.signalStopped == nil {
		// Nothing was ever started, so there is nothing to signal.
		return
	}

	select {
	case <-r.signalStopped:
		// Already closed by an earlier call; closing again would panic.
		// NOTE(review): assumes nothing ever sends on signalStopped, so a
		// successful receive can only mean "closed" — confirm against the rest of the file.
	default:
		close(r.signalStopped)
	}
}

// startHandlers gets called when we become the leader or if there is no leader election.
func (r *Router) startHandlers(ctx context.Context) error {
r.startLock.Lock()
defer r.startLock.Unlock()

if r.signalStopped == nil {
r.signalStopped = make(chan struct{})
}

var err error
// This is the leader now, so not ready until the controller is started and caches are ready.
setHealthy(r.name, false)
Expand Down