Skip to content

Commit

Permalink
enhance: always enable streaming service (#40253)
Browse files Browse the repository at this point in the history
issue: #38399

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Feb 28, 2025
1 parent 0a4e7b5 commit bc8e02d
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
5 changes: 5 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ import (
"github.com/milvus-io/milvus/cmd/asan"
"github.com/milvus-io/milvus/cmd/milvus"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

func main() {
// after 2.6.0, we enable streaming service by default.
// TODO: after remove all streamingutil.IsStreamingServiceEnabled(), we can remove this code.
streamingutil.SetStreamingServiceEnabled()

defer asan.LsanDoLeakCheck()
idx := slices.Index(os.Args, "--run-with-subprocess")

Expand Down
5 changes: 0 additions & 5 deletions cmd/milvus/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StreamingNodeRole:
streamingutil.MustEnableStreamingService()
streamingutil.EnableEmbededQueryNode()
role.EnableStreamingNode = true
role.EnableQueryNode = true
Expand Down Expand Up @@ -206,10 +205,6 @@ func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCo
flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node")
flags.BoolVar(&enableStreamingNode, typeutil.StreamingNodeRole, false, "enable streaming node")

if enableStreamingNode {
streamingutil.MustEnableStreamingService()
}

serverType := args[2]
if serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)
Expand Down
8 changes: 8 additions & 0 deletions internal/util/streamingutil/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ func IsStreamingServiceEnabled() bool {
return os.Getenv(MilvusStreamingServiceEnabled) == "1"
}

// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}

// MustEnableStreamingService panics if the streaming service is not enabled.
func MustEnableStreamingService() {
if !IsStreamingServiceEnabled() {
Expand Down
8 changes: 0 additions & 8 deletions internal/util/streamingutil/test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ package streamingutil

import "os"

// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}

// UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled.
func UnsetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "0")
Expand Down
3 changes: 3 additions & 0 deletions scripts/start_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ nohup ./bin/milvus run indexcoord --run-with-subprocess > /tmp/indexcoord.log

echo "Starting indexnode..."
nohup ./bin/milvus run indexnode --run-with-subprocess > /tmp/indexnode.log 2>&1 &

echo "Starting streamingnode..."
nohup ./bin/milvus run streamingnode --run-with-subprocess > /tmp/streamingnode.log 2>&1 &

0 comments on commit bc8e02d

Please sign in to comment.