Skip to content

Commit

Permalink
util: extend row format with checksum (#42859)
Browse files Browse the repository at this point in the history
ref #42747
  • Loading branch information
zyguan authored Apr 10, 2023
1 parent 08085bc commit ad59092
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 31 deletions.
5 changes: 4 additions & 1 deletion util/rowcodec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
// CodecVer is the constant number that represents the new row format.
// It is written as the first byte of an encoded row and checked when decoding.
const CodecVer = 128

// Sentinel errors reported when a row carries a version this package does not understand.
var (
	// errInvalidCodecVer is returned when the first byte of the row is not CodecVer.
	errInvalidCodecVer = errors.New("invalid codec version")
	// errInvalidChecksumVer is returned when the checksum header carries a non-zero version.
	errInvalidChecksumVer = errors.New("invalid checksum version")
)

// First byte in the encoded value which specifies the encoding type.
const (
Expand Down
65 changes: 54 additions & 11 deletions util/rowcodec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,69 @@ type Encoder struct {
values []*types.Datum
// Enable indicates whether this encoder should be used.
Enable bool
// WithChecksum indicates whether to append checksum to the encoded row data.
WithChecksum bool
}

// Encode serializes a row built from colIDs and values into the row format.
// The encoder state is reset first, so any previously encoded row is discarded.
// The output is appended to buf[:0]; on error a nil slice is returned.
func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte) ([]byte, error) {
	encoder.reset()
	if err := encoder.encodeDatums(sc, colIDs, values); err != nil {
		return nil, err
	}
	encoded := encoder.row.toBytes(buf[:0])
	return encoded, nil
}

// EncodeWithExtraChecksum works like Encode but also records checksum as the
// extra checksum when the checksum flag is set after reset. When the flag is
// not set, checksum is ignored.
func (encoder *Encoder) EncodeWithExtraChecksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, checksum uint32, buf []byte) ([]byte, error) {
	encoder.reset()
	// The extra checksum must be registered before encoding so that the
	// checksum header is complete when the row is written out.
	if withChecksum := encoder.hasChecksum(); withChecksum {
		encoder.setExtraChecksum(checksum)
	}
	if err := encoder.encodeDatums(sc, colIDs, values); err != nil {
		return nil, err
	}
	encoded := encoder.row.toBytes(buf[:0])
	return encoded, nil
}

// Checksum calculates the checksum of the datums.
// It resets the encoder, forces the checksum flag on for this call regardless
// of WithChecksum, encodes the datums and returns the resulting checksum.
// On error the returned checksum is 0.
func (encoder *Encoder) Checksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) (uint32, error) {
	encoder.reset()
	encoder.flags = encoder.flags | rowFlagChecksum
	if err := encoder.encodeDatums(sc, colIDs, values); err != nil {
		return 0, err
	}
	sum := encoder.checksum1
	return sum, nil
}

// encodeDatums fills the encoder's row from colIDs and values. It collects the
// column values, arranges the column IDs, and then encodes the column data.
// It does not reset the encoder and does not serialize the row; callers do
// both around this call.
func (encoder *Encoder) encodeDatums(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) error {
	encoder.appendColVals(colIDs, values)
	numCols, notNullIdx := encoder.reformatCols()
	if err := encoder.encodeRowCols(sc, numCols, notNullIdx); err != nil {
		return err
	}
	return nil
}

// reset clears all per-row state so the encoder can be reused for another row.
// Slices are truncated to length zero rather than reallocated. The checksum
// flag is set again afterwards when WithChecksum is enabled.
func (encoder *Encoder) reset() {
	encoder.flags = 0
	encoder.numNotNullCols = 0
	encoder.numNullCols = 0
	encoder.data = encoder.data[:0]
	encoder.tempColIDs = encoder.tempColIDs[:0]
	encoder.values = encoder.values[:0]
	encoder.offsets32 = encoder.offsets32[:0]
	encoder.offsets = encoder.offsets[:0]
	encoder.checksumHeader = 0
	encoder.checksum1 = 0
	encoder.checksum2 = 0
	if encoder.WithChecksum {
		encoder.flags |= rowFlagChecksum
	}
}

