Skip to content

Commit

Permalink
Merge pull request #305 from taosdata/fix/xftan/stmt2bindmarshal-3.0
Browse files Browse the repository at this point in the history
fix: remove marshal stmt2 binary unnecessary serialization
  • Loading branch information
huskar-t authored Nov 11, 2024
2 parents afb5aa3 + d8de000 commit 11d5adc
Show file tree
Hide file tree
Showing 30 changed files with 616 additions and 257 deletions.
4 changes: 4 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ignore:
- "bench"
- "benchmark"
- "examples"
6 changes: 3 additions & 3 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DBUILD_DEPENDENCY_TESTS=0 -DVERNUMBER=3.9.9.9
make -j 4
- name: package
Expand Down Expand Up @@ -173,8 +173,8 @@ jobs:
run: sudo go test -v --count=1 -coverprofile=coverage.txt -covermode=atomic ./...

- name: Upload coverage to Codecov
if: ${{ matrix.go }} == 'stable'
uses: codecov/codecov-action@v4-beta
if: ${{ matrix.go == 'stable'}}
uses: codecov/codecov-action@v4
with:
files: ./coverage.txt
env:
Expand Down
66 changes: 66 additions & 0 deletions af/insertstmt/stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,72 @@ func TestStmt(t *testing.T) {
assert.Equal(t, int(1), affected)

}
func TestStmtWithWithReqID(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
assert.NoError(t, err)
defer wrapper.TaosClose(conn)
s := NewInsertStmt(conn)
defer s.Close()
err = s.Prepare("insert into ? values(?,?,?)")
assert.NoError(t, err)
}

func TestPrepareError(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
assert.NoError(t, err)
defer wrapper.TaosClose(conn)
s := NewInsertStmt(conn)
stmtHandle := s.stmt
defer wrapper.TaosStmtClose(stmtHandle)
s.stmt = nil
err = s.Prepare("insert into ? values(?,?,?)")
assert.Error(t, err)
s.stmt = stmtHandle
err = s.Prepare("select * from information_schema.ins_databases where name = ?")
assert.Error(t, err)
}

func TestSetTableNameError(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
assert.NoError(t, err)
defer wrapper.TaosClose(conn)
s := NewInsertStmt(conn)
stmtHandle := s.stmt
defer wrapper.TaosStmtClose(stmtHandle)
s.stmt = nil
err = s.SetTableName("test")
assert.Error(t, err)

err = s.SetSubTableName("test")
assert.Error(t, err)

err = s.SetTableNameWithTags("test", param.NewParam(1).AddBinary([]byte("test")))
assert.Error(t, err)
}

func TestAddBatchError(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
assert.NoError(t, err)
defer wrapper.TaosClose(conn)
s := NewInsertStmt(conn)
stmtHandle := s.stmt
defer wrapper.TaosStmtClose(stmtHandle)
s.stmt = nil
err = s.AddBatch()
assert.Error(t, err)
}

func TestExecuteError(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
assert.NoError(t, err)
defer wrapper.TaosClose(conn)
s := NewInsertStmt(conn)
stmtHandle := s.stmt
defer wrapper.TaosStmtClose(stmtHandle)
s.stmt = nil
err = s.Execute()
assert.Error(t, err)
}

