// Provenance: these functions come from the patch "demo" (commit
// c9b40ceb1e2f014c1eead044bfa63ac925a72dbf, shaoting-huang, 2024-07-25), which
// appends them to internal/storage/serde_events_test.go (package storage) and
// adds the "fmt" and "math" imports they rely on.

// Row group sizes, in MB, assumed by the reader simulation.
const (
	wideRowGroupSizeMB   = 1
	narrowRowGroupSizeMB = 1
)

// bestRowGroupSplit searches every way of spending up to memorySize MB on wide
// and narrow row groups and returns the split that maximizes the number of
// aligned rows, i.e. min(wide rows available, narrow rows available), counting
// the rows that are already buffered in memory.
//
// Candidates are visited by increasing total memory and then by increasing
// wide share, and only a strictly better candidate replaces the current one, so
// ties resolve to the first split found. A non-positive memorySize yields
// (0, 0, 0).
func bestRowGroupSplit(wRowsInMemory, nRowsInMemory, memorySize, wRowsPerRowGroup, nRowsPerRowGroup int) (wRowGroups, nRowGroups, alignedRows int) {
	for used := 0; used <= memorySize; used++ {
		for wMem := 0; wMem <= used; wMem++ {
			wGroups := wMem / wideRowGroupSizeMB
			nGroups := (used - wMem) / narrowRowGroupSizeMB
			rows := min(
				wRowsInMemory+wGroups*wRowsPerRowGroup,
				nRowsInMemory+nGroups*nRowsPerRowGroup,
			)
			if rows > alignedRows {
				wRowGroups, nRowGroups, alignedRows = wGroups, nGroups, rows
			}
		}
	}
	return wRowGroups, nRowGroups, alignedRows
}

// bufferedMB returns the memory, rounded up to whole MB, occupied by rows
// buffered rows of a column group whose row groups are rowGroupSizeMB large and
// hold rowsPerRowGroup rows each.
func bufferedMB(rows, rowGroupSizeMB, rowsPerRowGroup int) int {
	// Convert before dividing: integer division would truncate the fraction
	// to zero before math.Ceil could round it up.
	return int(math.Ceil(float64(rows*rowGroupSizeMB) / float64(rowsPerRowGroup)))
}

// simulateReaderIO models reading totalRows rows that are stored twice: in a
// wide column group of wFileSize MB and in a narrow column group of nFileSize
// MB, with at most totalMemory MB available for reading. Each round it picks
// the row-group split with the most aligned rows, counts one read IO per column
// group that is touched, consumes the aligned rows, and keeps the unaligned
// remainder buffered for the next round.
//
// It prints one progress line per step and returns the number of wide and
// narrow read IOs. An error is returned for non-positive inputs, when a file
// has more row groups than rows, or when a round cannot align a single row
// (the simulation would otherwise never terminate).
func simulateReaderIO(totalMemory, wFileSize, nFileSize, totalRows int) (wio, nio int, err error) {
	if totalMemory <= 0 || wFileSize <= 0 || nFileSize <= 0 || totalRows <= 0 {
		return 0, 0, fmt.Errorf("invalid reader simulation input: memory=%d wide file=%d narrow file=%d rows=%d",
			totalMemory, wFileSize, nFileSize, totalRows)
	}

	wRowsPerRowGroup := totalRows * wideRowGroupSizeMB / wFileSize
	nRowsPerRowGroup := totalRows * narrowRowGroupSizeMB / nFileSize
	if wRowsPerRowGroup == 0 || nRowsPerRowGroup == 0 {
		return 0, 0, fmt.Errorf("row groups would be empty: %d wide rows and %d narrow rows per row group",
			wRowsPerRowGroup, nRowsPerRowGroup)
	}

	wRowsInMemory, nRowsInMemory := 0, 0
	memory := totalMemory

	for remaining := totalRows; remaining > 0; {
		// Memory allocator: decide how many row groups of each kind to read.
		wGroups, nGroups, aligned := bestRowGroupSplit(wRowsInMemory, nRowsInMemory, memory, wRowsPerRowGroup, nRowsPerRowGroup)
		fmt.Printf("read wide %d row groups , read narrow %d row groups, max rows %d \n", wGroups, nGroups, aligned)
		if aligned == 0 {
			return 0, 0, fmt.Errorf("no rows can be aligned with %d MB of free memory", memory)
		}

		// IO merge: all row groups of one column group are fetched in one IO.
		if wGroups > 0 {
			wio++
		}
		if nGroups > 0 {
			nio++
		}

		// Deserialize the aligned rows; the rest stays buffered.
		remaining -= aligned
		wRowsInMemory += wGroups*wRowsPerRowGroup - aligned
		nRowsInMemory += nGroups*nRowsPerRowGroup - aligned

		// The budget for the next round is whatever the buffered leftovers do
		// not occupy. The previous formula charged per consumed row and, due
		// to integer division, always subtracted zero.
		memory = totalMemory -
			bufferedMB(wRowsInMemory, wideRowGroupSizeMB, wRowsPerRowGroup) -
			bufferedMB(nRowsInMemory, narrowRowGroupSizeMB, nRowsPerRowGroup)

		fmt.Printf("%d wide rows in memory, %d narrow rows in memory, memory left: %d\n", wRowsInMemory, nRowsInMemory, memory)
	}
	return wio, nio, nil
}

