Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
enhance: erase the rpc level when wal is located at same node
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <chyezh@outlook.com>
chyezh committed Jan 23, 2025
1 parent 1adde1f commit 9060f98
Showing 41 changed files with 736 additions and 664 deletions.
Original file line number Diff line number Diff line change
@@ -74,11 +74,12 @@ func (rc *resumableConsumerImpl) resumeLoop() {
// consumer need to resume when error occur, so message handler shouldn't close if the internal consumer encounter failure.
nopCloseMH := nopCloseHandler{
Handler: rc.mh,
HandleInterceptor: func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error) {
g := rc.metrics.StartConsume(msg.EstimateSize())
ok, err := handle(ctx, msg)
g.Finish()
return ok, err
HandleInterceptor: func(handleParam message.HandleParam, h message.Handler) message.HandleResult {
if handleParam.Message != nil {
g := rc.metrics.StartConsume(handleParam.Message.EstimateSize())
defer func() { g.Finish() }()
}
return h.Handle(handleParam)
},
}

43 changes: 25 additions & 18 deletions internal/distributed/streaming/internal/consumer/consumer_test.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
@@ -22,22 +23,25 @@ func TestResumableConsumer(t *testing.T) {
ch := make(chan struct{})
c.EXPECT().Done().Return(ch)
c.EXPECT().Error().Return(errors.New("test"))
c.EXPECT().Close().Return()
c.EXPECT().Close().Return(nil)
rc := NewResumableConsumer(func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error) {
if i == 0 {
i++
ok, err := opts.MessageHandler.Handle(context.Background(), message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}))
assert.True(t, ok)
assert.NoError(t, err)
result := opts.MessageHandler.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}),
})
assert.True(t, result.MessageHandled)
assert.NoError(t, result.Error)
return c, nil
} else if i == 1 {
i++
@@ -46,15 +50,15 @@ func TestResumableConsumer(t *testing.T) {
newC := mock_consumer.NewMockConsumer(t)
newC.EXPECT().Done().Return(make(<-chan struct{}))
newC.EXPECT().Error().Return(errors.New("test"))
newC.EXPECT().Close().Return()
newC.EXPECT().Close().Return(nil)
return newC, nil
}, &ConsumerOptions{
PChannel: "test",
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterTimeTickGT(1),
},
MessageHandler: message.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
MessageHandler: adaptor.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
})

select {
@@ -76,10 +80,13 @@ func TestResumableConsumer(t *testing.T) {
func TestHandler(t *testing.T) {
ch := make(chan message.ImmutableMessage, 100)
hNop := nopCloseHandler{
Handler: message.ChanMessageHandler(ch),
Handler: adaptor.ChanMessageHandler(ch),
}
hNop.Handle(context.Background(), nil)
assert.Nil(t, <-ch)
hNop.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(walimplstest.NewTestMessageID(123), []byte("payload"), nil),
})
assert.NotNil(t, <-ch)
hNop.Close()
select {
case <-ch:
12 changes: 4 additions & 8 deletions internal/distributed/streaming/internal/consumer/handler.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
package consumer

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

type handleFunc func(ctx context.Context, msg message.ImmutableMessage) (bool, error)

// nopCloseHandler is a handler that do nothing when close.
type nopCloseHandler struct {
message.Handler
HandleInterceptor func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error)
HandleInterceptor func(handleParam message.HandleParam, h message.Handler) message.HandleResult
}

// Handle is the callback for handling message.
func (nch nopCloseHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
func (nch nopCloseHandler) Handle(handleParam message.HandleParam) message.HandleResult {
if nch.HandleInterceptor != nil {
return nch.HandleInterceptor(ctx, msg, nch.Handler.Handle)
return nch.HandleInterceptor(handleParam, nch.Handler)
}
return nch.Handler.Handle(ctx, msg)
return nch.Handler.Handle(handleParam)
}

// Close is called after all messages are handled or handling is interrupted.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package consumer

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

@@ -13,16 +11,20 @@ type timeTickOrderMessageHandler struct {
lastTimeTick uint64
}

func (mh *timeTickOrderMessageHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
func (mh *timeTickOrderMessageHandler) Handle(handleParam message.HandleParam) message.HandleResult {
var lastConfirmedMessageID message.MessageID
var lastTimeTick uint64
if handleParam.Message != nil {
lastConfirmedMessageID = handleParam.Message.LastConfirmedMessageID()
lastTimeTick = handleParam.Message.TimeTick()
}

ok, err := mh.inner.Handle(ctx, msg)
if ok {
result := mh.inner.Handle(handleParam)
if result.MessageHandled {
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
mh.lastTimeTick = lastTimeTick
}
return ok, err
return result
}

func (mh *timeTickOrderMessageHandler) Close() {
5 changes: 3 additions & 2 deletions internal/distributed/streaming/internal/producer/producer.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -77,7 +78,7 @@ type ResumableProducer struct {
}

// Produce produce a new message to log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *types.AppendResult, err error) {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
@@ -94,7 +95,7 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
return nil, err
}

produceResult, err := producerHandler.Produce(ctx, msg)
produceResult, err := producerHandler.Append(ctx, msg)
if err == nil {
return produceResult, nil
}
Original file line number Diff line number Diff line change
@@ -14,12 +14,13 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/producer"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

func TestResumableProducer(t *testing.T) {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&producer.ProduceResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil)
@@ -47,11 +48,11 @@ func TestResumableProducer(t *testing.T) {
} else if i == 2 {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*producer.ProduceResult, error) {
p.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return &producer.ProduceResult{
return &types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil
3 changes: 2 additions & 1 deletion internal/distributed/streaming/streaming_test.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -109,7 +110,7 @@ func TestStreamingConsume(t *testing.T) {
t.Skip()
streaming.Init()
defer streaming.Release()
ch := make(message.ChanMessageHandler, 10)
ch := make(adaptor.ChanMessageHandler, 10)
s := streaming.WAL().Read(context.Background(), streaming.ReadOption{
VChannel: vChannels[0],
DeliverPolicy: options.DeliverPolicyAll(),
2 changes: 1 addition & 1 deletion internal/distributed/streaming/wal_test.go
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ func TestWAL(t *testing.T) {
return true
}
})
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&types.AppendResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: walimplstest.NewTestMessageID(1),
TimeTick: 10,
TxnCtx: &message.TxnContext{
1 change: 0 additions & 1 deletion internal/distributed/streamingnode/service.go
Original file line number Diff line number Diff line change
@@ -242,7 +242,6 @@ func (s *Server) init() (err error) {
WithDataCoordClient(s.dataCoord).
WithSession(s.session).
WithMetaKV(s.metaKV).
WithChunkManager(s.chunkManager).
Build()
if err := s.streamingnode.Init(s.ctx); err != nil {
return errors.Wrap(err, "StreamingNode service init failed")

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9060f98

Please sign in to comment.