diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 058fc24c8ba94..8e9719614f4b3 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -185,17 +185,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref } if params.PreCreatedTopicEnabled.GetAsBool() { - subName := fmt.Sprintf("pre-created-topic-check-%s", name) - ms.AsConsumer(ctx, []string{name}, subName, common.SubscriptionPositionUnknown) - // check if topic is existed - // kafka and rmq will err if the topic does not yet exist, pulsar will not - // allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty - // if there are any message that not belong to milvus, will skip it - err := ms.CheckTopicValid(name) - if err != nil { - log.Error("created topic is invaild", zap.String("name", name), zap.Error(err)) - panic("created topic is invaild") - } + d.checkPreCreatedTopic(ctx, factory, name) } ms.AsProducer([]string{name}) @@ -220,6 +210,29 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref return d } +func (d *dmlChannels) checkPreCreatedTopic(ctx context.Context, factory msgstream.Factory, name string) { + tmpMs, err := factory.NewMsgStream(ctx) + if err != nil { + panic(fmt.Sprintf("failed to add msgstream, name:%s, err:%v", name, err)) + } + defer tmpMs.Close() + + subName := fmt.Sprintf("pre-created-topic-check-%s", name) + err = tmpMs.AsConsumer(ctx, []string{name}, subName, common.SubscriptionPositionUnknown) + if err != nil { + panic(fmt.Sprintf("failed to add consumer, name:%s, err:%v", name, err)) + } + + // check if topic is existed + // kafka and rmq will err if the topic does not yet exist, pulsar will not + // allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty + // if there are any message that not belong to milvus, will skip it + err = tmpMs.CheckTopicValid(name) + if err != nil { + panic(fmt.Sprintf("created topic is invalid, name:%s, err:%v", name, err)) + } +} + func (d *dmlChannels) getChannelNames(count int) []string { d.mut.Lock() defer d.mut.Unlock()