Skip to content

Commit

Permalink
Merge pull request #544 from tigrisdata/main
Browse files Browse the repository at this point in the history
Alpha release
  • Loading branch information
efirs authored Sep 19, 2022
2 parents d5e4b91 + 84a5130 commit 6fd05ef
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 110 deletions.
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from 4fc67b to 987e5a
20 changes: 4 additions & 16 deletions api/server/v1/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,27 +619,15 @@ func (x *PublishRequest) UnmarshalJSON(data []byte) error {
if err := jsoniter.Unmarshal(value, &x.Options); err != nil {
return err
}

var options map[string]jsoniter.RawMessage
if err := jsoniter.Unmarshal(value, &options); err != nil {
return err
}

part := false
for oKey := range options {
if oKey == "partition" {
part = true
}
}
if !part {
// use -1 to indicate that no partition option was set
x.Options.Partition = -1
}
}
}
return nil
}

func (x *PublishResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(&dmlResponse{Metadata: CreateMDFromResponseMD(x.Metadata), Status: x.Status})
}

func (x *SubscribeRequest) UnmarshalJSON(data []byte) error {
var mp map[string]jsoniter.RawMessage
if err := jsoniter.Unmarshal(data, &mp); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.0
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.33
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.35
github.com/typesense/typesense-go v0.6.0
github.com/uber-go/tally v3.5.0+incompatible
github.com/ugorji/go/codec v1.2.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,8 @@ github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/rtred v0.1.2/go.mod h1:hd69WNXQ5RP9vHd7dqekAz+RIdtfBogmglkZSRxCHFQ=
github.com/tidwall/tinyqueue v0.1.1/go.mod h1:O/QNHwrnjqr6IHItYrzoHAKYhBkLI67Q096fQP5zMYw=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.33 h1:27YuWF1TXYIZA7oG3wBAw0aKCM0Lvo/L1nk02jzDezo=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.33/go.mod h1:qAR1FQJXnEoeJPoG89oiR7Heu6b33JKvJTNLbwWo0Dc=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.35 h1:RO9s87yeWi145f9yRsZlewNiMxt4U639D87OhpKGTm4=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.35/go.mod h1:qAR1FQJXnEoeJPoG89oiR7Heu6b33JKvJTNLbwWo0Dc=
github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
Expand Down
4 changes: 2 additions & 2 deletions server/services/v1/query_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,8 +1132,8 @@ func (runner *PublishQueryRunner) Run(ctx context.Context, tx transaction.Tx, te
}

var part int
if runner.req.Options != nil && runner.req.Options.Partition != -1 {
part = int(runner.req.Options.Partition)
if runner.req.Options != nil && runner.req.Options.Partition != nil {
part = int(*runner.req.Options.Partition)
if part < 0 || part >= partitions {
return nil, ctx, errors.InvalidArgument("Invalid partition number `%d`", part)
}
Expand Down
199 changes: 111 additions & 88 deletions test/v1/client/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,46 +73,6 @@ func testRead(t *testing.T, db driver.Database, filter driver.Filter, expected [
assert.NoError(t, it.Err())
}

func testTxEventsStream(ctx context.Context, t *testing.T, rit *driver.EventIterator, db1 driver.Database, coll string, doc1, doc2, doc3 driver.Document) {
var ev driver.Event
var err error

*rit, err = db1.Events(ctx, coll)
require.NoError(t, err)
it := *rit
defer require.NoError(t, it.Err())

require.True(t, it.Next(&ev))
assert.Equal(t, "insert", ev.Op)
assert.JSONEq(t, string(doc1), string(ev.Data))
assert.Equal(t, coll, ev.Collection)

require.True(t, it.Next(&ev))
assert.Equal(t, "insert", ev.Op)
assert.JSONEq(t, string(doc2), string(ev.Data))
assert.Equal(t, coll, ev.Collection)

require.True(t, it.Next(&ev))
assert.Equal(t, "delete", ev.Op)
assert.Contains(t, string(ev.Key), "value2")
assert.Equal(t, coll, ev.Collection)

require.True(t, it.Next(&ev))
assert.Equal(t, "insert", ev.Op)
assert.JSONEq(t, string(doc2), string(ev.Data))
assert.Equal(t, coll, ev.Collection)

require.True(t, it.Next(&ev))
assert.Equal(t, "insert", ev.Op)
assert.JSONEq(t, string(doc3), string(ev.Data))
assert.Equal(t, coll, ev.Collection)

require.True(t, it.Next(&ev))
assert.Equal(t, "update", ev.Op)
assert.JSONEq(t, strings.Replace(string(doc2), "222", "555", -1), string(ev.Data))
assert.Equal(t, coll, ev.Collection)
}

func testTxReadWrite(t *testing.T, c driver.Driver) {
ctx := context.TODO()

Expand Down Expand Up @@ -147,14 +107,6 @@ func testTxReadWrite(t *testing.T, c driver.Driver) {
doc2 := driver.Document(`{"str_field": "value2", "int_field": 222, "bool_field": false}`)
doc3 := driver.Document(`{"str_field": "value3", "int_field": 333, "bool_field": false}`)

var it driver.EventIterator

wg.Add(1)
go func() {
defer wg.Done()
testTxEventsStream(ctx, t, &it, db1, "c1", doc1, doc2, doc3)
}()

resp, err := db1.Insert(ctx, "c1", []driver.Document{
doc1,
doc2,
Expand Down Expand Up @@ -212,18 +164,6 @@ func testTxReadWrite(t *testing.T, c driver.Driver) {
require.NoError(t, db1.DropCollection(ctx, "c1"))
require.NoError(t, db1.CreateOrUpdateCollection(ctx, "c1", driver.Schema(schema)))

var ev driver.Event

require.True(t, it.Next(&ev))
assert.Equal(t, "delete", ev.Op)
assert.Contains(t, string(ev.Key), "value3")
assert.Equal(t, "c1", ev.Collection)

require.True(t, it.Next(&ev))
assert.Equal(t, "insert", ev.Op)
assert.JSONEq(t, string(doc3), string(ev.Data))
assert.Equal(t, "c1", ev.Collection)

require.NoError(t, db1.DropCollection(ctx, "c1"))
}

Expand Down Expand Up @@ -463,54 +403,137 @@ func testTxClient(t *testing.T, c driver.Driver) {
require.NoError(t, err)
}

func TestDriverGRPCC(t *testing.T) {
func initDriver(t *testing.T, proto string) driver.Driver {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

h, p := getTestServerHostPort()
driver.DefaultProtocol = driver.GRPC
c, err := driver.NewDriver(context.Background(), &clientConfig.Driver{
URL: fmt.Sprintf("%s:%d", h, p),
driver.DefaultProtocol = proto
url := fmt.Sprintf("%s:%d", h, p)
if proto == driver.HTTP {
url = "http://" + url
}
c, err := driver.NewDriver(ctx, &clientConfig.Driver{
URL: url,
})
require.NoError(t, err)
defer func() { _ = c.Close() }()

return c
}

func TestDriverGRPCC(t *testing.T) {
c := initDriver(t, driver.GRPC)
defer func() { err := c.Close(); require.NoError(t, err) }()

testDriver(t, c)
testDriverBinary(t, c)
}

func TestDriverHTTP(t *testing.T) {
h, p := getTestServerHostPort()
driver.DefaultProtocol = driver.HTTP
c, err := driver.NewDriver(context.Background(), &clientConfig.Driver{
URL: fmt.Sprintf("http://%s:%d", h, p),
})
require.NoError(t, err)
defer func() { _ = c.Close() }()
c := initDriver(t, driver.HTTP)
defer func() { err := c.Close(); require.NoError(t, err) }()

testDriver(t, c)
testDriverBinary(t, c)
}

func TestDriverTxGRPC(t *testing.T) {
h, p := getTestServerHostPort()
driver.DefaultProtocol = driver.GRPC
c, err := driver.NewDriver(context.Background(), &clientConfig.Driver{
URL: fmt.Sprintf("%s:%d", h, p),
})
require.NoError(t, err)
defer func() { _ = c.Close() }()
c := initDriver(t, driver.GRPC)
defer func() { err := c.Close(); require.NoError(t, err) }()

testTxClient(t, c)
testTxReadWrite(t, c)
}

func TestDriverTxHTTPDriver(t *testing.T) {
h, p := getTestServerHostPort()
driver.DefaultProtocol = driver.HTTP
c, err := driver.NewDriver(context.Background(), &clientConfig.Driver{
URL: fmt.Sprintf("http://%s:%d", h, p),
})
require.NoError(t, err)
defer func() { _ = c.Close() }()
func TestDriverTxHTTP(t *testing.T) {
c := initDriver(t, driver.HTTP)
defer func() { err := c.Close(); require.NoError(t, err) }()

testTxClient(t, c)
testTxReadWrite(t, c)
}

func testPubSubStream(ctx context.Context, t *testing.T, rit *driver.Iterator, db1 driver.Database, coll string, doc1, doc2, doc3 driver.Document) {
var ev driver.Document

it := *rit
defer require.NoError(t, it.Err())

assert.True(t, it.Next(&ev))
require.NoError(t, it.Err())
assert.JSONEq(t, string(doc1), string(ev))
require.True(t, it.Next(&ev))
assert.JSONEq(t, string(doc2), string(ev))
require.True(t, it.Next(&ev))
assert.JSONEq(t, string(doc3), string(ev))
}

func testPubSub(t *testing.T, c driver.Driver) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

dbName := "db_client_test"
_ = c.DropDatabase(ctx, dbName)

schema := `{
"title": "c1",
"properties": {
"str_field": {
"type": "string"
},
"int_field": {
"type": "integer"
},
"bool_field": {
"type": "boolean"
}
},
"collection_type": "messages"
}`

err := c.CreateDatabase(ctx, dbName)
require.NoError(t, err)
db1 := c.UseDatabase(dbName)
err = db1.CreateOrUpdateCollection(ctx, "c1", driver.Schema(schema))
require.NoError(t, err)

var wg sync.WaitGroup

doc1 := driver.Document(`{"str_field": "value1", "int_field": 111, "bool_field": true}`)
doc2 := driver.Document(`{"str_field": "value2", "int_field": 222, "bool_field": false}`)
doc3 := driver.Document(`{"str_field": "value3", "int_field": 333, "bool_field": false}`)

wg.Add(1)
go func() {
defer wg.Done()
it, err := db1.Subscribe(ctx, "c1", driver.Filter("{}"))
require.NoError(t, err)

testPubSubStream(ctx, t, &it, db1, "c1", doc1, doc2, doc3)
}()

time.Sleep(100 * time.Millisecond)

pr, err := db1.Publish(ctx, "c1", []driver.Message{
driver.Message(doc1), driver.Message(doc2), driver.Message(doc3)})
require.NoError(t, err)
require.Equal(t, "published", pr.Status)

wg.Wait()

require.NoError(t, db1.DropCollection(ctx, "c1"))
}

func TestDriverGRPCPubSub(t *testing.T) {
c := initDriver(t, driver.GRPC)
defer func() { _ = c.Close() }()

testPubSub(t, c)
}

func TestDriverHTTPPubSub(t *testing.T) {
c := initDriver(t, driver.HTTP)
defer func() { _ = c.Close() }()

testPubSub(t, c)
}

0 comments on commit 6fd05ef

Please sign in to comment.