Skip to content

Commit

Permalink
Safeguard concurrent access to agentpf.client with RWMutex.
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Hallgren <[email protected]>
  • Loading branch information
thallgren committed Feb 6, 2025
1 parent 5884cfa commit b5b36a6
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions pkg/client/agentpf/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type client struct {
// cancelClient
// cancelDialWatch
// cli and cancelClient are both safe to use without a mutex once the ready channel is closed.
sync.Mutex
sync.RWMutex
cli agent.AgentClient
session *manager.SessionInfo
info *manager.AgentPodInfo
Expand Down Expand Up @@ -81,7 +81,14 @@ func (ac *client) Tunnel(ctx context.Context, opts ...grpc.CallOption) (tunnel.C
if err := ac.ensureConnect(ctx); err != nil {
return nil, err
}
tc, err := ac.cli.Tunnel(ctx, opts...)
ac.RLock()
cli := ac.cli
ac.RUnlock()
if cli == nil {
// Client was closed.
return nil, io.EOF
}
tc, err := cli.Tunnel(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,24 +145,24 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) {
}

func (ac *client) dormant() bool {
ac.Lock()
ac.RLock()
dormant := !(ac.infant || ac.cli == nil || ac.info.Intercepted) && atomic.LoadInt32(&ac.tunnelCount) == 0
ac.Unlock()
ac.RUnlock()
return dormant
}

func (ac *client) intercepted() bool {
ac.Lock()
ac.RLock()
ret := ac.info.Intercepted
ac.Unlock()
ac.RUnlock()
return ret
}

func (ac *client) cancel() bool {
ac.Lock()
ac.RLock()
cc := ac.cancelClient
cdw := ac.cancelDialWatch
ac.Unlock()
ac.RUnlock()
didCancel := false
if cc != nil {
didCancel = true
Expand All @@ -169,9 +176,9 @@ func (ac *client) cancel() bool {
}

func (ac *client) setIntercepted(ctx context.Context, k string, status bool) {
ac.Lock()
ac.RLock()
aci := ac.info.Intercepted
ac.Unlock()
ac.RUnlock()
if status {
if aci {
return
Expand All @@ -188,9 +195,9 @@ func (ac *client) setIntercepted(ctx context.Context, k string, status bool) {

// This agent is no longer intercepting. Stop the dial watcher
dlog.Debugf(ctx, "Agent %s changed to not intercepted", k)
ac.Lock()
ac.RLock()
cdw := ac.cancelDialWatch
ac.Unlock()
ac.RUnlock()
if cdw != nil {
cdw()
}
Expand All @@ -206,6 +213,12 @@ func (ac *client) startDialWatcher(ctx context.Context) error {
}

func (ac *client) startDialWatcherReady(ctx context.Context) error {
ac.RLock()
cli := ac.cli
ac.RUnlock()
if cli == nil {
return fmt.Errorf("agent connection closed")
}
ctx, cancel := context.WithCancel(ctx)

// Create the dial watcher
Expand Down

0 comments on commit b5b36a6

Please sign in to comment.