Skip to content

Commit

Permalink
enhance: remove redundant resource key watch operation, just keep con…
Browse files Browse the repository at this point in the history
…sistency of wal (#40235)

issue: #38399
related PR: #39522

- Just implement exclusive broadcaster between broadcast message with
same resource key to keep same order in different wal.
- After simplify the broadcast model, original watch-based broadcast is
too complicated and redundant, remove it.

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Mar 3, 2025
1 parent 679a145 commit f47ab31
Show file tree
Hide file tree
Showing 40 changed files with 1,027 additions and 3,846 deletions.
1 change: 0 additions & 1 deletion internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,6 @@ func TestImportV2(t *testing.T) {
wal := mock_streaming.NewMockWALAccesser(t)
b := mock_streaming.NewMockBroadcast(t)
wal.EXPECT().Broadcast().Return(b).Maybe()
b.EXPECT().BlockUntilResourceKeyAckOnce(mock.Anything, mock.Anything).Return(nil).Maybe()
streaming.SetWALForTest(wal)
defer streaming.RecoverWALForTest()

Expand Down
16 changes: 0 additions & 16 deletions internal/distributed/streaming/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,3 @@ func (b broadcast) Ack(ctx context.Context, req types.BroadcastAckRequest) error

return b.streamingCoordClient.Broadcast().Ack(ctx, req)
}

func (b broadcast) BlockUntilResourceKeyAckOnce(ctx context.Context, rk message.ResourceKey) error {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return ErrWALAccesserClosed
}
defer b.lifetime.Done()
return b.streamingCoordClient.Broadcast().BlockUntilEvent(ctx, message.NewResourceKeyAckOneBroadcastEvent(rk))
}

func (b broadcast) BlockUntilResourceKeyAckAll(ctx context.Context, rk message.ResourceKey) error {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return ErrWALAccesserClosed
}
defer b.lifetime.Done()
return b.streamingCoordClient.Broadcast().BlockUntilEvent(ctx, message.NewResourceKeyAckAllBroadcastEvent(rk))
}
12 changes: 3 additions & 9 deletions internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,14 @@ type WALAccesser interface {
type Broadcast interface {
// Append of Broadcast sends a broadcast message to all target vchannels.
// Guarantees the atomicity written of the messages and eventual consistency.
// The resource-key bound at the message will be held until the message is acked at consumer.
// Once the resource-key is held, the append operation will be rejected.
// Use resource-key to make a sequential operation at same resource-key.
// The resource-key bound at the message will be held as a mutex until the message is broadcasted to all vchannels,
// so the other append operation with the same resource-key will be searialized with a deterministic order on every vchannel.
// The Append operation will be blocked until the message is consumed and acknowledged by the flusher at streamingnode.
Append(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error)

// Ack acknowledges a broadcast message at the specified vchannel.
// It must be called after the message is comsumed by the unique-consumer.
Ack(ctx context.Context, req types.BroadcastAckRequest) error

// BlockUntilResourceKeyAckOnce blocks until the resource-key-bind broadcast message is acked at any one vchannel.
BlockUntilResourceKeyAckOnce(ctx context.Context, rk message.ResourceKey) error

// BlockUntilResourceKeyAckOnce blocks until the resource-key-bind broadcast message is acked at all vchannel.
BlockUntilResourceKeyAckAll(ctx context.Context, rk message.ResourceKey) error
}

// Txn is the interface for writing transaction into the wal.
Expand Down
51 changes: 0 additions & 51 deletions internal/distributed/streaming/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
_ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
Expand All @@ -40,9 +39,6 @@ func TestStreamingBroadcast(t *testing.T) {
streaming.Init()
defer streaming.Release()

err := streaming.WAL().Broadcast().BlockUntilResourceKeyAckAll(context.Background(), message.NewCollectionNameResourceKey(collectionName))
assert.NoError(t, err)

msg, _ := message.NewCreateCollectionMessageBuilderV1().
WithHeader(&message.CreateCollectionMessageHeader{
CollectionId: 1,
Expand All @@ -69,53 +65,6 @@ func TestStreamingBroadcast(t *testing.T) {
assert.Error(t, err)
assert.True(t, status.AsStreamingError(err).IsResourceAcquired())
assert.Nil(t, resp2)

// resource key should be block until ack.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err = streaming.WAL().Broadcast().BlockUntilResourceKeyAckAll(ctx, message.NewCollectionNameResourceKey(collectionName))
assert.ErrorIs(t, err, context.DeadlineExceeded)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err = streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewCollectionNameResourceKey(collectionName))
assert.ErrorIs(t, err, context.DeadlineExceeded)

err = streaming.WAL().Broadcast().Ack(context.Background(), types.BroadcastAckRequest{
BroadcastID: resp.BroadcastID,
VChannel: vChannels[0],
})
assert.NoError(t, err)

// all should be blocked
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err = streaming.WAL().Broadcast().BlockUntilResourceKeyAckAll(ctx, message.NewCollectionNameResourceKey(collectionName))
assert.ErrorIs(t, err, context.DeadlineExceeded)

// once should be returned
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err = streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewCollectionNameResourceKey(collectionName))
assert.NoError(t, err)

err = streaming.WAL().Broadcast().Ack(context.Background(), types.BroadcastAckRequest{
BroadcastID: resp.BroadcastID,
VChannel: vChannels[1],
})
assert.NoError(t, err)

// all should be blocked
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err = streaming.WAL().Broadcast().BlockUntilResourceKeyAckAll(ctx, message.NewCollectionNameResourceKey(collectionName))
assert.NoError(t, err)

// once should be returned
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err = streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewCollectionNameResourceKey(collectionName))
assert.NoError(t, err)
}

func TestStreamingProduce(t *testing.T) {
Expand Down
13 changes: 0 additions & 13 deletions internal/distributed/streaming/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestWAL(t *testing.T) {
}, nil
})
broadcastServce.EXPECT().Ack(mock.Anything, mock.Anything).Return(nil)
broadcastServce.EXPECT().BlockUntilEvent(mock.Anything, mock.Anything).Return(nil)
coordClient.EXPECT().Broadcast().Return(broadcastServce)
handler := mock_handler.NewMockHandlerClient(t)
handler.EXPECT().Close().Return()
Expand Down Expand Up @@ -139,12 +138,6 @@ func TestWAL(t *testing.T) {
err = w.Broadcast().Ack(ctx, types.BroadcastAckRequest{BroadcastID: 1, VChannel: vChannel1})
assert.NoError(t, err)

err = w.Broadcast().BlockUntilResourceKeyAckAll(ctx, message.NewCollectionNameResourceKey("r1"))
assert.NoError(t, err)

err = w.Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewCollectionNameResourceKey("r2"))
assert.NoError(t, err)