func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {
Expand All @@ -67,7 +107,7 @@ func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {

func (encoder *Encoder) appendColVal(colID int64, d *types.Datum) {
if colID > 255 {
encoder.large = true
encoder.flags |= rowFlagLarge
}
if d.IsNull() {
encoder.numNullCols++
Expand All @@ -83,7 +123,7 @@ func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
numCols = len(encoder.tempColIDs)
nullIdx := numCols - int(r.numNullCols)
notNullIdx = 0
if r.large {
if r.large() {
r.initColIDs32()
r.initOffsets32()
} else {
Expand All @@ -92,14 +132,14 @@ func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
}
for i, colID := range encoder.tempColIDs {
if encoder.values[i].IsNull() {
if r.large {
if r.large() {
r.colIDs32[nullIdx] = uint32(colID)
} else {
r.colIDs[nullIdx] = byte(colID)
}
nullIdx++
} else {
if r.large {
if r.large() {
r.colIDs32[notNullIdx] = uint32(colID)
} else {
r.colIDs[notNullIdx] = byte(colID)
Expand All @@ -108,7 +148,7 @@ func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
notNullIdx++
}
}
if r.large {
if r.large() {
largeNotNullSorter := (*largeNotNullSorter)(encoder)
sort.Sort(largeNotNullSorter)
if r.numNullCols > 0 {
Expand Down Expand Up @@ -136,7 +176,7 @@ func (encoder *Encoder) encodeRowCols(sc *stmtctx.StatementContext, numCols, not
return err
}
// handle convert to large
if len(r.data) > math.MaxUint16 && !r.large {
if len(r.data) > math.MaxUint16 && !r.large() {
r.initColIDs32()
for j := 0; j < numCols; j++ {
r.colIDs32[j] = uint32(r.colIDs[j])
Expand All @@ -145,14 +185,17 @@ func (encoder *Encoder) encodeRowCols(sc *stmtctx.StatementContext, numCols, not
for j := 0; j <= i; j++ {
r.offsets32[j] = uint32(r.offsets[j])
}
r.large = true
r.flags |= rowFlagLarge
}
if r.large {
if r.large() {
r.offsets32[i] = uint32(len(r.data))
} else {
r.offsets[i] = uint16(len(r.data))
}
}
if r.hasChecksum() {
return r.calcChecksum()
}
return nil
}

Expand Down
155 changes: 136 additions & 19 deletions util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,98 @@ package rowcodec

import (
"encoding/binary"
"hash/crc32"
)

// Bit flags stored in the FLAGS byte of an encoded row.
const (
// rowFlagLarge (0x01) marks a row that uses the large layout.
rowFlagLarge byte = 1 << iota
// rowFlagChecksum (0x02) marks a row that carries a checksum.
rowFlagChecksum
)

// Bit layout of the checksum header byte.
const (
// checksumMaskVersion selects the bits that hold the checksum version.
checksumMaskVersion byte = 0b0111
// checksumFlagExtra is set when an extra checksum is present.
checksumFlagExtra byte = 0b1000
)

// row is the struct type used to access a row and the row format is shown as the following.
//
// Row Format
//
// 0 1 2 3
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | VER | FLAGS | NOT_NULL_COL_CNT |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | NULL_COL_CNT | ...NOT_NULL_COL_IDS... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ...NULL_COL_IDS... | ...NOT_NULL_COL_OFFSETS... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ...NOT_NULL_COL_DATA... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ...CHECKSUM... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// - FLAGS
// - 0x01: large (when max(col_ids) > 255 or len(col_data) > max_u16)
// - size of col_id = large ? 4 : 1
// - size of col_offset = large ? 4 : 2
// - 0x02: has checksum
//
// Checksum
//
// 0 1 2 3 4 5 6 7 8
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | |E| VER | CHECKSUM | EXTRA_CHECKSUM(OPTIONAL) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// HEADER
//
// - HEADER
// - VER: version
// - E: has extra checksum
// - CHECKSUM
// - little-endian CRC32(IEEE) when hdr.ver = 0 (default)
type row struct {
	// flags holds the row flag bits (rowFlagLarge, rowFlagChecksum).
	flags byte
	// checksumHeader holds the checksum version and the extra-checksum bit.
	checksumHeader byte
	numNotNullCols uint16
	numNullCols    uint16

	// for small row: colID []byte, offsets []uint16, optimized for most cases.
	colIDs  []byte
	offsets []uint16

	// for large row: colID []uint32, offsets []uint32.
	colIDs32  []uint32
	offsets32 []uint32

	// data holds the encoded values of the not-null columns.
	data []byte
	// checksum1 is the checksum of data; checksum2 is the optional extra checksum.
	checksum1 uint32
	checksum2 uint32
}

// large reports whether the rowFlagLarge bit is set in the row flags.
func (r *row) large() bool {
	return r.flags&rowFlagLarge != 0
}

// hasChecksum reports whether the rowFlagChecksum bit is set in the row flags.
func (r *row) hasChecksum() bool {
	return r.flags&rowFlagChecksum != 0
}

// hasExtraChecksum reports whether the checksum header has the extra-checksum bit set.
func (r *row) hasExtraChecksum() bool {
	return r.checksumHeader&checksumFlagExtra != 0
}

// checksumVersion returns the version bits of the checksum header as an int.
func (r *row) checksumVersion() int {
	ver := r.checksumHeader & checksumMaskVersion
	return int(ver)
}

// calcChecksum computes the CRC32 (IEEE) of the row data and stores it in
// checksum1. It returns errInvalidChecksumVer, leaving checksum1 untouched,
// when the checksum version is not 0.
func (r *row) calcChecksum() error {
	if ver := r.checksumVersion(); ver != 0 {
		return errInvalidChecksumVer
	}
	sum := crc32.ChecksumIEEE(r.data)
	r.checksum1 = sum
	return nil
}

// setExtraChecksum stores v as the extra checksum and sets the extra-checksum
// bit in the checksum header.
func (r *row) setExtraChecksum(v uint32) {
	r.checksum2 = v
	r.checksumHeader = r.checksumHeader | checksumFlagExtra
}

func (r *row) getData(i int) []byte {
var start, end uint32
if r.large {
if r.large() {
if i > 0 {
start = r.offsets32[i-1]
}
Expand All @@ -55,46 +125,76 @@ func (r *row) fromBytes(rowData []byte) error {
if rowData[0] != CodecVer {
return errInvalidCodecVer
}
r.large = rowData[1]&1 > 0
r.flags = rowData[1]
r.numNotNullCols = binary.LittleEndian.Uint16(rowData[2:])
r.numNullCols = binary.LittleEndian.Uint16(rowData[4:])
cursor := 6
if r.large {
lastOffset := 0
if r.large() {
colIDsLen := int(r.numNotNullCols+r.numNullCols) * 4
r.colIDs32 = bytesToU32Slice(rowData[cursor : cursor+colIDsLen])
cursor += colIDsLen
offsetsLen := int(r.numNotNullCols) * 4
r.offsets32 = bytesToU32Slice(rowData[cursor : cursor+offsetsLen])
cursor += offsetsLen
if n := len(r.offsets32); n > 0 {
lastOffset = int(r.offsets32[n-1])
}
} else {
colIDsLen := int(r.numNotNullCols + r.numNullCols)
r.colIDs = rowData[cursor : cursor+colIDsLen]
cursor += colIDsLen
offsetsLen := int(r.numNotNullCols) * 2
r.offsets = bytes2U16Slice(rowData[cursor : cursor+offsetsLen])
cursor += offsetsLen
if n := len(r.offsets); n > 0 {
lastOffset = int(r.offsets[n-1])
}
}
r.data = rowData[cursor : cursor+lastOffset]
cursor += lastOffset

if r.hasChecksum() {
r.checksumHeader = rowData[cursor]
if r.checksumVersion() != 0 {
return errInvalidChecksumVer
}
cursor++
r.checksum1 = binary.LittleEndian.Uint32(rowData[cursor:])
cursor += 4
if r.hasExtraChecksum() {
r.checksum2 = binary.LittleEndian.Uint32(rowData[cursor:])
} else {
r.checksum2 = 0
}
} else {
r.checksumHeader = 0
r.checksum1 = 0
r.checksum2 = 0
}
r.data = rowData[cursor:]
return nil
}

func (r *row) toBytes(buf []byte) []byte {
buf = append(buf, CodecVer)
flag := byte(0)
if r.large {
flag = 1
}
buf = append(buf, flag)
buf = append(buf, r.flags)
buf = append(buf, byte(r.numNotNullCols), byte(r.numNotNullCols>>8))
buf = append(buf, byte(r.numNullCols), byte(r.numNullCols>>8))
if r.large {
if r.large() {
buf = append(buf, u32SliceToBytes(r.colIDs32)...)
buf = append(buf, u32SliceToBytes(r.offsets32)...)
} else {
buf = append(buf, r.colIDs...)
buf = append(buf, u16SliceToBytes(r.offsets)...)
}
buf = append(buf, r.data...)
if r.hasChecksum() {
buf = append(buf, r.checksumHeader)
buf = binary.LittleEndian.AppendUint32(buf, r.checksum1)
if r.hasExtraChecksum() {
buf = binary.LittleEndian.AppendUint32(buf, r.checksum2)
}
}
return buf
}

Expand All @@ -105,7 +205,7 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
var v int64
if r.large {
if r.large() {
v = int64(r.colIDs32[h])
} else {
v = int64(r.colIDs[h])
Expand All @@ -126,7 +226,7 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
var v int64
if r.large {
if r.large() {
v = int64(r.colIDs32[h])
} else {
v = int64(r.colIDs[h])
Expand All @@ -144,6 +244,23 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
return
}

// GetChecksum returns the checksum of row data (not null columns).
// The boolean is false, and the checksum 0, when the row has no checksum.
func (r *row) GetChecksum() (uint32, bool) {
	if r.hasChecksum() {
		return r.checksum1, true
	}
	return 0, false
}

// GetExtraChecksum returns the extra checksum, which shall be calculated in the
// last stable schema version (whose elements are all public).
// The boolean is false, and the checksum 0, when no extra checksum is present.
func (r *row) GetExtraChecksum() (uint32, bool) {
	if r.hasExtraChecksum() {
		return r.checksum2, true
	}
	return 0, false
}

// ColumnIsNull returns whether the column value is null. Mainly used for count column aggregation.
// This method is used in unistore.
func (r *row) ColumnIsNull(rowData []byte, colID int64, defaultVal []byte) (bool, error) {
Expand Down
Loading

0 comments on commit ad59092

Please sign in to comment.