func exec(conn unsafe.Pointer, sql string) error {
res := wrapper.TaosQuery(conn, sql)
Expand Down
32 changes: 12 additions & 20 deletions af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err
for _, topic := range topics {
errCode := wrapper.TMQListAppend(topicList, topic)
if errCode != 0 {
return c.tmqError(errCode)
return tmqError(errCode)
}
}
errCode := wrapper.TMQSubscribe(c.cConsumer, topicList)
if errCode != 0 {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}
Expand All @@ -79,7 +79,7 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err
func (c *Consumer) Unsubscribe() error {
errCode := wrapper.TMQUnsubscribe(c.cConsumer)
if errCode != taosError.SUCCESS {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
func (c *Consumer) Commit() ([]tmq.TopicPartition, error) {
errCode := wrapper.TMQCommitSync(c.cConsumer, nil)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
partitions, err := c.Assignment()
if err != nil {
Expand All @@ -208,26 +208,18 @@ func (c *Consumer) Commit() ([]tmq.TopicPartition, error) {
return c.Committed(partitions, 0)
}

func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) {
errCode := wrapper.TMQCommitSync(c.cConsumer, message)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
}
return nil, nil
}

func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) {
errCode, list := wrapper.TMQSubscription(c.cConsumer)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
defer wrapper.TMQListDestroy(list)
size := wrapper.TMQListGetSize(list)
topics := wrapper.TMQListToCArray(list, int(size))
for _, topic := range topics {
errCode, assignment := wrapper.TMQGetTopicAssignment(c.cConsumer, topic)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
for i := 0; i < len(assignment); i++ {
topicName := topic
Expand All @@ -244,7 +236,7 @@ func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) {
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error {
errCode := wrapper.TMQOffsetSeek(c.cConsumer, *partition.Topic, partition.Partition, int64(partition.Offset))
if errCode != taosError.SUCCESS {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}
Expand All @@ -255,7 +247,7 @@ func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (of
cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
offset := tmq.Offset(cOffset)
if !offset.Valid() {
return nil, c.tmqError(int32(offset))
return nil, tmqError(int32(offset))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Expand All @@ -270,7 +262,7 @@ func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicParti
for i := 0; i < len(offsets); i++ {
errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset))
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
}
return c.Committed(offsets, 0)
Expand All @@ -281,7 +273,7 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi
for i := 0; i < len(partitions); i++ {
position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
if position < 0 {
return nil, c.tmqError(int32(position))
return nil, tmqError(int32(position))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Expand All @@ -296,12 +288,12 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi
func (c *Consumer) Close() error {
errCode := wrapper.TMQConsumerClose(c.cConsumer)
if errCode != 0 {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}

func (c *Consumer) tmqError(errCode int32) error {
func tmqError(errCode int32) error {
errStr := wrapper.TMQErr2Str(errCode)
return taosError.NewError(int(errCode), errStr)
}
6 changes: 6 additions & 0 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,9 @@ func TestMeta(t *testing.T) {
}
}
}

func Test_tmqError(t *testing.T) {
err := tmqError(-1)
expectError := &errors.TaosError{Code: 65535, ErrStr: "fail"}
assert.Equal(t, expectError, err)
}
2 changes: 1 addition & 1 deletion common/parser/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func TestParseBlock(t *testing.T) {
version := RawBlockGetVersion(block)
t.Log(version)
length := RawBlockGetLength(block)
assert.Equal(t, int32(447), length)
assert.Equal(t, int32(448), length)
rows := RawBlockGetNumOfRows(block)
assert.Equal(t, int32(2), rows)
columns := RawBlockGetNumOfCols(block)
Expand Down
39 changes: 39 additions & 0 deletions common/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"database/sql/driver"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -95,3 +96,41 @@ func TestInterpolateParams(t *testing.T) {
})
}
}

func TestValueArgsToNamedValueArgs(t *testing.T) {
tests := []struct {
name string
args []driver.Value
want []driver.NamedValue
}{
{
name: "empty args",
args: []driver.Value{},
want: []driver.NamedValue{},
},
{
name: "single arg",
args: []driver.Value{int64(1)},
want: []driver.NamedValue{
{Ordinal: 1, Value: int64(1)},
},
},
{
name: "multiple args",
args: []driver.Value{int64(1), "test", nil},
want: []driver.NamedValue{
{Ordinal: 1, Value: int64(1)},
{Ordinal: 2, Value: "test"},
{Ordinal: 3, Value: nil},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ValueArgsToNamedValueArgs(tt.args); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ValueArgsToNamedValueArgs() = %v, want %v", got, tt.want)
}
})
}
}
3 changes: 0 additions & 3 deletions common/stmt/stmt2.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ func MarshalStmt2Binary(bindData []*TaosStmt2BindData, isInsert bool, colType, t
needCols = true
binary.LittleEndian.PutUint32(header[ColCountPosition:], uint32(colCount))
}
if needTableNames {
binary.LittleEndian.PutUint32(header[TableNamesOffsetPosition:], uint32(colCount))
}
if !needTableNames && !needTags && !needCols {
return nil, fmt.Errorf("no data")
}
Expand Down
30 changes: 0 additions & 30 deletions taosRestful/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ func (tc *taosConn) Prepare(query string) (driver.Stmt, error) {
return nil, &taosErrors.TaosError{Code: 0xffff, ErrStr: "restful does not support stmt"}
}

func (tc *taosConn) Exec(query string, args []driver.Value) (driver.Result, error) {
return tc.ExecContext(context.Background(), query, common.ValueArgsToNamedValueArgs(args))
}

func (tc *taosConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (result driver.Result, err error) {
return tc.execCtx(ctx, query, args)
}
Expand All @@ -127,32 +123,6 @@ func (tc *taosConn) execCtx(ctx context.Context, query string, args []driver.Nam
return driver.RowsAffected(result.Data[0][0].(int32)), nil
}

func (tc *taosConn) Query(query string, args []driver.Value) (driver.Rows, error) {
if len(args) != 0 {
if !tc.cfg.interpolateParams {
return nil, driver.ErrSkip
}
// try client-side prepare to reduce round trip
prepared, err := common.InterpolateParams(query, common.ValueArgsToNamedValueArgs(args))
if err != nil {
return nil, err
}
query = prepared
}
result, err := tc.taosQuery(context.TODO(), query, tc.readBufferSize)
if err != nil {
return nil, err
}
if result == nil {
return nil, errors.New("wrong result")
}
// Read Result
rs := &rows{
result: result,
}
return rs, err
}

func (tc *taosConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) {
return tc.queryCtx(ctx, query, args)
}
Expand Down
11 changes: 11 additions & 0 deletions taosRestful/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,14 @@ func TestSSL(t *testing.T) {
}
assert.Equal(t, types.RawMessage(`{"a":"b"}`), tt)
}

func TestConnect(t *testing.T) {
conn := connector{
cfg: &config{},
}
db, err := conn.Connect(context.Background())
assert.NoError(t, err)
db.Close()
driver := conn.Driver()
assert.Equal(t, &TDengineDriver{}, driver)
}
Loading

0 comments on commit 11d5adc

Please sign in to comment.