Skip to content

Commit

Permalink
Merge pull request #1180 from tigrisdata/main
Browse files Browse the repository at this point in the history
Release Beta
  • Loading branch information
himank authored May 12, 2023
2 parents fef1660 + 4c5f14a commit 9d7af24
Show file tree
Hide file tree
Showing 26 changed files with 400 additions and 77 deletions.
3 changes: 2 additions & 1 deletion api/server/v1/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ const (
HeaderSchemaSignOff = "Tigris-Schema-Sign-Off"
HeaderBypassAuthCache = "Tigris-Bypass-Auth-Cache" // #nosec G101
HeaderReadSearchDataFromStorage = "Tigris-Search-Read-From-Storage"
HeaderServerTiming = "Server-Timing"
)

func CustomMatcher(key string) (string, bool) {
key = textproto.CanonicalMIMEHeaderKey(key)
switch key {
case HeaderRequestTimeout, HeaderAccessControlAllowOrigin, SetCookie, Cookie, HeaderAccept:
case HeaderRequestTimeout, HeaderAccessControlAllowOrigin, SetCookie, Cookie, HeaderAccept, HeaderServerTiming:
return key, true
default:
if strings.HasPrefix(key, HeaderPrefix) {
Expand Down
9 changes: 8 additions & 1 deletion query/filter/key_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,5 +460,12 @@ func encodeString(val string) any {
}

func fieldsToQueryableFields(fields []*schema.Field) []*schema.QueryableField {
return schema.NewQueryableFieldsBuilder().BuildQueryableFields(fields, nil, false)
builder := schema.NewQueryableFieldsBuilder()
qf := make([]*schema.QueryableField, len(fields))

for i, field := range fields {
qf[i] = builder.NewQueryableField(field.Name(), field, nil)
}

return qf
}
8 changes: 6 additions & 2 deletions schema/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func NewDefaultCollection(id uint32, schVer uint32, factory *Factory, schemas Ve
if implicitSearchIndex != nil {
prevVersionInSearch = implicitSearchIndex.prevVersionInSearch
}
queryableFields := NewQueryableFieldsBuilder().BuildQueryableFields(factory.Fields, prevVersionInSearch, true)
queryableFields := NewQueryableFieldsBuilder().BuildQueryableFields(factory.Fields, prevVersionInSearch, factory.Indexes.IndexMetadata)

schemaDeltas, err := buildSchemaDeltas(schemas)
if err != nil {
Expand Down Expand Up @@ -216,6 +216,10 @@ func (*DefaultCollection) SecondaryIndexKeyword() string {
return "skey"
}

func (d *DefaultCollection) SecondaryIndexMetadata() bool {
return d.SecondaryIndexes.IndexMetadata
}

func (d *DefaultCollection) GetVersion() uint32 {
return d.SchVer
}
Expand Down Expand Up @@ -250,7 +254,7 @@ func (d *DefaultCollection) GetActiveIndexedFields() []*QueryableField {
func (d *DefaultCollection) GetWriteModeIndexes() []*QueryableField {
var indexed []*QueryableField
for _, q := range d.QueryableFields {
if q.Indexed && !d.SecondaryIndexes.IsWriteModeIndex(q.FieldName) {
if q.Indexed && !d.SecondaryIndexes.IsActiveIndex(q.FieldName) {
indexed = append(indexed, q)
}
}
Expand Down
7 changes: 3 additions & 4 deletions schema/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,15 @@ var SupportedFieldProperties = container.NewHashSet(
// Indexes is to wrap different index that a collection can have.
type Indexes struct {
All []*Index

// Schema level set if the collection should index the `created_at` and `updated_at` metadata
IndexMetadata bool
}

func (i *Indexes) IsActiveIndex(name string) bool {
return i.indexesHasStateState(name, INDEX_ACTIVE)
}

func (i *Indexes) IsWriteModeIndex(name string) bool {
return i.indexesHasStateState(name, INDEX_WRITE_MODE)
}

func (i *Indexes) indexesHasStateState(name string, state IndexState) bool {
for _, idx := range i.All {
if idx.Name == name {
Expand Down
34 changes: 18 additions & 16 deletions schema/queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (*QueryableFieldsBuilder) NewQueryableField(name string, f *Field, fieldsIn
return q
}

func (builder *QueryableFieldsBuilder) BuildQueryableFields(fields []*Field, fieldsInSearch []tsApi.Field, includeMetadata bool) []*QueryableField {
func (builder *QueryableFieldsBuilder) BuildQueryableFields(fields []*Field, fieldsInSearch []tsApi.Field, indexMetadata bool) []*QueryableField {
var queryableFields []*QueryableField

for _, f := range fields {
Expand All @@ -209,21 +209,23 @@ func (builder *QueryableFieldsBuilder) BuildQueryableFields(fields []*Field, fie
}
}

if includeMetadata {
ptrTrue := true
// Allowing metadata fields to be queryable. User provided reserved fields are rejected by FieldBuilder.
queryableFields = append(queryableFields, builder.NewQueryableField(ReservedFields[CreatedAt], &Field{
FieldName: ReservedFields[CreatedAt],
DataType: DateTimeType,
Indexed: &ptrTrue,
}, fieldsInSearch))

queryableFields = append(queryableFields, builder.NewQueryableField(ReservedFields[UpdatedAt], &Field{
FieldName: ReservedFields[UpdatedAt],
DataType: DateTimeType,
Indexed: &ptrTrue,
}, fieldsInSearch))
}
ptrFalse := false
// Allowing metadata fields to be queryable. User provided reserved fields are rejected by FieldBuilder.
queryableFields = append(queryableFields, builder.NewQueryableField(ReservedFields[CreatedAt], &Field{
FieldName: ReservedFields[CreatedAt],
DataType: DateTimeType,
Sorted: &ptrFalse,
SearchIndexed: &ptrFalse,
Indexed: &indexMetadata,
}, fieldsInSearch))

queryableFields = append(queryableFields, builder.NewQueryableField(ReservedFields[UpdatedAt], &Field{
FieldName: ReservedFields[UpdatedAt],
DataType: DateTimeType,
Sorted: &ptrFalse,
SearchIndexed: &ptrFalse,
Indexed: &indexMetadata,
}, fieldsInSearch))

return queryableFields
}
Expand Down
20 changes: 20 additions & 0 deletions schema/queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,23 @@ func TestBuildQueryableFields(t *testing.T) {
require.Equal(t, expTypes[i], q.SearchType)
}
}

func TestIncludeMetadata(t *testing.T) {
fields := []*Field{}
expTypes := []string{"int64", "int64"}
expFields := []string{"_tigris_created_at", "_tigris_updated_at"}

queryable := NewQueryableFieldsBuilder().BuildQueryableFields(fields, nil, true)
for i, q := range queryable {
require.Equal(t, expFields[i], q.FieldName)
require.Equal(t, expTypes[i], q.SearchType)
require.True(t, q.Indexed)
}

queryable = NewQueryableFieldsBuilder().BuildQueryableFields(fields, nil, false)
for i, q := range queryable {
require.Equal(t, expFields[i], q.FieldName)
require.Equal(t, expTypes[i], q.SearchType)
require.False(t, q.Indexed)
}
}
34 changes: 21 additions & 13 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,20 +200,27 @@ func (fb *FactoryBuilder) Build(collection string, reqSchema jsoniter.RawMessage
}
}

// Hard coded for now, this needs to be read from the schema at the top-level
indexMetadata := true
secondaryIndex := make([]*Index, 0)

if indexMetadata {
secondaryIndex = append(secondaryIndex, []*Index{
{
Name: ReservedFields[CreatedAt],
IdxType: SECONDARY_INDEX,
State: UNKNOWN,
},
{
Name: ReservedFields[UpdatedAt],
IdxType: SECONDARY_INDEX,
State: UNKNOWN,
},
}...)
}

// Create the secondary indexes with an unknown state
// to determine the state, tigris will need to read from the index metadata
secondaryIndex := []*Index{
{
Name: ReservedFields[CreatedAt],
IdxType: SECONDARY_INDEX,
State: UNKNOWN,
},
{
Name: ReservedFields[UpdatedAt],
IdxType: SECONDARY_INDEX,
State: UNKNOWN,
},
}
for _, field := range fields {
if field.Indexed != nil && *field.Indexed {
secondaryIndex = append(secondaryIndex, &Index{Name: field.Name(), IdxType: SECONDARY_INDEX, State: UNKNOWN, Fields: []*Field{field}})
Expand All @@ -229,7 +236,8 @@ func (fb *FactoryBuilder) Build(collection string, reqSchema jsoniter.RawMessage
State: INDEX_ACTIVE,
},
Indexes: &Indexes{
All: secondaryIndex,
All: secondaryIndex,
IndexMetadata: indexMetadata,
},
Name: collection,
Schema: reqSchema,
Expand Down
1 change: 1 addition & 0 deletions schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func TestCreateCollectionFromSchema(t *testing.T) {
State: UNKNOWN,
},
},
IndexMetadata: true,
},
Fields: []*Field{
{FieldName: "id", DataType: Int64Type, PrimaryKeyField: &b},
Expand Down
8 changes: 8 additions & 0 deletions server/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type MetricsConfig struct {
Network NetworkMetricGroupConfig `mapstructure:"network" yaml:"network" json:"network"`
Auth AuthMetricsConfig `mapstructure:"auth" yaml:"auth" json:"auth"`
SecondaryIndex SecondaryIndexMetricsConfig `mapstructure:"secondary_index" yaml:"secondary_index" json:"secondary_index"`
Queue QueueMetricsConfig `mapstructure:"queue" yaml:"queue" json:"queue"`
}

type TimerConfig struct {
Expand Down Expand Up @@ -224,6 +225,10 @@ type SecondaryIndexMetricsConfig struct {
FilteredTags []string `mapstructure:"filtered_tags" yaml:"filtered_tags" json:"filtered_tags"`
}

type QueueMetricsConfig struct {
Enabled bool `mapstructure:"enabled" yaml:"enabled" json:"enabled"`
}

type WorkersConfig struct {
Enabled bool `mapstructure:"enabled" yaml:"enabled" json:"enabled"`
Count uint `mapstructure:"count" yaml:"count" json:"count"`
Expand Down Expand Up @@ -456,6 +461,9 @@ var DefaultConfig = Config{
Enabled: true,
FilteredTags: nil,
},
Queue: QueueMetricsConfig{
Enabled: true,
},
},
Profiling: ProfilingConfig{
Enabled: false,
Expand Down
4 changes: 4 additions & 0 deletions server/metrics/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (m *Measurement) GetAuthErrorTags(err error) map[string]string {
return filterTags(standardizeTags(mergeTags(m.tags, getTagsForError(err)), getAuthErrorTagKeys()), config.DefaultConfig.Metrics.Auth.FilteredTags)
}

func (m *Measurement) TimeSinceStart() time.Duration {
return time.Since(m.startedAt)
}

func (m *Measurement) SaveMeasurementToContext(ctx context.Context) (context.Context, error) {
if m.datadogSpan == nil && m.jaegerSpan == nil {
return nil, fmt.Errorf("parent span was not created")
Expand Down
5 changes: 5 additions & 0 deletions server/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func InitializeMetrics() func() {
initializeSecondaryIndexScopes()
}

if cfg.Queue.Enabled {
QueueMetrics = root.SubScope("queue")
initializeQueueScopes()
}

// Metrics for Metronome - external billing service
MetronomeMetrics = root.SubScope("metronome")
initializeMetronomeScopes()
Expand Down
51 changes: 51 additions & 0 deletions server/metrics/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022-2023 Tigris Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"github.com/uber-go/tally"
)

var (
QueueMetrics tally.Scope
QueueErrors tally.Scope
QueueOk tally.Scope
)

func initializeQueueScopes() {
QueueErrors = QueueMetrics.SubScope("errors")
QueueOk = QueueMetrics.SubScope("ok")
}

func SetQueueSize(size int) {
if QueueMetrics == nil {
return
}
QueueOk.Gauge("size").Update(float64(size))
}

func IncFailedJobError() {
if QueueMetrics == nil {
return
}
QueueErrors.Counter("job").Inc(1)
}

func IncFailedWorkerError() {
if QueueMetrics == nil {
return
}
QueueErrors.Counter("worker").Inc(1)
}
43 changes: 43 additions & 0 deletions server/metrics/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022-2023 Tigris Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"

"github.com/tigrisdata/tigris/server/config"
)

func TestQueueMetrics(t *testing.T) {
config.DefaultConfig.Tracing.Enabled = true
config.DefaultConfig.Metrics.Enabled = true
InitializeMetrics()

t.Run("enabled", func(t *testing.T) {
SetQueueSize(10)
IncFailedJobError()
IncFailedWorkerError()
})

t.Run("disabled", func(t *testing.T) {
save := QueueMetrics
t.Cleanup(func() { QueueMetrics = save })

QueueMetrics = nil
SetQueueSize(10)
IncFailedJobError()
IncFailedWorkerError()
})
}
13 changes: 9 additions & 4 deletions server/middleware/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ var (
)

const (
CookieMaxAgeKey = "Expires"
CookieMaxAgeKey = "Expires"
ServerTimingHeader = "Server-Timing"
)

func headersUnaryServerInterceptor() func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
callHeaders := metadata.New(map[string]string{})

Expand All @@ -43,6 +45,8 @@ func headersUnaryServerInterceptor() func(ctx context.Context, req any, info *gr
expirationTime := time.Now().Add(MaximumTimeout + 2*time.Second)
callHeaders.Append(api.SetCookie, fmt.Sprintf("%s=%s;%s=%s", api.HeaderTxID, ty.GetTxCtx().GetId(), CookieMaxAgeKey, expirationTime.Format(time.RFC1123)))
}

callHeaders.Append(ServerTimingHeader, getServerTimingValue(time.Since(startTime)))
if err := grpc.SendHeader(ctx, metadata.Join(OutgoingHeaders, callHeaders)); err != nil {
return nil, err
}
Expand All @@ -53,9 +57,10 @@ func headersUnaryServerInterceptor() func(ctx context.Context, req any, info *gr

func headersStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := grpc.SendHeader(stream.Context(), OutgoingStreamHeaders); err != nil {
return err
}
return handler(srv, stream)
}
}

func getServerTimingValue(dur time.Duration) string {
return fmt.Sprintf("total;dur=%d", dur.Milliseconds())
}
Loading

0 comments on commit 9d7af24

Please sign in to comment.