diff --git a/af/conn.go b/af/conn.go index 77a253f..5907a04 100644 --- a/af/conn.go +++ b/af/conn.go @@ -313,3 +313,57 @@ func (conn *Connector) GetTableVGroupID(db, table string) (vgID int, err error) } return } + +// InfluxDBInsertLinesWithReqID Insert data using influxdb line format +func (conn *Connector) InfluxDBInsertLinesWithReqID(lines string, precision string, reqID int64, ttl int, tbNameKey string) error { + locker.Lock() + _, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn.taos, lines, wrapper.InfluxDBLineProtocol, precision, ttl, reqID, tbNameKey) + locker.Unlock() + defer func() { + locker.Lock() + wrapper.TaosFreeResult(result) + locker.Unlock() + }() + code := wrapper.TaosError(result) + if code != 0 { + errStr := wrapper.TaosErrorStr(result) + return errors.NewError(code, errStr) + } + return nil +} + +// OpenTSDBInsertTelnetLinesWithReqID Insert data using opentsdb telnet format +func (conn *Connector) OpenTSDBInsertTelnetLinesWithReqID(lines string, reqID int64, ttl int, tbNameKey string) error { + locker.Lock() + _, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn.taos, lines, wrapper.OpenTSDBTelnetLineProtocol, "", ttl, reqID, tbNameKey) + locker.Unlock() + defer func() { + locker.Lock() + wrapper.TaosFreeResult(result) + locker.Unlock() + }() + code := wrapper.TaosError(result) + if code != 0 { + errStr := wrapper.TaosErrorStr(result) + return errors.NewError(code, errStr) + } + return nil +} + +// OpenTSDBInsertJsonPayloadWithReqID Insert data using opentsdb json format +func (conn *Connector) OpenTSDBInsertJsonPayloadWithReqID(payload string, reqID int64, ttl int, tbNameKey string) error { + locker.Lock() + _, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn.taos, payload, wrapper.OpenTSDBJsonFormatProtocol, "", ttl, reqID, tbNameKey) + locker.Unlock() + defer func() { + locker.Lock() + wrapper.TaosFreeResult(result) + locker.Unlock() + }() + code := wrapper.TaosError(result) + if code != 0 { + errStr := wrapper.TaosErrorStr(result) + return errors.NewError(code, errStr) + } + return nil +} diff --git a/af/conn_test.go b/af/conn_test.go index 354771a..e3a9ccf 100644 --- a/af/conn_test.go +++ b/af/conn_test.go @@ -928,6 +928,57 @@ func TestConnector_QueryWithReqID(t *testing.T) { } } +func TestInfluxDBInsertLinesWithReqID(t *testing.T) { + db := testDatabase(t) + defer db.Close() + err := db.InfluxDBInsertLinesWithReqID(raw, "ns", 0x1234, 0, "") + if err != nil { + t.Error(err) + return + } + err = db.InfluxDBInsertLinesWithReqID("wrong", "ns", 0x1234, 0, "") + assert.Errorf(t, err, "expect error") +} + +func TestOpenTSDBInsertTelnetLinesWithReqID(t *testing.T) { + db := testDatabase(t) + defer db.Close() + err := db.OpenTSDBInsertTelnetLinesWithReqID( + "sys_if_bytes_out 1479496100 1.3E3 host=web01 interface=eth0\nsys_procs_running 1479496100 42 host=web01", + 0x2234, 0, "") + + if err != nil { + t.Error(err) + return + } + err = db.OpenTSDBInsertTelnetLinesWithReqID( + "wrong", + 0x2234, 0, "") + assert.Errorf(t, err, "expect error") +} + +func TestOpenTSDBInsertJsonPayloadWithReqID(t *testing.T) { + db := testDatabase(t) + defer db.Close() + err := db.OpenTSDBInsertJsonPayloadWithReqID(`{ + "metric": "sys", + "timestamp": + "value": 18, + "tags": { + "host": "web01", + "dc": "lga" + } +}`, 0x3234, 0, "") + if err == nil { + t.Error("expect error") + return + } + err = db.OpenTSDBInsertJsonPayloadWithReqID( + "wrong", + 0x3234, 0, "") + assert.Errorf(t, err, "expect error") +} + func TestNewConnector(t *testing.T) { tc, err := wrapper.TaosConnect("", "root", "taosdata", "", 0) assert.NoError(t, err) diff --git a/wrapper/notify.go b/wrapper/notify.go index 48480d9..f91d249 100644 --- a/wrapper/notify.go +++ b/wrapper/notify.go @@ -1,7 +1,5 @@ package wrapper -import "C" - /* #include #include diff --git a/wrapper/schemaless.go b/wrapper/schemaless.go index bcfb4be..2e44dee 100644 --- a/wrapper/schemaless.go +++ b/wrapper/schemaless.go @@ -184,6 +184,31 @@ func TaosSchemalessInsertRawTTLWithReqID(taosConnect unsafe.Pointer, lines strin return rows, result } +// TaosSchemalessInsertRawTTLWithReqIDTBNameKey TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey); +func TaosSchemalessInsertRawTTLWithReqIDTBNameKey(taosConnect unsafe.Pointer, lines string, protocol int, precision string, ttl int, reqID int64, tbNameKey string) (int32, unsafe.Pointer) { + cLine := C.CString(lines) + defer C.free(unsafe.Pointer(cLine)) + cTBNameKey := (*C.char)(nil) + if tbNameKey != "" { + cTBNameKey = C.CString(tbNameKey) + defer C.free(unsafe.Pointer(cTBNameKey)) + } + var rows int32 + pTotalRows := unsafe.Pointer(&rows) + result := C.taos_schemaless_insert_raw_ttl_with_reqid_tbname_key( + taosConnect, + cLine, + (C.int)(len(lines)), + (*C.int32_t)(pTotalRows), + (C.int)(protocol), + (C.int)(exchange(precision)), + (C.int32_t)(ttl), + (C.int64_t)(reqID), + cTBNameKey, + ) + return rows, result +} + func exchange(ts string) int { switch ts { case "": diff --git a/wrapper/schemaless_test.go b/wrapper/schemaless_test.go index db07466..6081ce2 100644 --- a/wrapper/schemaless_test.go +++ b/wrapper/schemaless_test.go @@ -674,3 +674,71 @@ func TestTaosSchemalessInsertRawTTLWithReqID(t *testing.T) { }) } } + +func TestTaosSchemalessInsertRawTTLWithReqIDTBNameKey(t *testing.T) { + conn := prepareEnv() + defer wrapper.TaosClose(conn) + //defer cleanEnv(conn) + cases := []struct { + name string + row string + rows int32 + precision string + ttl int + reqID int64 + tbNameKey string + }{ + { + name: "1", + row: "measurement,host=host1 field1=2i,field2=1.0 1577836800000000000", + rows: 1, + precision: "", + ttl: 1000, + reqID: 1, + tbNameKey: "host", + }, + { + name: "2", + row: "measurement,host=host1 field1=2i,field2=2.0 1577836900000000000", + rows: 1, + precision: "ns", + ttl: 1200, + reqID: 2, + tbNameKey: "host", + }, + { + name: "3", + row: "measurement,host=host1 field1=2i,field2=3.0 1577837200000\n" + + "measurement,host=host1 field1=2i,field2=4.0 1577837300000", + rows: 2, + precision: "ms", + ttl: 1400, + reqID: 3, + tbNameKey: "host", + }, + { + name: "no table name key", + row: "measurement,host=host1 field1=2i,field2=2.0 1577836900000000000", + rows: 1, + precision: "ns", + ttl: 1200, + reqID: 2, + tbNameKey: "", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + rows, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, c.row, wrapper.InfluxDBLineProtocol, c.precision, c.ttl, c.reqID, c.tbNameKey) + if rows != c.rows { + t.Fatal("rows miss") + } + code := wrapper.TaosError(result) + if code != 0 { + errStr := wrapper.TaosErrorStr(result) + t.Fatal(errors.NewError(code, errStr)) + } + wrapper.TaosFreeResult(result) + }) + } +}