// TestReaderIO runs the reader IO simulation for a 1000 MB wide column group
// and a 32 MB narrow column group holding 400000 rows, read with a 64 MB
// budget, prints the resulting IO counts and checks them.
func TestReaderIO(t *testing.T) {
	const (
		totalMemory = 64   // MB
		wFileSize   = 1000 // MB
		nFileSize   = 32   // MB
		totalRows   = 400000
	)

	wio, nio, err := simulateReaderIO(totalMemory, wFileSize, nFileSize, totalRows)
	if err != nil {
		t.Fatalf("reader simulation failed: %v", err)
	}

	fmt.Printf("wide column read IO: %d, narrow columns read IO: %d, total read IO: %d under reading memory %d MB", wio, nio, wio+nio, totalMemory)

	if wio != 17 || nio != 17 {
		t.Fatalf("unexpected read IO counts: wide=%d narrow=%d, want 17 and 17", wio, nio)
	}
}

// simulateWriterIO models buffering records rows for a wide and a narrow
// column group that grow by wideSize and narrowSize per row. Whenever the two
// buffers together reach memory, the larger one is flushed (one write IO) and
// emptied. Buffers that are still non-empty after the last row are flushed
// too. All three sizes must use the same unit; the caller in this file appears
// to use KB. It returns the number of wide and narrow write IOs.
func simulateWriterIO(records int, wideSize, narrowSize, memory float64) (wio, nio int) {
	var wideBuf, narrowBuf float64
	for i := 0; i < records; i++ {
		wideBuf += wideSize
		narrowBuf += narrowSize
		if wideBuf+narrowBuf < memory {
			continue
		}
		// Extension: use a max-heap to find the column group that uses the
		// most memory once there are more than two groups.
		if wideBuf >= narrowBuf {
			wideBuf = 0
			wio++
		} else {
			narrowBuf = 0
			nio++
		}
	}
	if wideBuf > 0 {
		wio++
	}
	if narrowBuf > 0 {
		nio++
	}
	return wio, nio
}

// TestWriterIO runs the writer IO simulation for 400000 rows of 2.5 (wide) and
// 0.08 (narrow) size units each with a 64*1024 unit buffer, prints the wide,
// narrow and total write IO counts and checks them.
func TestWriterIO(t *testing.T) {
	const (
		records    = 400000
		wideSize   = 2.5
		narrowSize = 0.08
	)
	memory := float64(64 * 1024)

	wio, nio := simulateWriterIO(records, wideSize, narrowSize, memory)

	fmt.Println(wio)
	fmt.Println(nio)
	fmt.Println(wio + nio)

	if wio != 22 || nio != 1 {
		t.Fatalf("unexpected write IO counts: wide=%d narrow=%d, want 22 and 1", wio, nio)
	}
}