From bc23185486cc36c82169c6b4d5d4d4dabd3b65f5 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Thu, 6 Feb 2025 11:41:28 +0100 Subject: [PATCH] Safeguard concurrent access to agentpf.client with RWMutex. Signed-off-by: Thomas Hallgren --- pkg/client/agentpf/clients.go | 59 ++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/pkg/client/agentpf/clients.go b/pkg/client/agentpf/clients.go index 78310c6da8..17f46e79cd 100644 --- a/pkg/client/agentpf/clients.go +++ b/pkg/client/agentpf/clients.go @@ -31,7 +31,7 @@ type client struct { // cancelClient // cancelDialWatch // cli and cancelClient are both safe to use without a mutex once the ready channel is closed. - sync.Mutex + sync.RWMutex cli agent.AgentClient session *manager.SessionInfo info *manager.AgentPodInfo @@ -51,7 +51,7 @@ func (ac *client) String() string { return fmt.Sprintf("%s.%s:%d", ai.PodName, ai.Namespace, ai.ApiPort) } -func (ac *client) ensureConnect(ctx context.Context) error { +func (ac *client) ensureConnect(ctx context.Context) (err error) { ac.Lock() infant := ac.infant if infant { @@ -64,24 +64,29 @@ func (ac *client) ensureConnect(ctx context.Context) error { }) } select { - case err, ok := <-ac.ready: - if ok { - // Put error back on channel in case this Tunnel is used again before it's deleted. + case <-ctx.Done(): + err = ctx.Err() + case err = <-ac.ready: + if err != nil { + // Put error back on the channel in case this Tunnel is used again before it's deleted. ac.ready <- err - return err } - // ready channel is closed. We are ready to go. - case <-ctx.Done(): - return ctx.Err() } - return nil + return err } func (ac *client) Tunnel(ctx context.Context, opts ...grpc.CallOption) (tunnel.Client, error) { if err := ac.ensureConnect(ctx); err != nil { return nil, err } - tc, err := ac.cli.Tunnel(ctx, opts...) + ac.RLock() + cli := ac.cli + ac.RUnlock() + if cli == nil { + // Client was closed. + return nil, io.EOF + } + tc, err := cli.Tunnel(ctx, opts...) if err != nil { return nil, err } @@ -101,12 +106,10 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) { var err error defer func() { - if err == nil { - close(ac.ready) - } else { + if err != nil { deleteMe() - ac.ready <- err } + ac.ready <- err }() var conn *grpc.ClientConn @@ -138,24 +141,24 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) { } func (ac *client) dormant() bool { - ac.Lock() + ac.RLock() dormant := !(ac.infant || ac.cli == nil || ac.info.Intercepted) && atomic.LoadInt32(&ac.tunnelCount) == 0 - ac.Unlock() + ac.RUnlock() return dormant } func (ac *client) intercepted() bool { - ac.Lock() + ac.RLock() ret := ac.info.Intercepted - ac.Unlock() + ac.RUnlock() return ret } func (ac *client) cancel() bool { - ac.Lock() + ac.RLock() cc := ac.cancelClient cdw := ac.cancelDialWatch - ac.Unlock() + ac.RUnlock() didCancel := false if cc != nil { didCancel = true @@ -169,9 +172,9 @@ func (ac *client) cancel() bool { } func (ac *client) setIntercepted(ctx context.Context, k string, status bool) { - ac.Lock() + ac.RLock() aci := ac.info.Intercepted - ac.Unlock() + ac.RUnlock() if status { if aci { return @@ -188,9 +191,9 @@ func (ac *client) setIntercepted(ctx context.Context, k string, status bool) { // This agent is no longer intercepting. Stop the dial watcher dlog.Debugf(ctx, "Agent %s changed to not intercepted", k) - ac.Lock() + ac.RLock() cdw := ac.cancelDialWatch - ac.Unlock() + ac.RUnlock() if cdw != nil { cdw() } @@ -206,6 +209,12 @@ func (ac *client) startDialWatcher(ctx context.Context) error { } func (ac *client) startDialWatcherReady(ctx context.Context) error { + ac.RLock() + cli := ac.cli + ac.RUnlock() + if cli == nil { + return fmt.Errorf("agent connection closed") + } ctx, cancel := context.WithCancel(ctx) // Create the dial watcher