From 02d818228e3c1cecd29e8655a7aa2f5f429aa3e9 Mon Sep 17 00:00:00 2001 From: Yevgeniy Firsov Date: Thu, 28 Jul 2022 21:52:09 -0700 Subject: [PATCH 1/8] fix: Delete events were not emitted Add events integration test --- go.mod | 6 +-- go.sum | 4 +- server/services/v1/api.go | 36 ++++++------- test/v1/client/driver_test.go | 96 +++++++++++++++++++++++++++++++++-- 4 files changed, 115 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 22f8be270..3f7fe0f83 100644 --- a/go.mod +++ b/go.mod @@ -28,11 +28,13 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.7.2 - github.com/tigrisdata/tigris-client-go v1.0.0-alpha.20 + github.com/tigrisdata/tigris-client-go v1.0.0-alpha.21 github.com/typesense/typesense-go v0.5.0 github.com/uber-go/tally v3.5.0+incompatible github.com/ugorji/go/codec v1.2.7 github.com/valyala/bytebufferpool v1.0.0 + go.uber.org/atomic v1.9.0 + golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b google.golang.org/grpc v1.48.0 google.golang.org/protobuf v1.28.0 @@ -107,13 +109,11 @@ require ( github.com/yudai/gojsondiff v1.0.0 // indirect github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect github.com/yudai/pp v2.0.1+incompatible // indirect - go.uber.org/atomic v1.9.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect golang.org/x/net v0.0.0-20220726230323-06994584191e // indirect golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/grpc/examples v0.0.0-20220215234149-ec717cad7395 // indirect diff --git a/go.sum b/go.sum index 9e0920f2d..6b3c14b79 100644 --- a/go.sum +++ b/go.sum @@ -863,8 +863,8 @@ github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/rtred v0.1.2/go.mod h1:hd69WNXQ5RP9vHd7dqekAz+RIdtfBogmglkZSRxCHFQ= github.com/tidwall/tinyqueue v0.1.1/go.mod h1:O/QNHwrnjqr6IHItYrzoHAKYhBkLI67Q096fQP5zMYw= -github.com/tigrisdata/tigris-client-go v1.0.0-alpha.20 h1:J4BazYr7EZjhlfZZEHRTQ7OYbfRi4ypwySLxxkXmZTc= -github.com/tigrisdata/tigris-client-go v1.0.0-alpha.20/go.mod h1:/L4tilHC26feio8KDuitajcGXabCLAElYtUQRkewkyM= +github.com/tigrisdata/tigris-client-go v1.0.0-alpha.21 h1:JwtFfVRDHI6qjAFEv5Uk0uiaARSuiN3nvBMjxOg9fY4= +github.com/tigrisdata/tigris-client-go v1.0.0-alpha.21/go.mod h1:/L4tilHC26feio8KDuitajcGXabCLAElYtUQRkewkyM= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= diff --git a/server/services/v1/api.go b/server/services/v1/api.go index fa544cfa2..ef5c05281 100644 --- a/server/services/v1/api.go +++ b/server/services/v1/api.go @@ -440,35 +440,37 @@ func (s *apiService) Events(r *api.EventsRequest, stream api.Tigris_EventsServer } if dbId != reqDatabaseId || cId != reqCollectionId { - // this probably means database/collection is dropped and this entry is some old entry in the log, ignore it + // the event is no for the collection we are listening to continue } + var data []byte if op.Op != kv.DeleteEvent && op.Op != kv.DeleteRangeEvent { td, err := internal.Decode(op.Data) if err != nil { log.Err(err).Str("data", string(op.Data)).Msg("failed to decode data") return api.Errorf(api.Code_INTERNAL, "failed to decode data") } + data = td.RawData + } - event := &api.StreamEvent{ - TxId: tx.Id, - Collection: r.Collection, - Op: op.Op, - Key: op.Key, - Lkey: op.LKey, - Rkey: op.RKey, - Data: td.RawData, - Last: op.Last, - } + event := &api.StreamEvent{ + TxId: tx.Id, + Collection: r.Collection, + Op: op.Op, + Key: op.Key, + Lkey: op.LKey, + Rkey: op.RKey, + Data: data, + Last: op.Last, + } - response := &api.EventsResponse{ - Event: event, - } + response := &api.EventsResponse{ + Event: event, + } - if err := stream.Send(response); ulog.E(err) { - return err - } + if err := stream.Send(response); ulog.E(err) { + return err } } } diff --git a/test/v1/client/driver_test.go b/test/v1/client/driver_test.go index c6770b95c..0d6eee817 100644 --- a/test/v1/client/driver_test.go +++ b/test/v1/client/driver_test.go @@ -20,6 +20,8 @@ import ( "context" "encoding/json" "fmt" + "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -71,6 +73,46 @@ func testRead(t *testing.T, db driver.Database, filter driver.Filter, expected [ assert.NoError(t, it.Err()) } +func testTxEventsStream(ctx context.Context, t *testing.T, rit *driver.EventIterator, db1 driver.Database, coll string, doc1, doc2, doc3 driver.Document) { + var ev driver.Event + var err error + + *rit, err = db1.Events(ctx, coll) + require.NoError(t, err) + it := *rit + defer require.NoError(t, it.Err()) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "insert", ev.Op) + assert.JSONEq(t, string(doc1), string(ev.Data)) + assert.Equal(t, coll, ev.Collection) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "insert", ev.Op) + assert.JSONEq(t, string(doc2), string(ev.Data)) + assert.Equal(t, coll, ev.Collection) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "delete", ev.Op) + assert.Contains(t, string(ev.Key), "value2") + assert.Equal(t, coll, ev.Collection) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "insert", ev.Op) + assert.JSONEq(t, string(doc2), string(ev.Data)) + assert.Equal(t, coll, ev.Collection) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "insert", ev.Op) + assert.JSONEq(t, string(doc3), string(ev.Data)) + assert.Equal(t, coll, ev.Collection) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "update", ev.Op) + assert.JSONEq(t, strings.Replace(string(doc2), "222", "555", -1), string(ev.Data)) + assert.Equal(t, coll, ev.Collection) +} + func testTxReadWrite(t *testing.T, c driver.Driver) { ctx := context.TODO() @@ -99,8 +141,20 @@ func testTxReadWrite(t *testing.T, c driver.Driver) { err = db1.CreateOrUpdateCollection(ctx, "c1", driver.Schema(schema)) require.NoError(t, err) + var wg sync.WaitGroup + doc1 := driver.Document(`{"str_field": "value1", "int_field": 111, "bool_field": true}`) doc2 := driver.Document(`{"str_field": "value2", "int_field": 222, "bool_field": false}`) + doc3 := driver.Document(`{"str_field": "value3", "int_field": 333, "bool_field": false}`) + + var it driver.EventIterator + + wg.Add(1) + go func() { + defer wg.Done() + testTxEventsStream(ctx, t, &it, db1, "c1", doc1, doc2, doc3) + }() + resp, err := db1.Insert(ctx, "c1", []driver.Document{ doc1, doc2, @@ -108,19 +162,20 @@ func testTxReadWrite(t *testing.T, c driver.Driver) { require.NoError(t, err) require.Equal(t, "inserted", resp.Status) - testRead(t, db1, driver.Filter(`{"str_field": "value2"}`), []driver.Document{doc2}) + fldoc2 := driver.Filter(`{"str_field": "value2"}`) - delResp, err := db1.Delete(ctx, "c1", driver.Filter(`{"str_field": "value2"}`)) + testRead(t, db1, fldoc2, []driver.Document{doc2}) + + delResp, err := db1.Delete(ctx, "c1", fldoc2) require.NoError(t, err) require.Equal(t, "deleted", delResp.Status) - testRead(t, db1, driver.Filter(`{"str_field": "value2"}`), nil) + testRead(t, db1, fldoc2, nil) for { tx, err := c.BeginTx(ctx, dbName) require.NoError(t, err) - doc3 := driver.Document(`{"str_field": "value3", "int_field": 333, "bool_field": false}`) resp, err = tx.Insert(ctx, "c1", []driver.Document{ doc2, doc3, @@ -128,17 +183,48 @@ func testTxReadWrite(t *testing.T, c driver.Driver) { require.NoError(t, err) require.Equal(t, "inserted", resp.Status) - it, err := tx.Read(ctx, "c1", driver.Filter(`{"str_field": "value2"}`), nil) + it, err := tx.Read(ctx, "c1", fldoc2, nil) require.NoError(t, err) var doc driver.Document for it.Next(&doc) { assert.JSONEq(t, string(doc2), string(doc)) } + + _, err = tx.Update(ctx, "c1", fldoc2, driver.Update(`{"$set":{"int_field": 555}}`)) + require.NoError(t, err) + if err = tx.Commit(ctx); err == nil || err.Error() != "transaction not committed due to conflict with another transaction" { break } } require.NoError(t, err) + + wg.Wait() + + fldoc3 := driver.Filter(`{"str_field": "value3"}`) + _, err = db1.Delete(ctx, "c1", fldoc3) + require.NoError(t, err) + + _, err = db1.Insert(ctx, "c1", []driver.Document{doc3}) + require.NoError(t, err) + + // Check the unread events continues after drop and recreate with the same name + require.NoError(t, db1.DropCollection(ctx, "c1")) + require.NoError(t, db1.CreateOrUpdateCollection(ctx, "c1", driver.Schema(schema))) + + var ev driver.Event + + require.True(t, it.Next(&ev)) + assert.Equal(t, "delete", ev.Op) + assert.Contains(t, string(ev.Key), "value3") + assert.Equal(t, "c1", ev.Collection) + + require.True(t, it.Next(&ev)) + assert.Equal(t, "insert", ev.Op) + assert.JSONEq(t, string(doc3), string(ev.Data)) + assert.Equal(t, "c1", ev.Collection) + + require.NoError(t, db1.DropCollection(ctx, "c1")) } func testDriverBinary(t *testing.T, c driver.Driver) { From 417815edefecb0e3f15328a7d62106c2c03e077b Mon Sep 17 00:00:00 2001 From: Adil Ansari Date: Fri, 29 Jul 2022 14:26:12 -0700 Subject: [PATCH 2/8] fix: Allow Searching and Faceting over nested fields (#390) --- schema/collection.go | 4 + server/services/v1/query_runner.go | 15 ++-- server/services/v1/query_runner_test.go | 98 ++++++++++++++++++------- 3 files changed, 86 insertions(+), 31 deletions(-) diff --git a/schema/collection.go b/schema/collection.go index 8db327254..87dde0b00 100644 --- a/schema/collection.go +++ b/schema/collection.go @@ -108,6 +108,10 @@ func (d *DefaultCollection) GetIndexes() *Indexes { return d.Indexes } +func (d *DefaultCollection) GetQueryableFields() []*QueryableField { + return d.QueryableFields +} + // Validate expects an unmarshalled document which it will validate again the schema of this collection. func (d *DefaultCollection) Validate(document interface{}) error { err := d.Validator.Validate(document) diff --git a/server/services/v1/query_runner.go b/server/services/v1/query_runner.go index edfc44ddf..b6bd45736 100644 --- a/server/services/v1/query_runner.go +++ b/server/services/v1/query_runner.go @@ -526,17 +526,17 @@ func (runner *SearchQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten return nil, ctx, err } - searchFields, err := runner.getSearchFields(collection.GetFields()) + searchFields, err := runner.getSearchFields(collection.GetQueryableFields()) if err != nil { return nil, ctx, err } - facets, err := runner.getFacetFields(collection.GetFields()) + facets, err := runner.getFacetFields(collection.GetQueryableFields()) if err != nil { return nil, ctx, err } - fieldSelection, err := runner.getFieldSelection(collection.GetFields()) + fieldSelection, err := runner.getFieldSelection(collection.GetQueryableFields()) if err != nil { return nil, ctx, err } @@ -624,7 +624,7 @@ func (runner *SearchQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten return &Response{}, ctx, nil } -func (runner *SearchQueryRunner) getSearchFields(collFields []*schema.Field) ([]string, error) { +func (runner *SearchQueryRunner) getSearchFields(collFields []*schema.QueryableField) ([]string, error) { var searchFields = runner.req.SearchFields if len(searchFields) == 0 { // this is to include all searchable fields if not present in the query @@ -654,7 +654,7 @@ func (runner *SearchQueryRunner) getSearchFields(collFields []*schema.Field) ([] return searchFields, nil } -func (runner *SearchQueryRunner) getFacetFields(collFields []*schema.Field) (qsearch.Facets, error) { +func (runner *SearchQueryRunner) getFacetFields(collFields []*schema.QueryableField) (qsearch.Facets, error) { facets, err := qsearch.UnmarshalFacet(runner.req.Facet) if err != nil { return qsearch.Facets{}, err @@ -666,6 +666,9 @@ func (runner *SearchQueryRunner) getFacetFields(collFields []*schema.Field) (qse if ff.Name != cf.FieldName { continue } + if !cf.Faceted { + return qsearch.Facets{}, api.Errorf(api.Code_INVALID_ARGUMENT, "Faceting not enabled for `%s`", ff.Name) + } if cf.DataType != schema.StringType { return qsearch.Facets{}, api.Errorf(api.Code_INVALID_ARGUMENT, "Cannot generate facets for `%s`. Faceting is only supported for text fields", ff.Name) } @@ -680,7 +683,7 @@ func (runner *SearchQueryRunner) getFacetFields(collFields []*schema.Field) (qse return facets, nil } -func (runner *SearchQueryRunner) getFieldSelection(collFields []*schema.Field) (*read.FieldFactory, error) { +func (runner *SearchQueryRunner) getFieldSelection(collFields []*schema.QueryableField) (*read.FieldFactory, error) { var selectionFields []string // Only one of include/exclude. Honor inclusion over exclusion diff --git a/server/services/v1/query_runner_test.go b/server/services/v1/query_runner_test.go index 31615d97e..4e967840d 100644 --- a/server/services/v1/query_runner_test.go +++ b/server/services/v1/query_runner_test.go @@ -8,16 +8,78 @@ import ( "github.com/tigrisdata/tigris/schema" ) -func TestSearchQueryRunner_getFieldSelection(t *testing.T) { - t.Run("only include fields are provided", func(t *testing.T) { - collFields := []*schema.Field{ - {FieldName: "field_1"}, - {FieldName: "field_2"}, +func TestSearchQueryRunner_getFacetFields(t *testing.T) { + collFields := []*schema.QueryableField{ + {FieldName: "field_1", Faceted: true, DataType: schema.StringType}, + {FieldName: "parent.field_2", Faceted: true, DataType: schema.StringType}, + {FieldName: "field_3", Faceted: false, DataType: schema.StringType}, + {FieldName: "field_4", Faceted: true, DataType: schema.Int32Type}, + } + runner := &SearchQueryRunner{req: &api.SearchRequest{}} + + t.Run("no facet field param in input", func(t *testing.T) { + runner.req.Facet = nil + facets, err := runner.getFacetFields(collFields) + assert.NoError(t, err) + assert.NotNil(t, facets) + assert.Empty(t, facets.Fields) + }) + + t.Run("no queryable field in collection", func(t *testing.T) { + var collFields []*schema.QueryableField + runner.req.Facet = []byte(`{"field_1":{"size":10}}`) + facets, err := runner.getFacetFields(collFields) + assert.ErrorContains(t, err, "`field_1` is not a schema field") + assert.NotNil(t, facets) + assert.Empty(t, facets.Fields) + }) + + t.Run("requested facet field is not faceted in collection", func(t *testing.T) { + runner.req.Facet = []byte(`{"parent.field_2":{"size":10},"field_3":{"size":10}}`) + facets, err := runner.getFacetFields(collFields) + assert.ErrorContains(t, err, "Faceting not enabled for `field_3`") + assert.NotNil(t, facets) + assert.Empty(t, facets.Fields) + }) + + t.Run("requested facet fields are not in collection", func(t *testing.T) { + runner.req.Facet = []byte(`{"field_1":{"size":10},"field_5":{"size":10}}`) + facets, err := runner.getFacetFields(collFields) + assert.ErrorContains(t, err, "`field_5` is not a schema field") + assert.NotNil(t, facets) + assert.Empty(t, facets.Fields) + }) + + t.Run("requested facet fields are not of String datatype", func(t *testing.T) { + runner.req.Facet = []byte(`{"field_1":{"size":10},"field_4":{"size":10}}`) + facets, err := runner.getFacetFields(collFields) + assert.ErrorContains(t, err, "Cannot generate facets for `field_4`") + assert.NotNil(t, facets) + assert.Empty(t, facets.Fields) + }) + + t.Run("valid facet fields requested", func(t *testing.T) { + runner.req.Facet = []byte(`{"field_1":{"size":10},"parent.field_2":{"size":10}}`) + facets, err := runner.getFacetFields(collFields) + assert.NoError(t, err) + assert.Len(t, facets.Fields, 2) + for _, ff := range facets.Fields { + assert.Contains(t, []string{"field_1", "parent.field_2"}, ff.Name) + assert.Equal(t, ff.Size, 10) } + }) +} +func TestSearchQueryRunner_getFieldSelection(t *testing.T) { + collFields := []*schema.QueryableField{ + {FieldName: "field_1"}, + {FieldName: "parent.field_2"}, + } + + t.Run("only include fields are provided", func(t *testing.T) { runner := &SearchQueryRunner{ req: &api.SearchRequest{ - IncludeFields: []string{"field_1", "field_2"}, + IncludeFields: []string{"field_1", "parent.field_2"}, }, } @@ -28,18 +90,13 @@ func TestSearchQueryRunner_getFieldSelection(t *testing.T) { assert.Empty(t, factory.Exclude) assert.Len(t, factory.Include, 2) assert.Contains(t, factory.Include, "field_1") - assert.Contains(t, factory.Include, "field_2") + assert.Contains(t, factory.Include, "parent.field_2") }) t.Run("only exclude fields are provided", func(t *testing.T) { - collFields := []*schema.Field{ - {FieldName: "field_1"}, - {FieldName: "field_2"}, - } - runner := &SearchQueryRunner{ req: &api.SearchRequest{ - ExcludeFields: []string{"field_1", "field_2"}, + ExcludeFields: []string{"field_1", "parent.field_2"}, }, } @@ -50,14 +107,10 @@ func TestSearchQueryRunner_getFieldSelection(t *testing.T) { assert.Empty(t, factory.Include) assert.Len(t, factory.Exclude, 2) assert.Contains(t, factory.Exclude, "field_1") - assert.Contains(t, factory.Exclude, "field_2") + assert.Contains(t, factory.Exclude, "parent.field_2") }) t.Run("no fields to include or exclude", func(t *testing.T) { - collFields := []*schema.Field{ - {FieldName: "field_1"}, - {FieldName: "field_2"}, - } runner := &SearchQueryRunner{req: &api.SearchRequest{}} factory, err := runner.getFieldSelection(collFields) @@ -67,7 +120,7 @@ func TestSearchQueryRunner_getFieldSelection(t *testing.T) { }) t.Run("no schema fields are defined", func(t *testing.T) { - var collFields []*schema.Field + var collFields []*schema.QueryableField runner := &SearchQueryRunner{req: &api.SearchRequest{}} factory, err := runner.getFieldSelection(collFields) @@ -77,14 +130,9 @@ func TestSearchQueryRunner_getFieldSelection(t *testing.T) { }) t.Run("selection fields are not in schema", func(t *testing.T) { - collFields := []*schema.Field{ - {FieldName: "field_1"}, - {FieldName: "field_2"}, - } - runner := &SearchQueryRunner{ req: &api.SearchRequest{ - ExcludeFields: []string{"field_2", "field_3"}, + ExcludeFields: []string{"field_1", "field_3"}, }, } From 0c2ec3420a80a8f24347231254bcd94fbe4d40c1 Mon Sep 17 00:00:00 2001 From: Peter Boros Date: Mon, 1 Aug 2022 16:39:16 +0100 Subject: [PATCH 3/8] fix: get service names consistent (#391) --- server/metrics/tracing.go | 5 ++--- server/midddleware/tracing.go | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/metrics/tracing.go b/server/metrics/tracing.go index 203339205..363fc48b0 100644 --- a/server/metrics/tracing.go +++ b/server/metrics/tracing.go @@ -22,9 +22,8 @@ import ( ) const ( - GrpcTracingServiceName = "tigris.grpc" - KvTracingServiceName = "tigris.fdb.kv" - TxManagerTracingServiceName = "tigris.tx.manager" + KvTracingServiceName = "kv" + TxManagerTracingServiceName = "txmanager" ) type SpanMeta struct { diff --git a/server/midddleware/tracing.go b/server/midddleware/tracing.go index b0587f1de..74fb3c529 100644 --- a/server/midddleware/tracing.go +++ b/server/midddleware/tracing.go @@ -16,6 +16,7 @@ package middleware import ( "context" + "github.com/tigrisdata/tigris/util" middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/tigrisdata/tigris/server/metrics" @@ -26,7 +27,7 @@ func traceUnary() func(ctx context.Context, req interface{}, info *grpc.UnarySer return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { var finisher func() grpcMeta := metrics.GetGrpcEndPointMetadataFromFullMethod(ctx, info.FullMethod, "unary") - spanMeta := metrics.NewSpanMeta(metrics.GrpcTracingServiceName, info.FullMethod, "unary_rpc", grpcMeta.GetTags()) + spanMeta := metrics.NewSpanMeta(util.Service, info.FullMethod, "unary_rpc", grpcMeta.GetTags()) ctx, finisher = spanMeta.StartTracing(ctx, false) defer finisher() resp, err := handler(ctx, req) @@ -40,7 +41,7 @@ func traceStream() grpc.StreamServerInterceptor { wrapped := middleware.WrapServerStream(stream) wrapped.WrappedContext = stream.Context() grpcMeta := metrics.GetGrpcEndPointMetadataFromFullMethod(wrapped.WrappedContext, info.FullMethod, "stream") - spanMeta := metrics.NewSpanMeta(metrics.GrpcTracingServiceName, info.FullMethod, "stream_rpc", grpcMeta.GetTags()) + spanMeta := metrics.NewSpanMeta(util.Service, info.FullMethod, "stream_rpc", grpcMeta.GetTags()) wrapped.WrappedContext, finisher = spanMeta.StartTracing(wrapped.WrappedContext, false) defer finisher() wrapper := &recvWrapper{wrapped} From 49df8cbce97e5def30397d2cfb4d1c51f95b6036 Mon Sep 17 00:00:00 2001 From: Peter Boros Date: Mon, 1 Aug 2022 17:00:12 +0100 Subject: [PATCH 4/8] fix: typo in middleware dir name (#392) --- server/{midddleware => middleware}/auth.go | 0 server/{midddleware => middleware}/auth_test.go | 0 server/{midddleware => middleware}/forwarder.go | 0 server/{midddleware => middleware}/headers.go | 0 server/{midddleware => middleware}/metrics.go | 0 server/{midddleware => middleware}/metrics_test.go | 0 server/{midddleware => middleware}/middleware.go | 0 server/{midddleware => middleware}/namespace.go | 0 server/{midddleware => middleware}/pprof.go | 0 server/{midddleware => middleware}/quota.go | 0 server/{midddleware => middleware}/timeout.go | 0 server/{midddleware => middleware}/timeout_test.go | 0 server/{midddleware => middleware}/tracing.go | 0 server/{midddleware => middleware}/validate.go | 0 server/muxer/grpc.go | 2 +- server/muxer/http.go | 2 +- server/services/v1/sessions.go | 2 +- 17 files changed, 3 insertions(+), 3 deletions(-) rename server/{midddleware => middleware}/auth.go (100%) rename server/{midddleware => middleware}/auth_test.go (100%) rename server/{midddleware => middleware}/forwarder.go (100%) rename server/{midddleware => middleware}/headers.go (100%) rename server/{midddleware => middleware}/metrics.go (100%) rename server/{midddleware => middleware}/metrics_test.go (100%) rename server/{midddleware => middleware}/middleware.go (100%) rename server/{midddleware => middleware}/namespace.go (100%) rename server/{midddleware => middleware}/pprof.go (100%) rename server/{midddleware => middleware}/quota.go (100%) rename server/{midddleware => middleware}/timeout.go (100%) rename server/{midddleware => middleware}/timeout_test.go (100%) rename server/{midddleware => middleware}/tracing.go (100%) rename server/{midddleware => middleware}/validate.go (100%) diff --git a/server/midddleware/auth.go b/server/middleware/auth.go similarity index 100% rename from server/midddleware/auth.go rename to server/middleware/auth.go diff --git a/server/midddleware/auth_test.go b/server/middleware/auth_test.go similarity index 100% rename from server/midddleware/auth_test.go rename to server/middleware/auth_test.go diff --git a/server/midddleware/forwarder.go b/server/middleware/forwarder.go similarity index 100% rename from server/midddleware/forwarder.go rename to server/middleware/forwarder.go diff --git a/server/midddleware/headers.go b/server/middleware/headers.go similarity index 100% rename from server/midddleware/headers.go rename to server/middleware/headers.go diff --git a/server/midddleware/metrics.go b/server/middleware/metrics.go similarity index 100% rename from server/midddleware/metrics.go rename to server/middleware/metrics.go diff --git a/server/midddleware/metrics_test.go b/server/middleware/metrics_test.go similarity index 100% rename from server/midddleware/metrics_test.go rename to server/middleware/metrics_test.go diff --git a/server/midddleware/middleware.go b/server/middleware/middleware.go similarity index 100% rename from server/midddleware/middleware.go rename to server/middleware/middleware.go diff --git a/server/midddleware/namespace.go b/server/middleware/namespace.go similarity index 100% rename from server/midddleware/namespace.go rename to server/middleware/namespace.go diff --git a/server/midddleware/pprof.go b/server/middleware/pprof.go similarity index 100% rename from server/midddleware/pprof.go rename to server/middleware/pprof.go diff --git a/server/midddleware/quota.go b/server/middleware/quota.go similarity index 100% rename from server/midddleware/quota.go rename to server/middleware/quota.go diff --git a/server/midddleware/timeout.go b/server/middleware/timeout.go similarity index 100% rename from server/midddleware/timeout.go rename to server/middleware/timeout.go diff --git a/server/midddleware/timeout_test.go b/server/middleware/timeout_test.go similarity index 100% rename from server/midddleware/timeout_test.go rename to server/middleware/timeout_test.go diff --git a/server/midddleware/tracing.go b/server/middleware/tracing.go similarity index 100% rename from server/midddleware/tracing.go rename to server/middleware/tracing.go diff --git a/server/midddleware/validate.go b/server/middleware/validate.go similarity index 100% rename from server/midddleware/validate.go rename to server/middleware/validate.go diff --git a/server/muxer/grpc.go b/server/muxer/grpc.go index 537473761..92da3d55c 100644 --- a/server/muxer/grpc.go +++ b/server/muxer/grpc.go @@ -19,7 +19,7 @@ import ( "github.com/soheilhy/cmux" "github.com/tigrisdata/tigris/server/config" "github.com/tigrisdata/tigris/server/metadata" - middleware "github.com/tigrisdata/tigris/server/midddleware" + "github.com/tigrisdata/tigris/server/middleware" "github.com/tigrisdata/tigris/server/transaction" "google.golang.org/grpc" "google.golang.org/grpc/reflection" diff --git a/server/muxer/http.go b/server/muxer/http.go index 1afe799e3..e646a50dd 100644 --- a/server/muxer/http.go +++ b/server/muxer/http.go @@ -25,7 +25,7 @@ import ( "github.com/soheilhy/cmux" "github.com/tigrisdata/tigris/server/config" "github.com/tigrisdata/tigris/server/metadata" - middleware "github.com/tigrisdata/tigris/server/midddleware" + "github.com/tigrisdata/tigris/server/middleware" "github.com/tigrisdata/tigris/server/transaction" ) diff --git a/server/services/v1/sessions.go b/server/services/v1/sessions.go index 40b969f72..fd55dec60 100644 --- a/server/services/v1/sessions.go +++ b/server/services/v1/sessions.go @@ -23,7 +23,7 @@ import ( "github.com/rs/zerolog/log" api "github.com/tigrisdata/tigris/api/server/v1" "github.com/tigrisdata/tigris/server/metadata" - middleware "github.com/tigrisdata/tigris/server/midddleware" + "github.com/tigrisdata/tigris/server/middleware" "github.com/tigrisdata/tigris/server/request" "github.com/tigrisdata/tigris/server/transaction" "github.com/tigrisdata/tigris/store/kv" From 7b296cef26b56c8dc921e7201970948d32606d99 Mon Sep 17 00:00:00 2001 From: Himank Chaudhary Date: Mon, 1 Aug 2022 10:08:52 -0700 Subject: [PATCH 5/8] fix: on drop remove entry from counter table as well --- server/config/options.go | 12 ++-- server/metadata/key_generator.go | 84 ++++++++++++++++++++++++++++ server/metadata/tenant.go | 81 +++++++++++++++------------ server/services/v1/key_generator.go | 82 ++------------------------- server/services/v1/query_runner.go | 8 +-- server/services/v1/search_indexer.go | 5 +- test/v1/server/document_test.go | 22 +++----- 7 files changed, 158 insertions(+), 136 deletions(-) create mode 100644 server/metadata/key_generator.go diff --git a/server/config/options.go b/server/config/options.go index 7eedd9540..6d0285548 100644 --- a/server/config/options.go +++ b/server/config/options.go @@ -21,9 +21,9 @@ import ( ) type ServerConfig struct { - Host string - Port int16 - FDBDelete bool `mapstructure:"fdb_delete" yaml:"fdb_delete" json:"fdb_delete"` + Host string + Port int16 + FDBHardDrop bool `mapstructure:"fdb_hard_drop" yaml:"fdb_hard_drop" json:"fdb_hard_drop"` } type Config struct { @@ -107,9 +107,9 @@ var DefaultConfig = Config{ SampleRate: 0.01, }, Server: ServerConfig{ - Host: "0.0.0.0", - Port: 8081, - FDBDelete: false, + Host: "0.0.0.0", + Port: 8081, + FDBHardDrop: false, }, Auth: AuthConfig{ IssuerURL: "https://tigrisdata-dev.us.auth0.com/", diff --git a/server/metadata/key_generator.go b/server/metadata/key_generator.go new file mode 100644 index 000000000..b0866f292 --- /dev/null +++ b/server/metadata/key_generator.go @@ -0,0 +1,84 @@ +package metadata + +import ( + "context" + + "github.com/tigrisdata/tigris/internal" + "github.com/tigrisdata/tigris/keys" + "github.com/tigrisdata/tigris/server/metadata/encoding" + "github.com/tigrisdata/tigris/server/transaction" + "github.com/tigrisdata/tigris/store/kv" +) + +const ( + // generatorSubspaceKey is used to store ids in storage so that we can guarantee uniqueness + generatorSubspaceKey = "generator" + // int32IdKey is the prefix after generator subspace to store int32 counters + int32IdKey = "int32_id" +) + +// TableKeyGenerator is used to generated keys that may need persistence like counter. +type TableKeyGenerator struct{} + +func NewTableKeyGenerator() *TableKeyGenerator { + return &TableKeyGenerator{} +} + +// GenerateCounter is used to generate an id in a transaction for int32 field only. This is mainly used to guarantee +// uniqueness with auto-incremented ids, so what we are doing is reserving this id in storage before returning to the +// caller so that only one id is assigned to one caller. +func (g *TableKeyGenerator) GenerateCounter(ctx context.Context, txMgr *transaction.Manager, table []byte) (int32, error) { + for { + tx, err := txMgr.StartTx(ctx) + if err != nil { + return -1, err + } + + var valueI32 int32 + if valueI32, err = g.generateCounter(ctx, tx, table); err != nil { + _ = tx.Rollback(ctx) + } + + if err = tx.Commit(ctx); err == nil { + return valueI32, nil + } + if err != kv.ErrConflictingTransaction { + return -1, err + } + } +} + +// generateCounter as it is used to generate int32 value, we are simply maintaining a counter. There is a contention to +// generate a counter if it is concurrently getting executed but the generation should be fast then it is best to start +// with this approach. +func (g *TableKeyGenerator) generateCounter(ctx context.Context, tx transaction.Tx, table []byte) (int32, error) { + key := keys.NewKey([]byte(generatorSubspaceKey), table, int32IdKey) + it, err := tx.Read(ctx, key) + if err != nil { + return 0, err + } + + id := uint32(1) + var row kv.KeyValue + if it.Next(&row) { + id = encoding.ByteToUInt32(row.Data.RawData) + uint32(1) + } + if err := it.Err(); err != nil { + return 0, err + } + + if err := tx.Replace(ctx, key, internal.NewTableData(encoding.UInt32ToByte(id))); err != nil { + return 0, err + } + + return int32(id), nil +} + +func (g *TableKeyGenerator) removeCounter(ctx context.Context, tx transaction.Tx, table []byte) error { + key := keys.NewKey([]byte(generatorSubspaceKey), table, int32IdKey) + if err := tx.Delete(ctx, key); err != nil { + return err + } + + return nil +} diff --git a/server/metadata/tenant.go b/server/metadata/tenant.go index 925fa308e..e64e9f6df 100644 --- a/server/metadata/tenant.go +++ b/server/metadata/tenant.go @@ -106,16 +106,17 @@ func (n *TenantNamespace) Id() uint32 { type TenantManager struct { sync.RWMutex - metaStore *encoding.MetadataDictionary - schemaStore *encoding.SchemaSubspace - kvStore kv.KeyValueStore - searchStore search.Store - tenants map[string]*Tenant - idToTenantMap map[uint32]string - version Version - versionH *VersionHandler - mdNameRegistry encoding.MDNameRegistry - encoder Encoder + metaStore *encoding.MetadataDictionary + schemaStore *encoding.SchemaSubspace + kvStore kv.KeyValueStore + searchStore search.Store + tenants map[string]*Tenant + idToTenantMap map[uint32]string + version Version + versionH *VersionHandler + mdNameRegistry encoding.MDNameRegistry + encoder Encoder + tableKeyGenerator *TableKeyGenerator } func NewTenantManager(kvStore kv.KeyValueStore, searchStore search.Store) *TenantManager { @@ -125,15 +126,16 @@ func NewTenantManager(kvStore kv.KeyValueStore, searchStore search.Store) *Tenan func newTenantManager(kvStore kv.KeyValueStore, searchStore search.Store, mdNameRegistry encoding.MDNameRegistry) *TenantManager { return &TenantManager{ - kvStore: kvStore, - searchStore: searchStore, - encoder: NewEncoder(), - metaStore: encoding.NewMetadataDictionary(mdNameRegistry), - schemaStore: encoding.NewSchemaStore(mdNameRegistry), - tenants: make(map[string]*Tenant), - idToTenantMap: make(map[uint32]string), - versionH: &VersionHandler{}, - mdNameRegistry: mdNameRegistry, + kvStore: kvStore, + searchStore: searchStore, + encoder: NewEncoder(), + metaStore: encoding.NewMetadataDictionary(mdNameRegistry), + schemaStore: encoding.NewSchemaStore(mdNameRegistry), + tenants: make(map[string]*Tenant), + idToTenantMap: make(map[uint32]string), + versionH: &VersionHandler{}, + mdNameRegistry: mdNameRegistry, + tableKeyGenerator: NewTableKeyGenerator(), } } @@ -269,7 +271,7 @@ func (m *TenantManager) GetTenant(ctx context.Context, namespaceName string, txM } namespace := NewTenantNamespace(namespaceName, id) - tenant = NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, currentVersion) + tenant = NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator) if err = tenant.reload(ctx, tx, currentVersion); err != nil { return nil, err } @@ -305,7 +307,7 @@ func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transa return nil, err } - tenant := NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, currentVersion) + tenant := NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator) tenant.Lock() err = tenant.reload(ctx, tx, currentVersion) tenant.Unlock() @@ -323,7 +325,7 @@ func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transa return nil, err } - return NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, nil), nil + return NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, nil, m.tableKeyGenerator), nil } // GetTableNameFromIds returns tenant name, database name, collection name corresponding to their encoded ids. @@ -421,7 +423,7 @@ func (m *TenantManager) reload(ctx context.Context, tx transaction.Tx, currentVe for namespace, id := range namespaces { if _, ok := m.tenants[namespace]; !ok { - m.tenants[namespace] = NewTenant(NewTenantNamespace(namespace, id), m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, currentVersion) + m.tenants[namespace] = NewTenant(NewTenantNamespace(namespace, id), m.kvStore, m.searchStore, m.metaStore, m.schemaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator) m.idToTenantMap[id] = namespace } } @@ -443,19 +445,20 @@ func (m *TenantManager) reload(ctx context.Context, tx transaction.Tx, currentVe type Tenant struct { sync.RWMutex - kvStore kv.KeyValueStore - searchStore search.Store - schemaStore *encoding.SchemaSubspace - metaStore *encoding.MetadataDictionary - Encoder Encoder - databases map[string]*Database - idToDatabaseMap map[uint32]string - namespace Namespace - version Version - versionH *VersionHandler + kvStore kv.KeyValueStore + searchStore search.Store + schemaStore *encoding.SchemaSubspace + metaStore *encoding.MetadataDictionary + Encoder Encoder + databases map[string]*Database + idToDatabaseMap map[uint32]string + namespace Namespace + version Version + versionH *VersionHandler + TableKeyGenerator *TableKeyGenerator } -func NewTenant(namespace Namespace, kvStore kv.KeyValueStore, searchStore search.Store, dict *encoding.MetadataDictionary, schemaStore *encoding.SchemaSubspace, encoder Encoder, versionH *VersionHandler, currentVersion Version) *Tenant { +func NewTenant(namespace Namespace, kvStore kv.KeyValueStore, searchStore search.Store, dict *encoding.MetadataDictionary, schemaStore *encoding.SchemaSubspace, encoder Encoder, versionH *VersionHandler, currentVersion Version, tableKeyGenerator *TableKeyGenerator) *Tenant { return &Tenant{ kvStore: kvStore, searchStore: searchStore, @@ -819,8 +822,16 @@ func (tenant *Tenant) dropCollection(ctx context.Context, tx transaction.Tx, db return err } + tableName, err := tenant.Encoder.EncodeTableName(tenant.namespace, db, cHolder.collection) + if err != nil { + return err + } + if err := tenant.TableKeyGenerator.removeCounter(ctx, tx, tableName); err != nil { + return err + } + // TODO: Move actual deletion out of the mutex - if config.DefaultConfig.Server.FDBDelete { + if config.DefaultConfig.Server.FDBHardDrop { tableName, err := tenant.Encoder.EncodeTableName(tenant.namespace, db, cHolder.collection) if err != nil { return err diff --git a/server/services/v1/key_generator.go b/server/services/v1/key_generator.go index 369881e5b..e3f81abec 100644 --- a/server/services/v1/key_generator.go +++ b/server/services/v1/key_generator.go @@ -10,24 +10,14 @@ import ( "github.com/buger/jsonparser" "github.com/pkg/errors" api "github.com/tigrisdata/tigris/api/server/v1" - "github.com/tigrisdata/tigris/internal" "github.com/tigrisdata/tigris/keys" "github.com/tigrisdata/tigris/lib/uuid" "github.com/tigrisdata/tigris/schema" "github.com/tigrisdata/tigris/server/metadata" - "github.com/tigrisdata/tigris/server/metadata/encoding" "github.com/tigrisdata/tigris/server/transaction" - "github.com/tigrisdata/tigris/store/kv" "github.com/tigrisdata/tigris/value" ) -const ( - // generatorSubspaceKey is used to store ids in storage so that we can guarantee uniqueness - generatorSubspaceKey = "generator" - // int32IdKey is the prefix after generator subspace to store int32 counters - int32IdKey = "int32_id" -) - var ( zeroIntStringSlice = []byte("0") zeroUUIDStringSlice = []byte(uuid.NullUUID.String()) @@ -37,14 +27,14 @@ var ( // keyGenerator is used to extract the keys from document and return keys.Key which will be used by Insert/Replace API. // keyGenerator may mutate the document in case autoGenerate is set for primary key fields. type keyGenerator struct { - generator *generator + generator *metadata.TableKeyGenerator document []byte keysForResp []byte index *schema.Index forceInsert bool } -func newKeyGenerator(document []byte, generator *generator, index *schema.Index) *keyGenerator { +func newKeyGenerator(document []byte, generator *metadata.TableKeyGenerator, index *schema.Index) *keyGenerator { return &keyGenerator{ document: document, generator: generator, @@ -57,7 +47,7 @@ func (k *keyGenerator) getKeysForResp() []byte { } // generate method also modifies the JSON document in case of autoGenerate primary key. -func (k *keyGenerator) generate(ctx context.Context, encoder metadata.Encoder, table []byte) (keys.Key, error) { +func (k *keyGenerator) generate(ctx context.Context, txMgr *transaction.Manager, encoder metadata.Encoder, table []byte) (keys.Key, error) { var indexParts []interface{} for _, field := range k.index.Fields { jsonVal, dtp, _, err := jsonparser.Get(k.document, field.FieldName) @@ -70,7 +60,7 @@ func (k *keyGenerator) generate(ctx context.Context, encoder metadata.Encoder, t var v value.Value if autoGenerate { - if jsonVal, v, err = k.generator.get(ctx, table, field); err != nil { + if jsonVal, v, err = k.get(ctx, txMgr, table, field); err != nil { return nil, err } if err = k.setKeyInDoc(field, jsonVal); err != nil { @@ -120,16 +110,6 @@ func (k *keyGenerator) getJsonQuotedValue(fieldType schema.FieldType, jsonVal [] } } -type generator struct { - txMgr *transaction.Manager -} - -func newGenerator(txMgr *transaction.Manager) *generator { - return &generator{ - txMgr: txMgr, - } -} - // isNull checks if the value is "zero" value of it's type func isNull(tp schema.FieldType, val []byte) bool { switch tp { @@ -150,7 +130,7 @@ func isNull(tp schema.FieldType, val []byte) bool { // get returns generated id for the supported primary key fields. This method returns unquoted JSON values. This is to // align with the json library that we are using as that returns unquoted strings as well. It is returning internal // value as well so that we don't need to recalculate it from jsonVal. -func (g *generator) get(ctx context.Context, table []byte, field *schema.Field) ([]byte, value.Value, error) { +func (k *keyGenerator) get(ctx context.Context, txMgr *transaction.Manager, table []byte, field *schema.Field) ([]byte, value.Value, error) { switch field.Type() { case schema.StringType, schema.UUIDType: value := value.NewStringValue(uuid.NewUUIDAsString()) @@ -168,7 +148,7 @@ func (g *generator) get(ctx context.Context, table []byte, field *schema.Field) value := value.NewIntValue(time.Now().UTC().UnixNano()) return []byte(fmt.Sprintf(`%d`, *value)), value, nil case schema.Int32Type: - valueI32, err := g.generateInTx(ctx, table) + valueI32, err := k.generator.GenerateCounter(ctx, txMgr, table) if err != nil { return nil, nil, err } @@ -178,53 +158,3 @@ func (g *generator) get(ctx context.Context, table []byte, field *schema.Field) } return nil, nil, api.Errorf(api.Code_INVALID_ARGUMENT, "unsupported type found in auto-generator") } - -// generateInTx is used to generate an id in a transaction for int32 field only. This is mainly used to guarantee -// uniqueness with auto-incremented ids, so what we are doing is reserving this id in storage before returning to the -// caller so that only one id is assigned to one caller. -func (g *generator) generateInTx(ctx context.Context, table []byte) (int32, error) { - for { - tx, err := g.txMgr.StartTx(ctx) - if err != nil { - return -1, err - } - - var valueI32 int32 - if valueI32, err = g.generateInt(ctx, tx, table); err != nil { - _ = tx.Rollback(ctx) - } - - if err = tx.Commit(ctx); err == nil { - return valueI32, nil - } - if err != kv.ErrConflictingTransaction { - return -1, err - } - } -} - -// generateInt as it is used to generate int32 value, we are simply maintaining a counter. There is a contention to -// generate a counter if it is concurrently getting executed but the generation should be fast then it is best to start -// with this approach. -func (g *generator) generateInt(ctx context.Context, tx transaction.Tx, table []byte) (int32, error) { - key := keys.NewKey([]byte(generatorSubspaceKey), table, int32IdKey) - it, err := tx.Read(ctx, key) - if err != nil { - return 0, err - } - - id := uint32(1) - var row kv.KeyValue - if it.Next(&row) { - id = encoding.ByteToUInt32(row.Data.RawData) + uint32(1) - } - if err := it.Err(); err != nil { - return 0, err - } - - if err := tx.Replace(ctx, key, internal.NewTableData(encoding.UInt32ToByte(id))); err != nil { - return 0, err - } - - return int32(id), nil -} diff --git a/server/services/v1/query_runner.go b/server/services/v1/query_runner.go index b6bd45736..ff1da616f 100644 --- a/server/services/v1/query_runner.go +++ b/server/services/v1/query_runner.go @@ -120,16 +120,16 @@ func (f *QueryRunnerFactory) GetDatabaseQueryRunner() *DatabaseQueryRunner { type BaseQueryRunner struct { encoder metadata.Encoder cdcMgr *cdc.Manager - generator *generator searchStore search.Store + txMgr *transaction.Manager } func NewBaseQueryRunner(encoder metadata.Encoder, cdcMgr *cdc.Manager, txMgr *transaction.Manager, searchStore search.Store) *BaseQueryRunner { return &BaseQueryRunner{ encoder: encoder, cdcMgr: cdcMgr, - generator: newGenerator(txMgr), searchStore: searchStore, + txMgr: txMgr, } } @@ -189,8 +189,8 @@ func (runner *BaseQueryRunner) insertOrReplace(ctx context.Context, tx transacti return nil, nil, err } - keyGen := newKeyGenerator(doc, runner.generator, coll.Indexes.PrimaryKey) - key, err := keyGen.generate(ctx, runner.encoder, table) + keyGen := newKeyGenerator(doc, tenant.TableKeyGenerator, coll.Indexes.PrimaryKey) + key, err := keyGen.generate(ctx, runner.txMgr, runner.encoder, table) if err != nil { return nil, nil, err } diff --git a/server/services/v1/search_indexer.go b/server/services/v1/search_indexer.go index 974218b92..a8cf1ebdc 100644 --- a/server/services/v1/search_indexer.go +++ b/server/services/v1/search_indexer.go @@ -41,6 +41,7 @@ const ( ) const ( + searchCreate string = "create" searchUpsert string = "upsert" searchUpdate string = "update" ) @@ -86,7 +87,9 @@ func (i *SearchIndexer) OnPostCommit(ctx context.Context, tenant *metadata.Tenan } else { var action string switch event.Op { - case kv.InsertEvent, kv.ReplaceEvent: + case kv.InsertEvent: + action = searchCreate + case kv.ReplaceEvent: action = searchUpsert case kv.UpdateEvent: action = searchUpdate diff --git a/test/v1/server/document_test.go b/test/v1/server/document_test.go index a30b124d3..69ae76bba 100644 --- a/test/v1/server/document_test.go +++ b/test/v1/server/document_test.go @@ -21,7 +21,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "math/rand" "net/http" "testing" "time" @@ -475,33 +474,28 @@ func (s *DocumentSuite) TestInsert_MultipleRows() { } func TestInsert_AutoGenerated(t *testing.T) { - dbName := fmt.Sprintf("db_test_autogenerated_%x", rand.Uint64()) - - dropDatabase(t, dbName) - createDatabase(t, dbName) - defer dropDatabase(t, dbName) - - collectionName := fmt.Sprintf("test_autogenerated_%x", rand.Uint64()) + SetupSuite(t, defaultDatabaseTestName, defaultCollectionTestName) + defer TearDownSuite(t, defaultDatabaseTestName) - testAutoGenerated(t, dbName, collectionName, Map{"type": "string", "autoGenerate": true}) - testAutoGenerated(t, dbName, collectionName, Map{ + testAutoGenerated(t, defaultDatabaseTestName, defaultCollectionTestName, Map{"type": "string", "autoGenerate": true}) + testAutoGenerated(t, defaultDatabaseTestName, defaultCollectionTestName, Map{ "type": "string", "autoGenerate": true, "format": "byte"}) - testAutoGenerated(t, dbName, collectionName, Map{ + testAutoGenerated(t, defaultDatabaseTestName, defaultCollectionTestName, Map{ "type": "string", "format": "uuid", "autoGenerate": true}) - testAutoGenerated(t, dbName, collectionName, Map{ + testAutoGenerated(t, defaultDatabaseTestName, defaultCollectionTestName, Map{ "type": "string", "format": "date-time", "autoGenerate": true}) - testAutoGenerated(t, dbName, collectionName, Map{ + testAutoGenerated(t, defaultDatabaseTestName, defaultCollectionTestName, Map{ "type": "integer", "format": "int32", "autoGenerate": true, }) - testAutoGenerated(t, dbName, collectionName, Map{"type": "integer", "autoGenerate": true}) + testAutoGenerated(t, defaultDatabaseTestName, defaultCollectionTestName, Map{"type": "integer", "autoGenerate": true}) } func TestInsert_SchemaUpdate(t *testing.T) { From 6937b0c3eb43e0ef0f77d331e7507213c5904fa5 Mon Sep 17 00:00:00 2001 From: Peter Boros Date: Tue, 2 Aug 2022 16:33:55 +0100 Subject: [PATCH 6/8] fix: fix datadog traces (#394) --- server/metrics/tracing.go | 3 ++- server/middleware/middleware.go | 1 - server/middleware/tracing.go | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/metrics/tracing.go b/server/metrics/tracing.go index 363fc48b0..3c6654993 100644 --- a/server/metrics/tracing.go +++ b/server/metrics/tracing.go @@ -24,6 +24,7 @@ import ( const ( KvTracingServiceName = "kv" TxManagerTracingServiceName = "txmanager" + TraceServiceName = "tigris.grpc.server" ) type SpanMeta struct { @@ -60,7 +61,7 @@ func (s *SpanMeta) StartTracing(ctx context.Context, childOnly bool) (context.Co // There is no parent span, no need to start tracing here return ctx, func() {} } - span := tracer.StartSpan(s.resourceName, spanOpts...) + span := tracer.StartSpan(TraceServiceName, spanOpts...) for k, v := range s.tags { span.SetTag(k, v) } diff --git a/server/middleware/middleware.go b/server/middleware/middleware.go index a823fd7cc..a54480517 100644 --- a/server/middleware/middleware.go +++ b/server/middleware/middleware.go @@ -117,7 +117,6 @@ func Get(config *config.Config, tenantMgr *metadata.TenantManager, txMgr *transa } unaryInterceptors = append(unaryInterceptors, []grpc.UnaryServerInterceptor{ - //grpctrace.UnaryServerInterceptor(grpctrace.WithServiceName(util.Service)), pprofUnaryServerInterceptor(), quotaUnaryServerInterceptor(), grpc_logging.UnaryServerInterceptor(grpc_zerolog.InterceptorLogger(sampledTaggedLogger)), diff --git a/server/middleware/tracing.go b/server/middleware/tracing.go index 74fb3c529..f050417fd 100644 --- a/server/middleware/tracing.go +++ b/server/middleware/tracing.go @@ -23,11 +23,15 @@ import ( "google.golang.org/grpc" ) +const ( + TraceSpanType string = "rpc" +) + func traceUnary() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { var finisher func() grpcMeta := metrics.GetGrpcEndPointMetadataFromFullMethod(ctx, info.FullMethod, "unary") - spanMeta := metrics.NewSpanMeta(util.Service, info.FullMethod, "unary_rpc", grpcMeta.GetTags()) + spanMeta := metrics.NewSpanMeta(util.Service, info.FullMethod, TraceSpanType, grpcMeta.GetTags()) ctx, finisher = spanMeta.StartTracing(ctx, false) defer finisher() resp, err := handler(ctx, req) @@ -41,7 +45,7 @@ func traceStream() grpc.StreamServerInterceptor { wrapped := middleware.WrapServerStream(stream) wrapped.WrappedContext = stream.Context() grpcMeta := metrics.GetGrpcEndPointMetadataFromFullMethod(wrapped.WrappedContext, info.FullMethod, "stream") - spanMeta := metrics.NewSpanMeta(util.Service, info.FullMethod, "stream_rpc", grpcMeta.GetTags()) + spanMeta := metrics.NewSpanMeta(util.Service, info.FullMethod, TraceSpanType, grpcMeta.GetTags()) wrapped.WrappedContext, finisher = spanMeta.StartTracing(wrapped.WrappedContext, false) defer finisher() wrapper := &recvWrapper{wrapped} From d9f53fc74669d55d4a36607bf9a3c03cb39c3f85 Mon Sep 17 00:00:00 2001 From: Peter Boros Date: Wed, 3 Aug 2022 15:09:12 +0100 Subject: [PATCH 7/8] feat: traces catching errors (#396) --- server/metrics/tracing.go | 36 +++++++++++++++++++++++++++++++----- server/middleware/tracing.go | 6 ++++++ store/kv/kv.go | 1 + store/search/ts.go | 1 + 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/server/metrics/tracing.go b/server/metrics/tracing.go index 3c6654993..832088314 100644 --- a/server/metrics/tracing.go +++ b/server/metrics/tracing.go @@ -16,9 +16,13 @@ package metrics import ( "context" - + "errors" + "github.com/apple/foundationdb/bindings/go/src/fdb" + api "github.com/tigrisdata/tigris/api/server/v1" "github.com/tigrisdata/tigris/server/config" + "google.golang.org/grpc/status" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "strconv" ) const ( @@ -32,6 +36,7 @@ type SpanMeta struct { resourceName string spanType string tags map[string]string + span tracer.Span } func NewSpanMeta(serviceName string, resourceName string, spanType string, tags map[string]string) *SpanMeta { @@ -61,12 +66,33 @@ func (s *SpanMeta) StartTracing(ctx context.Context, childOnly bool) (context.Co // There is no parent span, no need to start tracing here return ctx, func() {} } - span := tracer.StartSpan(TraceServiceName, spanOpts...) + s.span = tracer.StartSpan(TraceServiceName, spanOpts...) for k, v := range s.tags { - span.SetTag(k, v) + s.span.SetTag(k, v) } - ctx = tracer.ContextWithSpan(ctx, span) + ctx = tracer.ContextWithSpan(ctx, s.span) return ctx, func() { - span.Finish() + s.span.Finish() + } +} + +func (s *SpanMeta) FinishWithError(err error) { + if s.span == nil { + return + } + errCode := status.Code(err) + s.span.SetTag("grpc.code", errCode.String()) + var tigrisErr *api.TigrisError + var fdbErr fdb.Error + if errors.As(err, &fdbErr) { + s.span.SetTag("error_source", "fdb") + s.span.SetTag("fdb_error_code", strconv.Itoa(fdbErr.Code)) + } + if errors.As(err, &tigrisErr) { + s.span.SetTag("error_source", "tigris_server") + s.span.SetTag("tigris_server_error", tigrisErr.Code.String()) } + // TODO: handle known search errors + finishOptions := []tracer.FinishOption{tracer.WithError(err)} + s.span.Finish(finishOptions...) } diff --git a/server/middleware/tracing.go b/server/middleware/tracing.go index f050417fd..125aa79aa 100644 --- a/server/middleware/tracing.go +++ b/server/middleware/tracing.go @@ -35,6 +35,9 @@ func traceUnary() func(ctx context.Context, req interface{}, info *grpc.UnarySer ctx, finisher = spanMeta.StartTracing(ctx, false) defer finisher() resp, err := handler(ctx, req) + if err != nil { + spanMeta.FinishWithError(err) + } return resp, err } } @@ -50,6 +53,9 @@ func traceStream() grpc.StreamServerInterceptor { defer finisher() wrapper := &recvWrapper{wrapped} err := handler(srv, wrapper) + if err != nil { + spanMeta.FinishWithError(err) + } return err } } diff --git a/store/kv/kv.go b/store/kv/kv.go index a51ffd692..2124ac7ce 100644 --- a/store/kv/kv.go +++ b/store/kv/kv.go @@ -125,6 +125,7 @@ func measureLow(ctx context.Context, name string, f func() error) { metrics.FdbErrorRequests.Tagged(tags).Counter("unknown").Inc(1) } } + spanMeta.FinishWithError(err) } func (m *KeyValueStoreImplWithMetrics) measure(ctx context.Context, name string, f func() error) { diff --git a/store/search/ts.go b/store/search/ts.go index 54d3d80f8..671db9d44 100644 --- a/store/search/ts.go +++ b/store/search/ts.go @@ -59,6 +59,7 @@ func (m *storeImplWithMetrics) measure(ctx context.Context, name string, f func( if config.DefaultConfig.Metrics.Search.Counters { metrics.SearchErrorRequests.Tagged(tags).Counter("unknown").Inc(1) } + spanMeta.FinishWithError(err) } func (m *storeImplWithMetrics) CreateCollection(ctx context.Context, schema *tsApi.CollectionSchema) (err error) { From 49285cdfd05c3d4fb67ff0b6c186318bea475101 Mon Sep 17 00:00:00 2001 From: Himank Chaudhary Date: Wed, 3 Aug 2022 12:12:35 -0700 Subject: [PATCH 8/8] fix: fixing encoding issues --- lib/json/encoder.go | 44 ++++++++++++++++++++++++++++ lib/uuid/uuid.go | 14 +++++++++ server/services/v1/search_indexer.go | 15 ++++++---- server/services/v1/search_reader.go | 6 ++-- 4 files changed, 70 insertions(+), 9 deletions(-) create mode 100644 lib/json/encoder.go diff --git a/lib/json/encoder.go b/lib/json/encoder.go new file mode 100644 index 000000000..86654ab85 --- /dev/null +++ b/lib/json/encoder.go @@ -0,0 +1,44 @@ +// Copyright 2022 Tigris Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package json + +import ( + "bytes" + + jsoniter "github.com/json-iterator/go" +) + +func Encode(data map[string]any) ([]byte, error) { + var buffer bytes.Buffer + encoder := jsoniter.NewEncoder(&buffer) + err := encoder.Encode(data) + if err != nil { + return nil, err + } + + return buffer.Bytes(), nil +} + +func Decode(data []byte) (map[string]any, error) { + var decoded map[string]any + + decoder := jsoniter.NewDecoder(bytes.NewReader(data)) + decoder.UseNumber() + if err := decoder.Decode(&decoded); err != nil { + return nil, err + } + + return decoded, nil +} diff --git a/lib/uuid/uuid.go b/lib/uuid/uuid.go index bd3d0db74..fd78e83b1 100644 --- a/lib/uuid/uuid.go +++ b/lib/uuid/uuid.go @@ -1,3 +1,17 @@ +// Copyright 2022 Tigris Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package uuid import uuid2 "github.com/google/uuid" diff --git a/server/services/v1/search_indexer.go b/server/services/v1/search_indexer.go index a8cf1ebdc..fb06c0434 100644 --- a/server/services/v1/search_indexer.go +++ b/server/services/v1/search_indexer.go @@ -25,6 +25,7 @@ import ( "github.com/apple/foundationdb/bindings/go/src/fdb/subspace" jsoniter "github.com/json-iterator/go" "github.com/tigrisdata/tigris/internal" + "github.com/tigrisdata/tigris/lib/json" "github.com/tigrisdata/tigris/schema" "github.com/tigrisdata/tigris/server/metadata" "github.com/tigrisdata/tigris/server/transaction" @@ -155,7 +156,7 @@ func CreateSearchKey(table []byte, fdbKey []byte) (string, error) { case string: value = t case []byte: - value = string(t) + value = base64.StdEncoding.EncodeToString(t) } return value, nil } else { @@ -165,10 +166,9 @@ func CreateSearchKey(table []byte, fdbKey []byte) (string, error) { } func PackSearchFields(data *internal.TableData, collection *schema.DefaultCollection, id string) ([]byte, error) { - var err error // better to decode it and then update the JSON - var decData map[string]any - if err = jsoniter.Unmarshal(data.RawData, &decData); err != nil { + decData, err := json.Decode(data.RawData) + if err != nil { return nil, err } @@ -195,7 +195,12 @@ func PackSearchFields(data *internal.TableData, collection *schema.DefaultCollec decData[schema.ReservedFields[schema.UpdatedAt]] = data.UpdatedAt.UnixNano() } - return jsoniter.Marshal(decData) + encoded, err := json.Encode(decData) + if err != nil { + return nil, err + } + + return encoded, nil } func UnpackSearchFields(doc map[string]interface{}, collection *schema.DefaultCollection) (string, *internal.TableData, map[string]interface{}, error) { diff --git a/server/services/v1/search_reader.go b/server/services/v1/search_reader.go index c57763d77..e3ecf1b2c 100644 --- a/server/services/v1/search_reader.go +++ b/server/services/v1/search_reader.go @@ -17,8 +17,8 @@ package v1 import ( "context" - jsoniter "github.com/json-iterator/go" api "github.com/tigrisdata/tigris/api/server/v1" + "github.com/tigrisdata/tigris/lib/json" "github.com/tigrisdata/tigris/query/filter" "github.com/tigrisdata/tigris/query/read" qsearch "github.com/tigrisdata/tigris/query/search" @@ -94,10 +94,8 @@ func (p *page) readRow(row *Row) bool { } var rawData []byte - // marshal the doc as bytes - rawData, p.err = jsoniter.Marshal(doc) - if p.err != nil { + if rawData, p.err = json.Encode(doc); p.err != nil { return false }