From a244d33a68d4f64696493d748efc2196d989466d Mon Sep 17 00:00:00 2001 From: Peter Boros Date: Thu, 18 Aug 2022 18:21:29 +0100 Subject: [PATCH 1/5] feat: Report size metrics periodically. (#436) --- server/config/options.go | 24 +++++---- server/quota/quota.go | 106 ++++++++++++++++++++++++++++++++----- server/quota/quota_test.go | 6 +-- 3 files changed, 109 insertions(+), 27 deletions(-) diff --git a/server/config/options.go b/server/config/options.go index 005235c08..852b87760 100644 --- a/server/config/options.go +++ b/server/config/options.go @@ -145,11 +145,13 @@ var DefaultConfig = Config{ EnableHeap: true, }, Quota: QuotaConfig{ - Enabled: false, - RateLimit: 1000, // requests per second both reads and writes - WriteThroughputLimit: 10000000, // bytes per second - ReadThroughputLimit: 10000000, // bytes per second - DataSizeLimit: 10000000000, // bytes + Enabled: false, + RateLimit: 1000, // requests per second both reads and writes + WriteThroughputLimit: 10000000, // bytes per second + ReadThroughputLimit: 10000000, // bytes per second + DataSizeLimit: 10000000000, // bytes + LimitUpdateInterval: 5, // seconds + TenantSizeRefreshInterval: 60, // seconds }, Observability: ObservabilityConfig{ Enabled: false, @@ -170,9 +172,11 @@ type SearchConfig struct { } type QuotaConfig struct { - Enabled bool - RateLimit int `mapstructure:"rate_limit" yaml:"rate_limit" json:"rate_limit"` - WriteThroughputLimit int `mapstructure:"write_throughput_limit" yaml:"write_throughput_limit" json:"write_throughput_limit"` - ReadThroughputLimit int `mapstructure:"read_throughput_limit" yaml:"read_throughput_limit" json:"read_throughput_limit"` - DataSizeLimit int64 `mapstructure:"data_size_limit" yaml:"data_size_limit" json:"data_size_limit"` + Enabled bool + RateLimit int `mapstructure:"rate_limit" yaml:"rate_limit" json:"rate_limit"` + WriteThroughputLimit int `mapstructure:"write_throughput_limit" yaml:"write_throughput_limit" json:"write_throughput_limit"` + ReadThroughputLimit int `mapstructure:"read_throughput_limit" yaml:"read_throughput_limit" json:"read_throughput_limit"` + DataSizeLimit int64 `mapstructure:"data_size_limit" yaml:"data_size_limit" json:"data_size_limit"` + LimitUpdateInterval int64 `mapstructure:"limit_update_interval" yaml:"limit_update_interval" json:"limit_update_interval"` + TenantSizeRefreshInterval int64 `mapstructure:"tenant_size_refresh_interval" yaml:"tenant_size_refresh_interval" json:"tenant_size_refresh_interval"` } diff --git a/server/quota/quota.go b/server/quota/quota.go index 39bc1e132..79a8e9242 100644 --- a/server/quota/quota.go +++ b/server/quota/quota.go @@ -19,6 +19,10 @@ import ( "sync" "time" + "github.com/tigrisdata/tigris/schema" + + ulog "github.com/tigrisdata/tigris/util/log" + api "github.com/tigrisdata/tigris/api/server/v1" "github.com/tigrisdata/tigris/server/config" "github.com/tigrisdata/tigris/server/metadata" @@ -29,20 +33,20 @@ import ( ) var ( - sizeLimitUpdateInterval int64 = 5 // seconds - ErrRateExceeded = api.Errorf(api.Code_RESOURCE_EXHAUSTED, "request rate limit exceeded") ErrThroughputExceeded = api.Errorf(api.Code_RESOURCE_EXHAUSTED, "request throughput limit exceeded") ErrStorageSizeExceeded = api.Errorf(api.Code_RESOURCE_EXHAUSTED, "data size limit exceeded") ) type State struct { - Rate *rate.Limiter - WriteThroughput *rate.Limiter - ReadThroughput *rate.Limiter - Size atomic.Int64 - SizeUpdateAt atomic.Int64 - SizeLock sync.Mutex + Rate *rate.Limiter + WriteThroughput *rate.Limiter + ReadThroughput *rate.Limiter + Size atomic.Int64 
+ SizeUpdateAt atomic.Int64 + TenantSizeUpdateAt atomic.Int64 + SizeLock sync.Mutex + TenantSizeLock sync.Mutex } type Manager struct { @@ -61,7 +65,7 @@ func Init(t *metadata.TenantManager, tx *transaction.Manager, c *config.QuotaCon // Allow checks rate, write throughput and storage size limits for the namespace // and returns error if at least one of them is exceeded func Allow(ctx context.Context, namespace string, reqSize int) error { - if !mgr.cfg.Enabled { + if !config.DefaultConfig.Quota.Enabled { return nil } return mgr.check(ctx, namespace, reqSize) @@ -102,14 +106,88 @@ func (m *Manager) check(ctx context.Context, namespace string, size int) error { return ErrThroughputExceeded } - return m.checkStorageSize(ctx, namespace, s, size) + return m.checkStorage(ctx, namespace, s, size) +} + +func getDbSize(ctx context.Context, tenant *metadata.Tenant, tx transaction.Tx, dbName string) int64 { + db, err := tenant.GetDatabase(ctx, tx, dbName) + if err != nil { + ulog.E(err) + } + dbSize, err := tenant.DatabaseSize(ctx, db) + if err != nil { + ulog.E(err) + } + return dbSize +} + +func getCollSize(ctx context.Context, tenant *metadata.Tenant, db *metadata.Database, coll *schema.DefaultCollection) int64 { + collSize, err := tenant.CollectionSize(ctx, db, coll) + if err != nil { + ulog.E(err) + } + return collSize +} + +func (m *Manager) updateTenantSize(ctx context.Context, namespace string) { + if m.txMgr == nil { + return + } + tenant, err := m.tenantMgr.GetTenant(ctx, namespace, m.txMgr) + if err != nil { + ulog.E(err) + // Could not determine tenant, just exit + return + } + + tx, err := m.txMgr.StartTx(ctx) + if err != nil { + ulog.E(err) + return + } + + for _, dbName := range tenant.ListDatabases(ctx, tx) { + metrics.UpdateDbSizeMetrics(namespace, dbName, getDbSize(ctx, tenant, tx, dbName)) + db, err := tenant.GetDatabase(ctx, tx, dbName) + if err != nil { + ulog.E(err) + return + } + for _, coll := range db.ListCollection() { + metrics.UpdateCollectionSizeMetrics(namespace, dbName, coll.Name, getCollSize(ctx, tenant, db, coll)) + } + } + + err = tx.Commit(ctx) + if err != nil { + err := tx.Rollback(ctx) + if err != nil { + ulog.E(err) + } + } } -func (m *Manager) checkStorageSize(ctx context.Context, namespace string, s *State, size int) error { +func (m *Manager) updateTenantMetrics(ctx context.Context, namespace string, s *State) { sz := s.Size.Load() + currentTimeStamp := time.Now().Unix() + + if currentTimeStamp >= s.TenantSizeUpdateAt.Load()+m.cfg.TenantSizeRefreshInterval { + s.TenantSizeLock.Lock() + defer s.TenantSizeLock.Unlock() - if time.Now().Unix() < s.SizeUpdateAt.Load()+sizeLimitUpdateInterval { + s.TenantSizeUpdateAt.Store(currentTimeStamp) metrics.UpdateNameSpaceSizeMetrics(namespace, sz) + m.updateTenantSize(ctx, namespace) + } +} + +func (m *Manager) checkStorage(ctx context.Context, namespace string, s *State, size int) error { + sz := s.Size.Load() + currentTimeStamp := time.Now().Unix() + + m.updateTenantMetrics(ctx, namespace, s) + + if currentTimeStamp < s.SizeUpdateAt.Load()+m.cfg.LimitUpdateInterval { if sz+int64(size) >= m.cfg.DataSizeLimit { return ErrStorageSizeExceeded } @@ -119,8 +197,8 @@ func (m *Manager) checkStorageSize(ctx context.Context, namespace string, s *Sta s.SizeLock.Lock() defer s.SizeLock.Unlock() - if time.Now().Unix() >= s.SizeUpdateAt.Load()+sizeLimitUpdateInterval { - s.SizeUpdateAt.Store(time.Now().Unix()) + if currentTimeStamp >= s.SizeUpdateAt.Load()+m.cfg.LimitUpdateInterval { + s.SizeUpdateAt.Store(currentTimeStamp) t, err := 
m.tenantMgr.GetTenant(ctx, namespace, m.txMgr) if err != nil { diff --git a/server/quota/quota_test.go b/server/quota/quota_test.go index 169addf42..aa3bbd5b5 100644 --- a/server/quota/quota_test.go +++ b/server/quota/quota_test.go @@ -98,14 +98,14 @@ func TestQuotaManager(t *testing.T) { require.NoError(t, err) } - sizeLimitUpdateInterval = 0 + m.cfg.LimitUpdateInterval = 0 require.Equal(t, ErrStorageSizeExceeded, m.check(ctx, ns, 0)) - sizeLimitUpdateInterval = 1 + m.cfg.LimitUpdateInterval = 1 require.Equal(t, ErrStorageSizeExceeded, m.check(ctx, ns, 0)) require.NoError(t, kvStore.DropTable(ctx, table)) - sizeLimitUpdateInterval = 0 + m.cfg.LimitUpdateInterval = 0 require.NoError(t, m.check(ctx, ns, 0)) From 73d9b041bed27d9f85c1718b091ba3334f081ded Mon Sep 17 00:00:00 2001 From: Peter Boros Date: Thu, 18 Aug 2022 18:54:21 +0100 Subject: [PATCH 2/5] chore: Remove Tx dependency from list database and list collections path (#438) --- server/metadata/tenant.go | 4 ++-- server/metadata/tenant_test.go | 22 +++++++++++----------- server/quota/quota.go | 26 ++++---------------------- server/quota/quota_test.go | 2 +- server/services/v1/query_runner.go | 4 ++-- 5 files changed, 20 insertions(+), 38 deletions(-) diff --git a/server/metadata/tenant.go b/server/metadata/tenant.go index 7e4365b40..575d01c0b 100644 --- a/server/metadata/tenant.go +++ b/server/metadata/tenant.go @@ -587,7 +587,7 @@ func (tenant *Tenant) DropDatabase(ctx context.Context, tx transaction.Tx, dbNam // GetDatabase returns the database object, or null if there is no database existing with the name passed in the param. // As reloading of tenant state is happening at the session manager layer so GetDatabase calls assume that the caller // just needs the state from the cache. -func (tenant *Tenant) GetDatabase(_ context.Context, _ transaction.Tx, dbName string) (*Database, error) { +func (tenant *Tenant) GetDatabase(_ context.Context, dbName string) (*Database, error) { tenant.Lock() defer tenant.Unlock() @@ -595,7 +595,7 @@ func (tenant *Tenant) GetDatabase(_ context.Context, _ transaction.Tx, dbName st } // ListDatabases is used to list all database available for this tenant. 
-func (tenant *Tenant) ListDatabases(_ context.Context, _ transaction.Tx) []string { +func (tenant *Tenant) ListDatabases(_ context.Context) []string { tenant.RLock() defer tenant.RUnlock() diff --git a/server/metadata/tenant_test.go b/server/metadata/tenant_test.go index 95a154036..10856bda0 100644 --- a/server/metadata/tenant_test.go +++ b/server/metadata/tenant_test.go @@ -194,12 +194,12 @@ func TestTenantManager_CreateDatabases(t *testing.T) { require.NoError(t, err) require.NoError(t, tenant.reload(ctx, tx, nil)) - db1, err := tenant.GetDatabase(ctx, tx, "tenant_db1") + db1, err := tenant.GetDatabase(ctx, "tenant_db1") require.NoError(t, err) require.Equal(t, "tenant_db1", db1.name) require.Equal(t, "tenant_db1", tenant.idToDatabaseMap[db1.id]) - db2, err := tenant.GetDatabase(ctx, tx, "tenant_db2") + db2, err := tenant.GetDatabase(ctx, "tenant_db2") require.NoError(t, err) require.Equal(t, "tenant_db2", db2.name) require.NoError(t, tx.Commit(ctx)) @@ -229,12 +229,12 @@ func TestTenantManager_CreateCollections(t *testing.T) { require.NoError(t, tenant.reload(ctx, tx, nil)) - db1, err := tenant.GetDatabase(ctx, tx, "tenant_db1") + db1, err := tenant.GetDatabase(ctx, "tenant_db1") require.NoError(t, err) require.Equal(t, "tenant_db1", db1.name) require.Equal(t, "tenant_db1", tenant.idToDatabaseMap[db1.id]) - db2, err := tenant.GetDatabase(ctx, tx, "tenant_db2") + db2, err := tenant.GetDatabase(ctx, "tenant_db2") require.NoError(t, err) require.Equal(t, "tenant_db2", db2.name) require.Equal(t, "tenant_db2", tenant.idToDatabaseMap[db2.id]) @@ -264,7 +264,7 @@ func TestTenantManager_CreateCollections(t *testing.T) { require.NoError(t, tenant.reload(ctx, tx, nil)) - db2, err = tenant.GetDatabase(ctx, tx, "tenant_db2") + db2, err = tenant.GetDatabase(ctx, "tenant_db2") require.NoError(t, err) collection := db2.GetCollection("test_collection") require.Equal(t, "test_collection", collection.Name) @@ -299,11 +299,11 @@ func TestTenantManager_DropCollection(t *testing.T) { require.NoError(t, tenant.reload(ctx, tx, nil)) - db1, err := tenant.GetDatabase(ctx, tx, "tenant_db1") + db1, err := tenant.GetDatabase(ctx, "tenant_db1") require.NoError(t, err) require.Equal(t, "tenant_db1", db1.name) - db2, err := tenant.GetDatabase(ctx, tx, "tenant_db2") + db2, err := tenant.GetDatabase(ctx, "tenant_db2") require.NoError(t, err) require.Equal(t, "tenant_db2", db2.name) @@ -399,9 +399,9 @@ func TestTenantManager_DataSize(t *testing.T) { factory, err := schema.Build("test_collection", jsSchema) require.NoError(t, err) - db1, err := tenant.GetDatabase(ctx, tx, "tenant_db1") + db1, err := tenant.GetDatabase(ctx, "tenant_db1") require.NoError(t, err) - db2, err := tenant.GetDatabase(ctx, tx, "tenant_db2") + db2, err := tenant.GetDatabase(ctx, "tenant_db2") require.NoError(t, err) require.NoError(t, tenant.CreateCollection(ctx, tx, db1, factory)) @@ -410,9 +410,9 @@ func TestTenantManager_DataSize(t *testing.T) { require.NoError(t, err) // create tenant2 dbs and collections - db21, err := tenant2.GetDatabase(ctx, tx, "tenant_db1") + db21, err := tenant2.GetDatabase(ctx, "tenant_db1") require.NoError(t, err) - db22, err := tenant2.GetDatabase(ctx, tx, "tenant_db2") + db22, err := tenant2.GetDatabase(ctx, "tenant_db2") require.NoError(t, err) require.NoError(t, tenant2.CreateCollection(ctx, tx, db21, factory)) diff --git a/server/quota/quota.go b/server/quota/quota.go index 79a8e9242..e8eec6d1d 100644 --- a/server/quota/quota.go +++ b/server/quota/quota.go @@ -109,11 +109,7 @@ func (m *Manager) check(ctx 
context.Context, namespace string, size int) error { return m.checkStorage(ctx, namespace, s, size) } -func getDbSize(ctx context.Context, tenant *metadata.Tenant, tx transaction.Tx, dbName string) int64 { - db, err := tenant.GetDatabase(ctx, tx, dbName) - if err != nil { - ulog.E(err) - } +func getDbSize(ctx context.Context, tenant *metadata.Tenant, db *metadata.Database) int64 { dbSize, err := tenant.DatabaseSize(ctx, db) if err != nil { ulog.E(err) @@ -140,31 +136,17 @@ func (m *Manager) updateTenantSize(ctx context.Context, namespace string) { return } - tx, err := m.txMgr.StartTx(ctx) - if err != nil { - ulog.E(err) - return - } - - for _, dbName := range tenant.ListDatabases(ctx, tx) { - metrics.UpdateDbSizeMetrics(namespace, dbName, getDbSize(ctx, tenant, tx, dbName)) - db, err := tenant.GetDatabase(ctx, tx, dbName) + for _, dbName := range tenant.ListDatabases(ctx) { + db, err := tenant.GetDatabase(ctx, dbName) if err != nil { ulog.E(err) return } + metrics.UpdateDbSizeMetrics(namespace, dbName, getDbSize(ctx, tenant, db)) for _, coll := range db.ListCollection() { metrics.UpdateCollectionSizeMetrics(namespace, dbName, coll.Name, getCollSize(ctx, tenant, db, coll)) } } - - err = tx.Commit(ctx) - if err != nil { - err := tx.Rollback(ctx) - if err != nil { - ulog.E(err) - } - } } func (m *Manager) updateTenantMetrics(ctx context.Context, namespace string, s *State) { diff --git a/server/quota/quota_test.go b/server/quota/quota_test.go index aa3bbd5b5..c2fd507c5 100644 --- a/server/quota/quota_test.go +++ b/server/quota/quota_test.go @@ -74,7 +74,7 @@ func TestQuotaManager(t *testing.T) { err = tenant.Reload(ctx, tx, []byte("aaa")) require.NoError(t, err) - db1, err := tenant.GetDatabase(ctx, tx, "tenant_db1") + db1, err := tenant.GetDatabase(ctx, "tenant_db1") require.NoError(t, err) require.NoError(t, tenant.CreateCollection(ctx, tx, db1, factory)) diff --git a/server/services/v1/query_runner.go b/server/services/v1/query_runner.go index ff7bf5132..de440bb23 100644 --- a/server/services/v1/query_runner.go +++ b/server/services/v1/query_runner.go @@ -153,7 +153,7 @@ func (runner *BaseQueryRunner) GetDatabase(ctx context.Context, tx transaction.T } // otherwise, simply read from the in-memory cache/disk. - db, err := tenant.GetDatabase(ctx, tx, dbName) + db, err := tenant.GetDatabase(ctx, dbName) if err != nil { return nil, err } @@ -1017,7 +1017,7 @@ func (runner *DatabaseQueryRunner) Run(ctx context.Context, tx transaction.Tx, t status: CreatedStatus, }, ctx, nil } else if runner.list != nil { - databaseList := tenant.ListDatabases(ctx, tx) + databaseList := tenant.ListDatabases(ctx) var databases = make([]*api.DatabaseInfo, len(databaseList)) for i, l := range databaseList { From 534c78030272f5ff31fc9bf339fbed2f7789517b Mon Sep 17 00:00:00 2001 From: Jigar Joshi Date: Thu, 18 Aug 2022 10:32:55 -0700 Subject: [PATCH 3/5] fix: Fixed the QueryTimeSeriesMetricsRequest marshaler to work with enum for HTTP/json --- api/server/v1/marshaler.go | 64 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/api/server/v1/marshaler.go b/api/server/v1/marshaler.go index df68c1978..379b4da14 100644 --- a/api/server/v1/marshaler.go +++ b/api/server/v1/marshaler.go @@ -18,6 +18,7 @@ import ( "encoding/json" "io" "net/url" + "strings" "time" "github.com/ajg/form" @@ -386,6 +387,69 @@ func (x *CreateOrUpdateCollectionRequest) UnmarshalJSON(data []byte) error { return nil } +// UnmarshalJSON on QueryTimeSeriesMetricsRequest. Handles enum. 
+func (x *QueryTimeSeriesMetricsRequest) UnmarshalJSON(data []byte) error { + var mp map[string]jsoniter.RawMessage + if err := jsoniter.Unmarshal(data, &mp); err != nil { + return err + } + for key, value := range mp { + switch key { + case "db": + if err := jsoniter.Unmarshal(value, &x.Db); err != nil { + return err + } + case "collection": + if err := jsoniter.Unmarshal(value, &x.Collection); err != nil { + return err + } + case "from": + if err := jsoniter.Unmarshal(value, &x.From); err != nil { + return err + } + case "to": + if err := jsoniter.Unmarshal(value, &x.To); err != nil { + return err + } + case "metric_name": + if err := jsoniter.Unmarshal(value, &x.MetricName); err != nil { + return err + } + case "space_aggregation": + var t string + if err := jsoniter.Unmarshal(value, &t); err != nil { + return err + } + switch strings.ToUpper(t) { + case "AVG": + x.SpaceAggregation = MetricQuerySpaceAggregation_AVG + case "MIN": + x.SpaceAggregation = MetricQuerySpaceAggregation_MIN + case "MAX": + x.SpaceAggregation = MetricQuerySpaceAggregation_MAX + case "SUM": + x.SpaceAggregation = MetricQuerySpaceAggregation_SUM + } + case "space_aggregated_by": + if err := jsoniter.Unmarshal(value, &x.SpaceAggregatedBy); err != nil { + return err + } + case "function": + var t string + if err := jsoniter.Unmarshal(value, &t); err != nil { + return err + } + switch strings.ToUpper(t) { + case "RATE": + x.Function = MetricQueryFunction_RATE + case "COUNT": + x.Function = MetricQueryFunction_COUNT + } + } + } + return nil +} + type collDesc struct { Collection string `json:"collection"` Metadata *CollectionMetadata `json:"metadata"` From bcbe0627ebe64a93fc97b7af8df9cc370b998e0d Mon Sep 17 00:00:00 2001 From: Jigar Joshi Date: Thu, 18 Aug 2022 13:56:06 -0700 Subject: [PATCH 4/5] fix: Added db and collection name restriction --- api/server/v1/validator.go | 16 ++++++++++++++-- schema/fields.go | 23 +++-------------------- test/v1/server/collection_test.go | 12 ++++++++++++ test/v1/server/database_test.go | 12 ++++++++++++ util/util.go | 21 +++++++++++++++++++++ 5 files changed, 62 insertions(+), 22 deletions(-) diff --git a/api/server/v1/validator.go b/api/server/v1/validator.go index 37731e120..77afa8658 100644 --- a/api/server/v1/validator.go +++ b/api/server/v1/validator.go @@ -14,6 +14,14 @@ package api +import ( + "regexp" + + "github.com/tigrisdata/tigris/util" +) + +var validNamePattern = regexp.MustCompile("^[a-zA-Z]+[a-zA-Z0-9_]+$") + type Validator interface { Validate() error } @@ -182,7 +190,9 @@ func isValidCollection(name string) error { if len(name) == 0 { return Errorf(Code_INVALID_ARGUMENT, "invalid collection name") } - + if !validNamePattern.MatchString(name) || util.LanguageKeywords.Contains(name) { + return Errorf(Code_INVALID_ARGUMENT, "invalid collection name") + } return nil } @@ -190,7 +200,9 @@ func isValidDatabase(name string) error { if len(name) == 0 { return Errorf(Code_INVALID_ARGUMENT, "invalid database name") } - + if !validNamePattern.MatchString(name) || util.LanguageKeywords.Contains(name) { + return Errorf(Code_INVALID_ARGUMENT, "invalid database name") + } return nil } diff --git a/schema/fields.go b/schema/fields.go index 6d868b547..7b15bec3b 100644 --- a/schema/fields.go +++ b/schema/fields.go @@ -21,6 +21,7 @@ import ( jsoniter "github.com/json-iterator/go" api "github.com/tigrisdata/tigris/api/server/v1" "github.com/tigrisdata/tigris/lib/set" + "github.com/tigrisdata/tigris/util" ) type FieldType int @@ -65,25 +66,7 @@ var FieldNames = [...]string{ var ( 
MsgFieldNameAsLanguageKeyword = "Invalid collection field name, It contains language keyword for fieldName = '%s'" MsgFieldNameInvalidPattern = "Invalid collection field name, field name can only contain [a-zA-Z0-9_$] and it can only start with [a-zA-Z_$] for fieldName = '%s'" - LanguageKeywords = set.New("abstract", "add", "alias", "and", "any", "args", "arguments", "array", - "as", "as?", "ascending", "assert", "async", "await", "base", "bool", "boolean", "break", "by", "byte", - "callable", "case", "catch", "chan", "char", "checked", "class", "clone", "const", "constructor", "continue", - "debugger", "decimal", "declare", "def", "default", "defer", "del", "delegate", "delete", "descending", "die", - "do", "double", "dynamic", "echo", "elif", "else", "elseif", "empty", "enddeclare", "endfor", "endforeach", - "endif", "endswitch", "endwhile", "enum", "equals", "eval", "event", "except", "exception", "exit", "explicit", - "export", "extends", "extern", "fallthrough", "false", "final", "finally", "fixed", "float", "fn", "for", - "foreach", "from", "fun", "func", "function", "get", "global", "go", "goto", "group", "if", "implements", - "implicit", "import", "in", "include", "include_once", "init", "instanceof", "insteadof", "int", "integer", - "interface", "internal", "into", "is", "isset", "join", "lambda", "let", "list", "lock", "long", "managed", - "map", "match", "module", "nameof", "namespace", "native", "new", "nint", "none", "nonlocal", "not", "notnull", - "nuint", "null", "number", "object", "of", "on", "operator", "or", "orderby", "out", "override", "package", - "params", "partial", "pass", "print", "private", "protected", "public", "raise", "range", "readonly", "record", - "ref", "remove", "require", "require_once", "return", "sbyte", "sealed", "select", "set", "short", "sizeof", - "stackalloc", "static", "strictfp", "string", "struct", "super", "switch", "symbol", "synchronized", "this", - "throw", "throws", "trait", "transient", "true", "try", "type", "typealias", "typeof", "uint", "ulong", - "unchecked", "unmanaged", "unsafe", "unset", "use", "ushort", "using", "val", "value", "var", "virtual", "void", - "volatile", "when", "where", "while", "with", "xor", "yield") - ValidFieldNamePattern = regexp.MustCompile(`^[a-zA-Z_$][a-zA-Z0-9_$]*$`) + ValidFieldNamePattern = regexp.MustCompile(`^[a-zA-Z_$][a-zA-Z0-9_$]*$`) ) const ( @@ -306,7 +289,7 @@ func (f *FieldBuilder) Build(isArrayElement bool) (*Field, error) { } // check for language keywords - if LanguageKeywords.Contains(strings.ToLower(f.FieldName)) { + if util.LanguageKeywords.Contains(strings.ToLower(f.FieldName)) { return nil, api.Errorf(api.Code_INVALID_ARGUMENT, MsgFieldNameAsLanguageKeyword, f.FieldName) } diff --git a/test/v1/server/collection_test.go b/test/v1/server/collection_test.go index eb228536a..d6d299365 100644 --- a/test/v1/server/collection_test.go +++ b/test/v1/server/collection_test.go @@ -74,6 +74,18 @@ func TestCreateCollection(t *testing.T) { }) } +func TestCreateCollectionInvalidName(t *testing.T) { + invalidCollectionName := []string{"", "test-coll", "1test-coll", "$testcoll", "testcoll$", "test$coll", "abstract", "yield"} + for _, name := range invalidCollectionName { + resp := createCollection(t, "valid_db_name", name, testCreateSchema) + resp.Status(http.StatusBadRequest). + JSON(). + Path("$.error"). + Object(). 
+ ValueEqual("message", "invalid collection name") + } +} + func TestDropCollection(t *testing.T) { db, coll := setupTests(t) defer cleanupTests(t, db) diff --git a/test/v1/server/database_test.go b/test/v1/server/database_test.go index 038faf8de..93099b9dd 100644 --- a/test/v1/server/database_test.go +++ b/test/v1/server/database_test.go @@ -42,6 +42,18 @@ func TestCreateDatabase(t *testing.T) { ValueEqual("message", "database created successfully") } +func TestCreateDatabaseInvalidName(t *testing.T) { + invalidDbNames := []string{"", "1test-db", "test-db", "$testdb", "testdb$", "test$db", "abstract", "yield"} + for _, name := range invalidDbNames { + resp := createDatabase(t, name) + resp.Status(http.StatusBadRequest). + JSON(). + Path("$.error"). + Object(). + ValueEqual("message", "invalid database name") + } +} + func TestBeginTransaction(t *testing.T) { resp := beginTransaction(t, "test_db") cookieVal := resp.Cookie("Tigris-Tx-Id").Value() diff --git a/util/util.go b/util/util.go index 9159db4f7..2a6822f02 100644 --- a/util/util.go +++ b/util/util.go @@ -14,8 +14,29 @@ package util +import "github.com/tigrisdata/tigris/lib/set" + // Version of this build var Version string // Service program name used in logging and monitoring var Service string = "tigris-server" + +var LanguageKeywords = set.New("abstract", "add", "alias", "and", "any", "args", "arguments", "array", + "as", "as?", "ascending", "assert", "async", "await", "base", "bool", "boolean", "break", "by", "byte", + "callable", "case", "catch", "chan", "char", "checked", "class", "clone", "const", "constructor", "continue", + "debugger", "decimal", "declare", "def", "default", "defer", "del", "delegate", "delete", "descending", "die", + "do", "double", "dynamic", "echo", "elif", "else", "elseif", "empty", "enddeclare", "endfor", "endforeach", + "endif", "endswitch", "endwhile", "enum", "equals", "eval", "event", "except", "exception", "exit", "explicit", + "export", "extends", "extern", "fallthrough", "false", "final", "finally", "fixed", "float", "fn", "for", + "foreach", "from", "fun", "func", "function", "get", "global", "go", "goto", "group", "if", "implements", + "implicit", "import", "in", "include", "include_once", "init", "instanceof", "insteadof", "int", "integer", + "interface", "internal", "into", "is", "isset", "join", "lambda", "let", "list", "lock", "long", "managed", + "map", "match", "module", "nameof", "namespace", "native", "new", "nint", "none", "nonlocal", "not", "notnull", + "nuint", "null", "number", "object", "of", "on", "operator", "or", "orderby", "out", "override", "package", + "params", "partial", "pass", "print", "private", "protected", "public", "raise", "range", "readonly", "record", + "ref", "remove", "require", "require_once", "return", "sbyte", "sealed", "select", "set", "short", "sizeof", + "stackalloc", "static", "strictfp", "string", "struct", "super", "switch", "symbol", "synchronized", "this", + "throw", "throws", "trait", "transient", "true", "try", "type", "typealias", "typeof", "uint", "ulong", + "unchecked", "unmanaged", "unsafe", "unset", "use", "ushort", "using", "val", "value", "var", "virtual", "void", + "volatile", "when", "where", "while", "with", "xor", "yield") From d19ac5e6f9b4653f9ef2dc430fdd6a97c1f1c29b Mon Sep 17 00:00:00 2001 From: Adil Ansari Date: Thu, 18 Aug 2022 16:30:41 -0700 Subject: [PATCH 5/5] feat: Support for range queries on time fields (#429) * refactor: Index datetime fields as int64 * fix: addressing PR feedback * refactor: Remove schema dependency from lib 
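The core of the range-query support in this patch is converting RFC 3339 timestamps into Unix nanoseconds so the search backend can compare them as plain int64 values, while the original string is preserved under a shadow key (the `_tigris_date_` prefix introduced in `schema/reserved.go`). A minimal standalone sketch of that packing step, assuming a hypothetical `created` field; the local `toUnixNano` helper stands in for the patch's `lib/date.ToUnixNano`:

```go
package main

import (
	"fmt"
	"time"
)

// toUnixNano mirrors lib/date.ToUnixNano: parse an RFC 3339 timestamp and
// return it as Unix nanoseconds, which sort and compare numerically.
func toUnixNano(dateStr string) (int64, error) {
	t, err := time.Parse(time.RFC3339Nano, dateStr)
	if err != nil {
		return 0, err
	}
	return t.UnixNano(), nil
}

func main() {
	// A document with an illustrative "created" datetime field.
	doc := map[string]interface{}{"created": "2022-10-18T00:51:07.528106+00:00"}

	nsec, err := toUnixNano(doc["created"].(string))
	if err != nil {
		panic(err)
	}

	// Keep the original string under the shadow key ("_tigris_date_" + field name)
	// and index the numeric value under the original field name.
	doc["_tigris_date_created"] = doc["created"]
	doc["created"] = nsec

	fmt.Println(doc["created"]) // 1666054267528106000, range-comparable in the search index
}
```

A range filter on a datetime field can then be rewritten into a numeric comparison, which is what the `ToSearchFilter` change in `query/filter/selector.go` below does via `date.ToUnixNano`, and `UnpackSearchFields` restores the original string from the shadow key on the way out.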
--- lib/date/converter.go | 14 ++++++++ lib/date/converter_test.go | 45 +++++++++++++++++++++++++ query/filter/selector.go | 7 ++++ schema/collection.go | 15 +++++++-- schema/collection_test.go | 2 +- schema/fields.go | 6 ++-- schema/reserved.go | 16 ++++++--- schema/schema.go | 4 +++ server/services/v1/search_indexer.go | 50 ++++++++++++++++++++++------ 9 files changed, 140 insertions(+), 19 deletions(-) create mode 100644 lib/date/converter.go create mode 100644 lib/date/converter_test.go diff --git a/lib/date/converter.go b/lib/date/converter.go new file mode 100644 index 000000000..39c069ef9 --- /dev/null +++ b/lib/date/converter.go @@ -0,0 +1,14 @@ +package date + +import ( + "time" +) + +// ToUnixNano converts a time to Unix nano seconds +func ToUnixNano(format string, dateStr string) (int64, error) { + t, err := time.Parse(format, dateStr) + if err != nil { + return 0, err + } + return t.UnixNano(), nil +} diff --git a/lib/date/converter_test.go b/lib/date/converter_test.go new file mode 100644 index 000000000..717f9a80e --- /dev/null +++ b/lib/date/converter_test.go @@ -0,0 +1,45 @@ +package date + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "time" +) + +func TestToUnixNano(t *testing.T) { + validCases := []struct { + name string + date string + expected int64 + }{ + {"UTC RFC 3339", "2022-10-18T00:51:07+00:00", 1666054267000000000}, + {"UTC RFC 3339 Nano", "2022-10-18T00:51:07.528106+00:00", 1666054267528106000}, + {"IST RFC 3339", "2022-10-11T04:19:32+05:30", 1665442172000000000}, + {"IST RFC 3339 Nano", "2022-10-18T00:51:07.999999999+05:30", 1666034467999999999}, + {"No TZ RFC 3339", "2022-10-18T00:51:07Z", 1666054267000000000}, + } + + for _, v := range validCases { + t.Run(v.name, func(t *testing.T) { + actual, err := ToUnixNano(time.RFC3339Nano, v.date) + assert.NoError(t, err) + assert.Equal(t, v.expected, actual) + }) + } + + failureCases := []struct { + name string + date string + errorLike string + }{ + {"RFC 1123", "Mon, 02 Jan 2006 15:04:05 MST", "cannot parse"}, + } + + for _, v := range failureCases { + t.Run(v.name, func(t *testing.T) { + _, err := ToUnixNano(time.RFC3339Nano, v.date) + assert.ErrorContains(t, err, v.errorLike) + }) + } +} diff --git a/query/filter/selector.go b/query/filter/selector.go index e38716f81..e5bb7fc28 100644 --- a/query/filter/selector.go +++ b/query/filter/selector.go @@ -19,6 +19,7 @@ import ( "github.com/tigrisdata/tigris/schema" "github.com/tigrisdata/tigris/value" + "github.com/tigrisdata/tigris/lib/date" ) // Selector is a condition defined inside a filter. 
It has a field which corresponding the field on which condition @@ -88,6 +89,12 @@ func (s *Selector) ToSearchFilter() []string { case schema.DoubleType: // for double, we pass string in the filter to search backend return []string{fmt.Sprintf(op, s.Field.Name(), v.String())} + case schema.DateTimeType: + // encode into int64 + if nsec, err := date.ToUnixNano(schema.DateTimeFormat, v.String()); err == nil { + return []string{fmt.Sprintf(op, s.Field.Name(), nsec)} + } + } return []string{fmt.Sprintf(op, s.Field.Name(), v.AsInterface())} } diff --git a/schema/collection.go b/schema/collection.go index 911303a9f..cf5c78713 100644 --- a/schema/collection.go +++ b/schema/collection.go @@ -184,16 +184,27 @@ func GetSearchDeltaFields(existingFields []*QueryableField, incomingFields []*Fi } func buildSearchSchema(name string, queryableFields []*QueryableField) *tsApi.CollectionSchema { - var ptrTrue = true + var ptrTrue, ptrFalse = true, false var tsFields []tsApi.Field for _, s := range queryableFields { tsFields = append(tsFields, tsApi.Field{ - Name: s.FieldName, + Name: s.Name(), Type: s.SearchType, Facet: &s.Faceted, Index: &s.Indexed, Optional: &ptrTrue, }) + // Save original date as string to disk + if s.DataType == DateTimeType { + tsFields = append(tsFields, tsApi.Field{ + Name: ToSearchDateKey(s.Name()), + Type: toSearchFieldType(StringType), + Facet: &ptrFalse, + Index: &ptrFalse, + Sort: &ptrFalse, + Optional: &ptrTrue, + }) + } } return &tsApi.CollectionSchema{ diff --git a/schema/collection_test.go b/schema/collection_test.go index aab7db91f..4c118a5aa 100644 --- a/schema/collection_test.go +++ b/schema/collection_test.go @@ -282,7 +282,7 @@ func TestCollection_SearchSchema(t *testing.T) { schFactory, err := Build("t1", reqSchema) require.NoError(t, err) - expFlattenedFields := []string{"id", "id_32", "product", "id_uuid", "ts", "price", "simple_items", "simple_object.name", + expFlattenedFields := []string{"id", "id_32", "product", "id_uuid", "ts", ToSearchDateKey("ts"), "price", "simple_items", "simple_object.name", "simple_object.phone", "simple_object.address.street", "simple_object.details.nested_id", "simple_object.details.nested_obj.id", "simple_object.details.nested_obj.name", "simple_object.details.nested_array", "simple_object.details.nested_string", } diff --git a/schema/fields.go b/schema/fields.go index 7b15bec3b..15fe4a917 100644 --- a/schema/fields.go +++ b/schema/fields.go @@ -185,8 +185,10 @@ func toSearchFieldType(fieldType FieldType) string { return FieldNames[fieldType] case Int32Type, Int64Type: return FieldNames[fieldType] - case StringType, ByteType, UUIDType, DateTimeType: + case StringType, ByteType, UUIDType: return FieldNames[StringType] + case DateTimeType: + return FieldNames[Int64Type] case DoubleType: return searchDoubleType case ArrayType: @@ -393,7 +395,7 @@ func (q *QueryableField) Name() string { } func (q *QueryableField) ShouldPack() bool { - return q.DataType == ArrayType + return q.DataType == ArrayType || q.DataType == DateTimeType } func buildQueryableFields(fields []*Field) []*QueryableField { diff --git a/schema/reserved.go b/schema/reserved.go index 53cb6905f..674a6a855 100644 --- a/schema/reserved.go +++ b/schema/reserved.go @@ -21,13 +21,15 @@ const ( UpdatedAt Metadata IdToSearchKey + DateSearchKeyPrefix ) var ReservedFields = [...]string{ - CreatedAt: "created_at", - UpdatedAt: "updated_at", - Metadata: "metadata", - IdToSearchKey: "_tigris_id", + CreatedAt: "created_at", + UpdatedAt: "updated_at", + Metadata: "metadata", + 
IdToSearchKey: "_tigris_id", + DateSearchKeyPrefix: "_tigris_date_", } func IsReservedField(name string) bool { @@ -39,3 +41,9 @@ func IsReservedField(name string) bool { return false } + +// ToSearchDateKey can be used to generate storage field for search backend +// Original date strings are persisted as it is under this field +func ToSearchDateKey(key string) string { + return ReservedFields[DateSearchKeyPrefix] + key +} diff --git a/schema/schema.go b/schema/schema.go index 8d2e62439..3b523eaa2 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -15,6 +15,8 @@ package schema import ( + "time" + "github.com/buger/jsonparser" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" @@ -66,6 +68,8 @@ const ( PrimaryKeyIndexName = "pkey" AutoPrimaryKeyF = "id" PrimaryKeySchemaK = "primary_key" + // DateTimeFormat represents the supported date time format + DateTimeFormat = time.RFC3339Nano ) var ( diff --git a/server/services/v1/search_indexer.go b/server/services/v1/search_indexer.go index fb06c0434..91f1345e0 100644 --- a/server/services/v1/search_indexer.go +++ b/server/services/v1/search_indexer.go @@ -24,7 +24,9 @@ import ( "github.com/apple/foundationdb/bindings/go/src/fdb" "github.com/apple/foundationdb/bindings/go/src/fdb/subspace" jsoniter "github.com/json-iterator/go" + api "github.com/tigrisdata/tigris/api/server/v1" "github.com/tigrisdata/tigris/internal" + "github.com/tigrisdata/tigris/lib/date" "github.com/tigrisdata/tigris/lib/json" "github.com/tigrisdata/tigris/schema" "github.com/tigrisdata/tigris/server/metadata" @@ -179,12 +181,30 @@ func PackSearchFields(data *internal.TableData, collection *schema.DefaultCollec decData = FlattenObjects(decData) - // now if there is any array we need to pack it - for key, value := range decData { - if _, ok := value.([]any); ok { - // pack any array field - if decData[key], err = jsoniter.MarshalToString(value); err != nil { - return nil, err + // pack any date time or array fields here + for _, f := range collection.QueryableFields { + key, value := f.Name(), decData[f.Name()] + if value == nil { + continue + } + if f.ShouldPack() { + switch f.DataType { + case schema.ArrayType: + if decData[key], err = jsoniter.MarshalToString(value); err != nil { + return nil, err + } + case schema.DateTimeType: + if dateStr, ok := value.(string); ok { + t, err := date.ToUnixNano(schema.DateTimeFormat, dateStr) + if err != nil { + return nil, api.Errorf(api.Code_INVALID_ARGUMENT, "Validation failed, %s is not a valid date-time", dateStr) + } + decData[key] = t + // pack original date as string to a shadowed key + decData[schema.ToSearchDateKey(key)] = dateStr + } + default: + return nil, api.Errorf(api.Code_UNIMPLEMENTED, "Internal error!") } } } @@ -207,11 +227,21 @@ func UnpackSearchFields(doc map[string]interface{}, collection *schema.DefaultCo for _, f := range collection.QueryableFields { if f.ShouldPack() { if v, ok := doc[f.Name()]; ok { - var value interface{} - if err := jsoniter.UnmarshalFromString(v.(string), &value); err != nil { - return "", nil, nil, err + switch f.DataType { + case schema.ArrayType: + var value interface{} + if err := jsoniter.UnmarshalFromString(v.(string), &value); err != nil { + return "", nil, nil, err + } + doc[f.Name()] = value + case schema.DateTimeType: + // unpack original date from shadowed key + shadowedKey := schema.ToSearchDateKey(f.Name()) + doc[f.Name()] = doc[shadowedKey] + delete(doc, shadowedKey) + default: + return "", nil, nil, api.Errorf(api.Code_UNIMPLEMENTED, "Internal error!") } - 
doc[f.Name()] = value } } }
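As a closing reference for patch 4, the new database and collection name restriction amounts to two checks in `api/server/v1/validator.go`: the name must match `^[a-zA-Z]+[a-zA-Z0-9_]+$` and must not be one of the reserved language keywords now exported as `util.LanguageKeywords`. A minimal sketch of the same rule; the abbreviated keyword map here is a stand-in for the full set in `util/util.go`:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as validNamePattern in api/server/v1/validator.go:
// one or more letters, followed by one or more letters, digits or underscores.
var validNamePattern = regexp.MustCompile("^[a-zA-Z]+[a-zA-Z0-9_]+$")

// Abbreviated stand-in for util.LanguageKeywords.
var languageKeywords = map[string]bool{"abstract": true, "select": true, "yield": true}

func isValidName(name string) bool {
	return validNamePattern.MatchString(name) && !languageKeywords[name]
}

func main() {
	// Names drawn from the valid/invalid cases in the new tests.
	for _, n := range []string{"valid_db_name", "test-db", "1test-db", "$testdb", "yield"} {
		fmt.Printf("%-13s -> %v\n", n, isValidName(n))
	}
}
```

Note that the pattern implies a minimum length of two characters, and that, unlike the field-name check in `schema/fields.go`, the validator does not lowercase the name before the keyword lookup.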