Skip to content

Commit

Permalink
feat: support interface taos_schemaless_insert_raw_ttl_with_reqid_tbn…
Browse files Browse the repository at this point in the history
…ame_key
  • Loading branch information
huskar-t committed Sep 5, 2024
1 parent fb5b587 commit 5d27ebc
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 2 deletions.
54 changes: 54 additions & 0 deletions af/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,57 @@ func (conn *Connector) GetTableVGroupID(db, table string) (vgID int, err error)
}
return
}

// InfluxDBInsertLinesWithReqID Insert data using influxdb line format
func (conn *Connector) InfluxDBInsertLinesWithReqID(lines string, precision string, reqID int64, ttl int, tbNameKey string) error {
locker.Lock()
_, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn.taos, lines, wrapper.InfluxDBLineProtocol, precision, ttl, reqID, tbNameKey)
locker.Unlock()
defer func() {
locker.Lock()
wrapper.TaosFreeResult(result)
locker.Unlock()
}()
code := wrapper.TaosError(result)
if code != 0 {
errStr := wrapper.TaosErrorStr(result)
return errors.NewError(code, errStr)
}
return nil
}

// OpenTSDBInsertTelnetLinesWithReqID Insert data using opentsdb telnet format
func (conn *Connector) OpenTSDBInsertTelnetLinesWithReqID(lines string, reqID int64, ttl int, tbNameKey string) error {
locker.Lock()
_, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn.taos, lines, wrapper.OpenTSDBTelnetLineProtocol, "", ttl, reqID, tbNameKey)
locker.Unlock()
defer func() {
locker.Lock()
wrapper.TaosFreeResult(result)
locker.Unlock()
}()
code := wrapper.TaosError(result)
if code != 0 {
errStr := wrapper.TaosErrorStr(result)
return errors.NewError(code, errStr)
}
return nil
}

// OpenTSDBInsertJsonPayloadWithReqID Insert data using opentsdb json format
func (conn *Connector) OpenTSDBInsertJsonPayloadWithReqID(payload string, reqID int64, ttl int, tbNameKey string) error {
locker.Lock()
_, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn.taos, payload, wrapper.OpenTSDBJsonFormatProtocol, "", ttl, reqID, tbNameKey)
locker.Unlock()
defer func() {
locker.Lock()
wrapper.TaosFreeResult(result)
locker.Unlock()
}()
code := wrapper.TaosError(result)
if code != 0 {
errStr := wrapper.TaosErrorStr(result)
return errors.NewError(code, errStr)
}
return nil
}
51 changes: 51 additions & 0 deletions af/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,57 @@ func TestConnector_QueryWithReqID(t *testing.T) {
}
}

func TestInfluxDBInsertLinesWithReqID(t *testing.T) {
db := testDatabase(t)
defer db.Close()
err := db.InfluxDBInsertLinesWithReqID(raw, "ns", 0x1234, 0, "")
if err != nil {
t.Error(err)
return
}
err = db.InfluxDBInsertLinesWithReqID("wrong", "ns", 0x1234, 0, "")
assert.Errorf(t, err, "expect error")
}

func TestOpenTSDBInsertTelnetLinesWithReqID(t *testing.T) {
db := testDatabase(t)
defer db.Close()
err := db.OpenTSDBInsertTelnetLinesWithReqID(
"sys_if_bytes_out 1479496100 1.3E3 host=web01 interface=eth0\nsys_procs_running 1479496100 42 host=web01",
0x2234, 0, "")

if err != nil {
t.Error(err)
return
}
err = db.OpenTSDBInsertTelnetLinesWithReqID(
"wrong",
0x2234, 0, "")
assert.Errorf(t, err, "expect error")
}

func TestOpenTSDBInsertJsonPayloadWithReqID(t *testing.T) {
db := testDatabase(t)
defer db.Close()
err := db.OpenTSDBInsertJsonPayloadWithReqID(`{
"metric": "sys",
"timestamp":
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}`, 0x3234, 0, "")
if err == nil {
t.Error("expect error")
return
}
err = db.OpenTSDBInsertJsonPayloadWithReqID(
"wrong",
0x3234, 0, "")
assert.Errorf(t, err, "expect error")
}

func TestNewConnector(t *testing.T) {
tc, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
assert.NoError(t, err)
Expand Down
2 changes: 0 additions & 2 deletions wrapper/notify.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package wrapper

import "C"

/*
#include <stdio.h>
#include <stdlib.h>
Expand Down
25 changes: 25 additions & 0 deletions wrapper/schemaless.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ func TaosSchemalessInsertRawTTLWithReqID(taosConnect unsafe.Pointer, lines strin
return rows, result
}

// TaosSchemalessInsertRawTTLWithReqIDTBNameKey TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey);
func TaosSchemalessInsertRawTTLWithReqIDTBNameKey(taosConnect unsafe.Pointer, lines string, protocol int, precision string, ttl int, reqID int64, tbNameKey string) (int32, unsafe.Pointer) {
cLine := C.CString(lines)
defer C.free(unsafe.Pointer(cLine))
cTBNameKey := (*C.char)(nil)
if tbNameKey != "" {
cTBNameKey = C.CString(tbNameKey)
defer C.free(unsafe.Pointer(cTBNameKey))
}
var rows int32
pTotalRows := unsafe.Pointer(&rows)
result := C.taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(
taosConnect,
cLine,
(C.int)(len(lines)),
(*C.int32_t)(pTotalRows),
(C.int)(protocol),
(C.int)(exchange(precision)),
(C.int32_t)(ttl),
(C.int64_t)(reqID),
cTBNameKey,
)
return rows, result
}

func exchange(ts string) int {
switch ts {
case "":
Expand Down
68 changes: 68 additions & 0 deletions wrapper/schemaless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,71 @@ func TestTaosSchemalessInsertRawTTLWithReqID(t *testing.T) {
})
}
}

func TestTaosSchemalessInsertRawTTLWithReqIDTBNameKey(t *testing.T) {
conn := prepareEnv()
defer wrapper.TaosClose(conn)
//defer cleanEnv(conn)
cases := []struct {
name string
row string
rows int32
precision string
ttl int
reqID int64
tbNameKey string
}{
{
name: "1",
row: "measurement,host=host1 field1=2i,field2=1.0 1577836800000000000",
rows: 1,
precision: "",
ttl: 1000,
reqID: 1,
tbNameKey: "host",
},
{
name: "2",
row: "measurement,host=host1 field1=2i,field2=2.0 1577836900000000000",
rows: 1,
precision: "ns",
ttl: 1200,
reqID: 2,
tbNameKey: "host",
},
{
name: "3",
row: "measurement,host=host1 field1=2i,field2=3.0 1577837200000\n" +
"measurement,host=host1 field1=2i,field2=4.0 1577837300000",
rows: 2,
precision: "ms",
ttl: 1400,
reqID: 3,
tbNameKey: "host",
},
{
name: "no table name key",
row: "measurement,host=host1 field1=2i,field2=2.0 1577836900000000000",
rows: 1,
precision: "ns",
ttl: 1200,
reqID: 2,
tbNameKey: "",
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
rows, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, c.row, wrapper.InfluxDBLineProtocol, c.precision, c.ttl, c.reqID, c.tbNameKey)
if rows != c.rows {
t.Fatal("rows miss")
}
code := wrapper.TaosError(result)
if code != 0 {
errStr := wrapper.TaosErrorStr(result)
t.Fatal(errors.NewError(code, errStr))
}
wrapper.TaosFreeResult(result)
})
}
}

0 comments on commit 5d27ebc

Please sign in to comment.