Skip to content

Commit

Permalink
fix: cherry pick warmup async (#39402)
Browse files Browse the repository at this point in the history
related pr: #38690
issue: #38692

---------

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby authored Jan 20, 2025
1 parent 5853f02 commit 2ead5d6
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 7 deletions.
1 change: 1 addition & 0 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
DeltaPosition: insertData.StartPosition,
Level: datapb.SegmentLevel_L1,
},
nil,
)
if err != nil {
log.Error("failed to create new segment",
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (s *ManagerSuite) SetupTest() {
InsertChannel: s.channels[i],
Level: s.levels[i],
},
nil,
)
s.Require().NoError(err)
s.segments = append(s.segments, segment)
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func initWarmupPool() {
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithNonBlocking(true), // make warming up non blocking
conc.WithNonBlocking(false),
)

warmupPool.Store(pool)
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (suite *ReduceSuite) SetupTest() {
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
nil,
)
suite.Require().NoError(err)

Expand Down
2 changes: 2 additions & 0 deletions internal/querynodev2/segments/retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (suite *RetrieveSuite) SetupTest() {
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
nil,
)
suite.Require().NoError(err)

Expand Down Expand Up @@ -124,6 +125,7 @@ func (suite *RetrieveSuite) SetupTest() {
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
nil,
)
suite.Require().NoError(err)

Expand Down
2 changes: 2 additions & 0 deletions internal/querynodev2/segments/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (suite *SearchSuite) SetupTest() {
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
nil,
)
suite.Require().NoError(err)

Expand Down Expand Up @@ -115,6 +116,7 @@ func (suite *SearchSuite) SetupTest() {
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
nil,
)
suite.Require().NoError(err)

Expand Down
67 changes: 62 additions & 5 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"io"
"runtime"
"strings"
"sync"
"time"
"unsafe"

Expand Down Expand Up @@ -261,13 +262,15 @@ type LocalSegment struct {
fields *typeutil.ConcurrentMap[int64, *FieldInfo]
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
space *milvus_storage.Space
warmupDispatcher *AsyncWarmupDispatcher
}

func NewSegment(ctx context.Context,
collection *Collection,
segmentType SegmentType,
version int64,
loadInfo *querypb.SegmentLoadInfo,
warmupDispatcher *AsyncWarmupDispatcher,
) (Segment, error) {
log := log.Ctx(ctx)
/*
Expand Down Expand Up @@ -326,9 +329,10 @@ func NewSegment(ctx context.Context,
fields: typeutil.NewConcurrentMap[int64, *FieldInfo](),
fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),

memSize: atomic.NewInt64(-1),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
memSize: atomic.NewInt64(-1),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
warmupDispatcher: warmupDispatcher,
}

if err := segment.initializeSegment(); err != nil {
Expand Down Expand Up @@ -1507,7 +1511,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
return nil, nil
}).Await()
case "async":
GetWarmupPool().Submit(func() (any, error) {
task := func() (any, error) {
// failed to wait for state update, return directly
if !s.ptrLock.BlockUntilDataLoadedOrReleased() {
return nil, nil
Expand All @@ -1527,7 +1531,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
}
log.Info("warming up chunk cache asynchronously done")
return nil, nil
})
}
s.warmupDispatcher.AddTask(task)
default:
// no warming up
}
Expand Down Expand Up @@ -1666,3 +1671,55 @@ func (s *LocalSegment) indexNeedLoadRawData(schema *schemapb.CollectionSchema, i
}
return !typeutil.IsVectorType(fieldSchema.DataType) && s.HasRawData(indexInfo.IndexInfo.FieldID), nil
}

// WarmupTask is the signature of a unit of warmup work: a function that
// takes no arguments and returns a result together with an error.
type WarmupTask = func() (any, error)

// AsyncWarmupDispatcher queues warmup tasks and hands them to the warmup
// pool from a single dispatching loop (see Run), so producers calling
// AddTask only pay for an append and a non-blocking channel send.
type AsyncWarmupDispatcher struct {
	// mu guards tasks.
	mu sync.RWMutex
	// tasks holds the tasks that were added but not yet removed by Run.
	tasks []WarmupTask
	// notify wakes up Run after tasks has been appended to.
	notify chan struct{}
}

// NewWarmupDispatcher returns a dispatcher with an empty task queue and a
// notification channel of capacity one. The dispatcher does nothing until
// its Run method is started.
func NewWarmupDispatcher() *AsyncWarmupDispatcher {
	dispatcher := new(AsyncWarmupDispatcher)
	dispatcher.notify = make(chan struct{}, 1)
	return dispatcher
}

// AddTask appends task to the pending queue and signals the dispatching
// loop. It never blocks on the notification channel: if a wake-up signal is
// already pending, the new task is covered by that signal and the send is
// skipped.
//
// A nil task is ignored, because it could only fail later when it is invoked
// away from the call site. The receiver must be non-nil; calling AddTask on
// a nil dispatcher dereferences it and panics.
func (d *AsyncWarmupDispatcher) AddTask(task func() (any, error)) {
	if task == nil {
		return
	}

	d.mu.Lock()
	d.tasks = append(d.tasks, task)
	d.mu.Unlock()

	// Non-blocking send: one buffered signal is enough to wake Run, which
	// processes every task queued at the time it wakes up.
	select {
	case d.notify <- struct{}{}:
	default:
	}
}

// Run is the dispatching loop. It blocks until ctx is cancelled, and every
// time AddTask signals new work it submits the queued tasks to the warmup
// pool in the order they were added. Whatever Submit returns is ignored.
//
// The queue is detached from the dispatcher in a single critical section
// before submission starts. This avoids copying the batch, and it drops the
// dispatcher's reference to the old backing array so dispatched closures
// (and whatever they capture) are not kept reachable while the dispatcher
// is idle. Tasks added while a batch is being submitted land in a fresh
// queue and are picked up by the next notification.
//
// If ctx is cancelled in the middle of a batch, Run returns immediately and
// the tasks of that batch that were not yet submitted are dropped.
func (d *AsyncWarmupDispatcher) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-d.notify:
			d.mu.Lock()
			pending := d.tasks
			d.tasks = nil
			d.mu.Unlock()

			for _, task := range pending {
				// Stop handing out work as soon as the context is done.
				if ctx.Err() != nil {
					return
				}
				GetWarmupPool().Submit(task)
			}
		}
	}
}
7 changes: 6 additions & 1 deletion internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,12 +569,15 @@ func NewLoader(
duf := NewDiskUsageFetcher(ctx)
go duf.Start()

warmupDispatcher := NewWarmupDispatcher()
go warmupDispatcher.Run(ctx)
loader := &segmentLoader{
manager: manager,
cm: cm,
loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](),
committedResourceNotifier: syncutil.NewVersionedNotifier(),
duf: duf,
warmupDispatcher: warmupDispatcher,
}

return loader
Expand Down Expand Up @@ -617,7 +620,8 @@ type segmentLoader struct {
committedResource LoadResource
committedResourceNotifier *syncutil.VersionedNotifier

duf *diskUsageFetcher
duf *diskUsageFetcher
warmupDispatcher *AsyncWarmupDispatcher
}

var _ Loader = (*segmentLoader)(nil)
Expand Down Expand Up @@ -700,6 +704,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
segmentType,
version,
loadInfo,
loader.warmupDispatcher,
)
if err != nil {
log.Warn("load segment failed when create new segment",
Expand Down
24 changes: 24 additions & 0 deletions internal/querynodev2/segments/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"fmt"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
storage "github.com/milvus-io/milvus/internal/storage"
Expand Down Expand Up @@ -90,6 +93,7 @@ func (suite *SegmentSuite) SetupTest() {
},
},
},
nil,
)
suite.Require().NoError(err)

Expand Down Expand Up @@ -121,6 +125,7 @@ func (suite *SegmentSuite) SetupTest() {
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
nil,
)
suite.Require().NoError(err)

Expand Down Expand Up @@ -221,3 +226,22 @@ func (suite *SegmentSuite) TestSegmentReleased() {
func TestSegment(t *testing.T) {
suite.Run(t, new(SegmentSuite))
}

// TestWarmupDispatcher checks that every task handed to AddTask is eventually
// executed once the dispatcher loop is running.
func TestWarmupDispatcher(t *testing.T) {
	d := NewWarmupDispatcher()
	// Cancel on exit so the dispatcher goroutine does not outlive the test.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go d.Run(ctx)

	completed := atomic.NewInt64(0)
	taskCnt := 10000
	for i := 0; i < taskCnt; i++ {
		d.AddTask(func() (any, error) {
			completed.Inc()
			return nil, nil
		})
	}

	// Poll with a short tick: the condition is first evaluated after one
	// tick, so a coarse tick only makes the test slower.
	assert.Eventually(t, func() bool {
		return completed.Load() == int64(taskCnt)
	}, 10*time.Second, 10*time.Millisecond)
}
1 change: 1 addition & 0 deletions internal/querynodev2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (suite *QueryNodeSuite) TestStop() {
Level: datapb.SegmentLevel_Legacy,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", 1),
},
nil,
)
suite.NoError(err)
suite.node.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, segment)
Expand Down

0 comments on commit 2ead5d6

Please sign in to comment.