w.Close()

w.GetLatestMVCCTimestampIfLocal(ctx, vChannel1)
Expand All @@ -158,12 +151,6 @@ func TestWAL(t *testing.T) {

err = w.Broadcast().Ack(ctx, types.BroadcastAckRequest{BroadcastID: 1, VChannel: vChannel1})
assert.Error(t, err)

err = w.Broadcast().BlockUntilResourceKeyAckAll(ctx, message.NewCollectionNameResourceKey("r1"))
assert.Error(t, err)

err = w.Broadcast().BlockUntilResourceKeyAckOnce(ctx, message.NewCollectionNameResourceKey("r2"))
assert.Error(t, err)
}

func newInsertMessage(vChannel string) message.MutableMessage {
Expand Down
94 changes: 0 additions & 94 deletions internal/mocks/distributed/mock_streaming/mock_Broadcast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 0 additions & 49 deletions internal/mocks/streamingcoord/mock_client/mock_BroadcastService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 0 additions & 17 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/proxy/connection"
"github.com/milvus-io/milvus/internal/types"
Expand All @@ -56,7 +55,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/crypto"
Expand Down Expand Up @@ -6573,21 +6571,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
return resp, nil
}

// Import is asynchronous consumed from the wal, so we need to wait for the wal to release the resource key.
// The job can be seen by the user after the resource key is acked once at any vchannel.
jobID, err := strconv.ParseInt(resp.GetJobID(), 10, 64)
if err != nil {
return &internalpb.ImportResponse{
Status: merr.Status(merr.WrapErrServiceInternal("invalid job ID")),
}, nil
}
resourceKey := message.NewImportJobIDResourceKey(jobID)
if err := streaming.WAL().Broadcast().BlockUntilResourceKeyAckOnce(ctx, resourceKey); err != nil {
log.Warn("failed to wait for resource key ack", zap.Error(err))
return &internalpb.ImportResponse{
Status: merr.Status(merr.WrapErrServiceInternal("failed to wait for resource key ack")),
}, nil
}
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc()
metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return resp, nil
Expand Down
3 changes: 0 additions & 3 deletions internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,6 @@ func TestProxy_ImportV2(t *testing.T) {
wal := mock_streaming.NewMockWALAccesser(t)
b := mock_streaming.NewMockBroadcast(t)
wal.EXPECT().Broadcast().Return(b).Maybe()
b.EXPECT().BlockUntilResourceKeyAckOnce(mock.Anything, mock.Anything).Return(nil).Maybe()
b.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{}, nil).Maybe()
streaming.SetWALForTest(wal)
defer streaming.RecoverWALForTest()
Expand Down Expand Up @@ -1649,7 +1648,6 @@ func TestProxy_ImportV2(t *testing.T) {
err = node.sched.Start()
assert.NoError(t, err)
chMgr := NewMockChannelsMgr(t)
chMgr.EXPECT().getChannels(mock.Anything).Return([]string{"p1"}, nil)
node.chMgr = chMgr

// no such collection
Expand Down Expand Up @@ -1686,7 +1684,6 @@ func TestProxy_ImportV2(t *testing.T) {
// set partition name and with partition key
chMgr = NewMockChannelsMgr(t)
chMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"ch0"}, nil)
chMgr.EXPECT().getChannels(mock.Anything).Return([]string{"p1"}, nil)
node.chMgr = chMgr
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa", PartitionName: "bbb"})
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit f47ab31

Please sign in to comment.