Skip to content

Commit

Permalink
Safeguard concurrent access to agentpf.client with RWMutex.
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Hallgren <[email protected]>
  • Loading branch information
thallgren committed Feb 6, 2025
1 parent 5884cfa commit bc23185
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions pkg/client/agentpf/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type client struct {
// cancelClient
// cancelDialWatch
// cli and cancelClient are both safe to use without a mutex once the ready channel is closed.
sync.Mutex
sync.RWMutex
cli agent.AgentClient
session *manager.SessionInfo
info *manager.AgentPodInfo
Expand All @@ -51,7 +51,7 @@ func (ac *client) String() string {
return fmt.Sprintf("%s.%s:%d", ai.PodName, ai.Namespace, ai.ApiPort)
}

func (ac *client) ensureConnect(ctx context.Context) error {
func (ac *client) ensureConnect(ctx context.Context) (err error) {
ac.Lock()
infant := ac.infant
if infant {
Expand All @@ -64,24 +64,29 @@ func (ac *client) ensureConnect(ctx context.Context) error {
})
}
select {
case err, ok := <-ac.ready:
if ok {
// Put error back on channel in case this Tunnel is used again before it's deleted.
case <-ctx.Done():
err = ctx.Err()
case err = <-ac.ready:
if err != nil {
// Put error back on the channel in case this Tunnel is used again before it's deleted.
ac.ready <- err
return err
}
// ready channel is closed. We are ready to go.
case <-ctx.Done():
return ctx.Err()
}
return nil
return err
}

func (ac *client) Tunnel(ctx context.Context, opts ...grpc.CallOption) (tunnel.Client, error) {
if err := ac.ensureConnect(ctx); err != nil {
return nil, err
}
tc, err := ac.cli.Tunnel(ctx, opts...)
ac.RLock()
cli := ac.cli
ac.RUnlock()
if cli == nil {
// Client was closed.
return nil, io.EOF
}
tc, err := cli.Tunnel(ctx, opts...)
if err != nil {
return nil, err
}
Expand All @@ -101,12 +106,10 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) {

var err error
defer func() {
if err == nil {
close(ac.ready)
} else {
if err != nil {
deleteMe()
ac.ready <- err
}
ac.ready <- err
}()

var conn *grpc.ClientConn
Expand Down Expand Up @@ -138,24 +141,24 @@ func (ac *client) connect(ctx context.Context, deleteMe func()) {
}

func (ac *client) dormant() bool {
ac.Lock()
ac.RLock()
dormant := !(ac.infant || ac.cli == nil || ac.info.Intercepted) && atomic.LoadInt32(&ac.tunnelCount) == 0
ac.Unlock()
ac.RUnlock()
return dormant
}

func (ac *client) intercepted() bool {
ac.Lock()
ac.RLock()
ret := ac.info.Intercepted
ac.Unlock()
ac.RUnlock()
return ret
}

func (ac *client) cancel() bool {
ac.Lock()
ac.RLock()
cc := ac.cancelClient
cdw := ac.cancelDialWatch
ac.Unlock()
ac.RUnlock()
didCancel := false
if cc != nil {
didCancel = true
Expand All @@ -169,9 +172,9 @@ func (ac *client) cancel() bool {
}

func (ac *client) setIntercepted(ctx context.Context, k string, status bool) {
ac.Lock()
ac.RLock()
aci := ac.info.Intercepted
ac.Unlock()
ac.RUnlock()
if status {
if aci {
return
Expand All @@ -188,9 +191,9 @@ func (ac *client) setIntercepted(ctx context.Context, k string, status bool) {

// This agent is no longer intercepting. Stop the dial watcher
dlog.Debugf(ctx, "Agent %s changed to not intercepted", k)
ac.Lock()
ac.RLock()
cdw := ac.cancelDialWatch
ac.Unlock()
ac.RUnlock()
if cdw != nil {
cdw()
}
Expand All @@ -206,6 +209,12 @@ func (ac *client) startDialWatcher(ctx context.Context) error {
}

func (ac *client) startDialWatcherReady(ctx context.Context) error {
ac.RLock()
cli := ac.cli
ac.RUnlock()
if cli == nil {
return fmt.Errorf("agent connection closed")
}
ctx, cancel := context.WithCancel(ctx)

// Create the dial watcher
Expand Down

0 comments on commit bc23185

Please sign in to comment.