Skip to content

Commit

Permalink
fix: main dispatcher not terminate cause messsage stream leak. (#40061)
Browse files Browse the repository at this point in the history
Main dispatcher will leak when we remove solo dispatcher in the end. 
#40046

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Feb 27, 2025
1 parent 3b7b7e7 commit d67d6b7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
37 changes: 37 additions & 0 deletions pkg/mq/msgdispatcher/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,40 @@ func TestClient_Concurrency(t *testing.T) {
n := c.managers.Len()
assert.Equal(t, expected, n)
}

func TestClientMainDispatcherLeak(t *testing.T) {
client := NewClient(newMockFactory(), typeutil.ProxyRole, 1)
assert.NotNil(t, client)
pchannel := "mock_vchannel_0"

vchannel1 := fmt.Sprintf("%s_abc_v0", pchannel) //"mock_vchannel_0_abc_v0"
vchannel2 := fmt.Sprintf("%s_abc_v1", pchannel) //"mock_vchannel_0_abc_v0"
_, err := client.Register(context.Background(), NewStreamConfig(vchannel1, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)

_, err = client.Register(context.Background(), NewStreamConfig(vchannel2, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)

client.Deregister(vchannel2)
client.Deregister(vchannel1)

assert.NotPanics(
t, func() {
_, err = client.Register(context.Background(), NewStreamConfig(vchannel1, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
_, err = client.Register(context.Background(), NewStreamConfig(vchannel2, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
},
)

client.Deregister(vchannel1)
client.Deregister(vchannel2)
assert.NotPanics(
t, func() {
_, err = client.Register(context.Background(), NewStreamConfig(vchannel1, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
_, err = client.Register(context.Background(), NewStreamConfig(vchannel2, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
},
)
}
18 changes: 10 additions & 8 deletions pkg/mq/msgdispatcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,26 @@ func (c *dispatcherManager) Remove(vchannel string) {
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.soloDispatchers[vchannel]; ok {
c.soloDispatchers[vchannel].Handle(terminate)
c.soloDispatchers[vchannel].CloseTarget(vchannel)
delete(c.soloDispatchers, vchannel)
c.deleteMetric(vchannel)
log.Info("remove soloDispatcher done")
}
c.lagTargets.GetAndRemove(vchannel)

if c.mainDispatcher != nil {
c.mainDispatcher.Handle(pause)
c.mainDispatcher.CloseTarget(vchannel)
if c.mainDispatcher.TargetNum() == 0 && len(c.soloDispatchers) == 0 {
c.mainDispatcher.Handle(terminate)
c.mainDispatcher = nil
log.Info("remove mainDispatcher done")
} else {
c.mainDispatcher.Handle(resume)
}
}
if _, ok := c.soloDispatchers[vchannel]; ok {
c.soloDispatchers[vchannel].Handle(terminate)
c.soloDispatchers[vchannel].CloseTarget(vchannel)
delete(c.soloDispatchers, vchannel)
c.deleteMetric(vchannel)
log.Info("remove soloDispatcher done")
}
c.lagTargets.GetAndRemove(vchannel)
}

func (c *dispatcherManager) NumTarget() int {
Expand Down

0 comments on commit d67d6b7

Please sign in to comment.