Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: milvus-io/milvus
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: shaoting-huang/milvus
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Can’t automatically merge. Don’t worry, you can still create the pull request.
  • 2 commits
  • 1 file changed
  • 1 contributor

Commits on Jul 25, 2024

  1. demo

    Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
    shaoting-huang committed Jul 25, 2024
    Copy the full SHA
    c9b40ce View commit details

Commits on Sep 12, 2024

  1. Copy the full SHA
    ebd75a5 View commit details
Showing with 115 additions and 0 deletions.
  1. +115 −0 internal/storage/serde_events_test.go
115 changes: 115 additions & 0 deletions internal/storage/serde_events_test.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,9 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"math"
"strconv"
"testing"

@@ -490,3 +492,116 @@ func readDeltaLog(size int, blob *Blob) error {
}
return nil
}

func TestReaderIO(t *testing.T) {
totalMemory := 64 // MB
wFileSize := 1000 // MB
nFileSize := 32 // MB
totalRows := 400000

wRowsPerRowGroup := totalRows / wFileSize
nRowsPerRowGroup := totalRows / nFileSize

wRowGroupSize := 1 // MB
nRowGroupSize := 1 // MB

maxAlignment := func(wRowsInMemory, nRowsInMemory int, memorySize int) (int, int, int) {
dp := make([][]int, memorySize+1)
for i := range dp {
dp[i] = make([]int, memorySize+1)
}
for i := 0; i <= memorySize; i++ {
for j := 0; j <= i; j++ {
a := j / wRowGroupSize //
b := (i - j) / nRowGroupSize
wRows := wRowsInMemory + a*wRowsPerRowGroup
nRows := nRowsInMemory + b*nRowsPerRowGroup
dp[i][j] = min(wRows, nRows)
}
}
maxMinRows := 0
wRowGroupToRead := 0
nRowGroupToRead := 0
for i := 0; i <= memorySize; i++ {
for j := 0; j <= i; j++ {
if dp[i][j] > maxMinRows {
maxMinRows = dp[i][j]
wRowGroupToRead = j
nRowGroupToRead = i - j
}
}
}
return wRowGroupToRead, nRowGroupToRead, maxMinRows
}

wio := 0
nio := 0

wRowsInMemory := 0
nRowsInMemory := 0

memory := totalMemory

for totalRows > 0 {
// memory allocator
wRowGroupToRead, nRowGroupToRead, maxMinRows := maxAlignment(wRowsInMemory, nRowsInMemory, memory)
fmt.Printf("read wide %d row groups , read narrow %d row groups, max rows %d \n", wRowGroupToRead, nRowGroupToRead, maxMinRows)

// IO Merge
if wRowGroupToRead > 0 {
wio += 1
}
if nRowGroupToRead > 0 {
nio += 1
}

// deserailize maxMinRows
totalRows -= maxMinRows
wRowsInMemory += wRowGroupToRead*wRowsPerRowGroup - maxMinRows
nRowsInMemory += nRowGroupToRead*nRowsPerRowGroup - maxMinRows

ceilFloatToInt := func(n float64) int {
return int(math.Ceil(n))
}
memory -= (ceilFloatToInt(float64(wRowGroupSize/wRowsPerRowGroup)) + ceilFloatToInt(float64(nRowGroupSize/nRowsPerRowGroup))) * maxMinRows

fmt.Printf("%d wide rows in memory, %d narrow rows in memory, memory left: %d\n", wRowsInMemory, nRowsInMemory, memory)
}

fmt.Printf("wide column read IO: %d, narrow columns read IO: %d, total read IO: %d under reading memory %d MB", wio, nio, wio+nio, totalMemory)
}

func TestWriterIO(t *testing.T) {
records := 400000
wide_size := 2.5
narrow_size := 0.08
memory := float64(64 * 1024)

wm := float64(0)
nm := float64(0)
wio := 0
nio := 0
for i := 0; i < records; i++ {
wm += wide_size
nm += narrow_size
// 扩展:use max heap to get max memory use of column group
if wm+nm >= memory {
if wm >= nm {
wm = 0
wio += 1
} else {
nm = 0
nio += 1
}
}
}
if wm > 0 {
wio += 1
}
if nm > 0 {
nio += 1
}
fmt.Println(wio)
fmt.Println(nio)
fmt.Println(wio + nio)
}