Skip to content

Commit

Permalink
Merge pull request #462 from tigrisdata/main
Browse files Browse the repository at this point in the history
Release Alpha
  • Loading branch information
efirs authored Aug 25, 2022
2 parents 6012f6c + 0fe7a03 commit 716715b
Show file tree
Hide file tree
Showing 14 changed files with 380 additions and 76 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/cli-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: cli-test
on:
push:
workflow_call:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v2
with:
submodules: true

- name: Install CLI
run: |
curl -sSL https://tigris.dev/cli-linux | sudo tar -xz -C .
wget https://raw.githubusercontent.com/tigrisdata/tigris-cli/main/tests/db.sh
- name: Start Tigris server
run: |
make run
sleep 5
- name: Run CLI tests
run: noup=1 /bin/bash db.sh
3 changes: 1 addition & 2 deletions query/filter/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"fmt"

"github.com/buger/jsonparser"
"github.com/tigrisdata/tigris/lib/date"
"github.com/tigrisdata/tigris/schema"
ulog "github.com/tigrisdata/tigris/util/log"
"github.com/tigrisdata/tigris/value"
"github.com/tigrisdata/tigris/lib/date"
)

// Selector is a condition defined inside a filter. It has a field which corresponding the field on which condition
Expand Down Expand Up @@ -110,7 +110,6 @@ func (s *Selector) ToSearchFilter() []string {
if nsec, err := date.ToUnixNano(schema.DateTimeFormat, v.String()); err == nil {
return []string{fmt.Sprintf(op, s.Field.Name(), nsec)}
}

}
return []string{fmt.Sprintf(op, s.Field.Name(), v.AsInterface())}
}
Expand Down
4 changes: 2 additions & 2 deletions server/metadata/encoding/dictionary.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (r *reservedSubspace) allocateToken(ctx context.Context, tx transaction.Tx,
return 0, err
}

if err := tx.Replace(ctx, key, internal.NewTableData(UInt32ToByte(newReservedValue))); err != nil {
if err := tx.Replace(ctx, key, internal.NewTableData(UInt32ToByte(newReservedValue)), false); err != nil {
log.Debug().Str("key", key.String()).Uint32("value", newReservedValue).Msg("allocating token failed")
return 0, err
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func (k *MetadataDictionary) delete(ctx context.Context, tx transaction.Tx, toDe
log.Debug().Str("key", toDeleteKey.String()).Str("type", encName).Msg("existing entry deletion succeed")

// now do insert because we need to fail if token is already assigned
if err := tx.Replace(ctx, newKey, internal.NewTableData(UInt32ToByte(newValue))); err != nil {
if err := tx.Replace(ctx, newKey, internal.NewTableData(UInt32ToByte(newValue)), false); err != nil {
log.Debug().Str("key", newKey.String()).Uint32("value", newValue).Err(err).Str("type", encName).Msg("encoding failed")
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/metadata/key_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (g *TableKeyGenerator) generateCounter(ctx context.Context, tx transaction.
return 0, err
}

if err := tx.Replace(ctx, key, internal.NewTableData(encoding.UInt32ToByte(id))); err != nil {
if err := tx.Replace(ctx, key, internal.NewTableData(encoding.UInt32ToByte(id)), false); err != nil {
return 0, err
}

Expand Down
110 changes: 87 additions & 23 deletions server/services/v1/query_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (runner *BaseQueryRunner) insertOrReplace(ctx context.Context, tx transacti
// as Int64 or timestamp to ensure uniqueness if multiple workers end up generating same timestamp.
err = tx.Insert(ctx, key, tableData)
} else {
err = tx.Replace(ctx, key, tableData)
err = tx.Replace(ctx, key, tableData, false)
}
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -357,11 +357,6 @@ func (runner *UpdateQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten
return nil, ctx, err
}

iKeys, err := runner.buildKeysUsingFilter(tenant, db, collection, runner.req.Filter)
if err != nil {
return nil, ctx, err
}

var factory *update.FieldOperatorFactory
factory, err = update.BuildFieldOperators(runner.req.Fields)
if err != nil {
Expand Down Expand Up @@ -389,24 +384,56 @@ func (runner *UpdateQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten
}
}

modifiedCount := int32(0)
for _, key := range iKeys {
// decode the fields now
modified := int32(0)
if modified, err = tx.Update(ctx, key, func(existing *internal.TableData) (*internal.TableData, error) {
merged, er := factory.MergeAndGet(existing.RawData)
if er != nil {
return nil, er
}
table, err := runner.encoder.EncodeTableName(tenant.GetNamespace(), db, collection)
if err != nil {
return nil, ctx, err
}

// ToDo: may need to change the schema version
return internal.NewTableDataWithTS(existing.CreatedAt, ts, merged), nil
}); ulog.E(err) {
if filter.None(runner.req.Filter) {
return nil, ctx, api.Errorf(api.Code_INVALID_ARGUMENT, "updating all documents is not allowed")
}

var iterator Iterator
reader := NewDatabaseReader(ctx, tx)
iKeys, err := runner.buildKeysUsingFilter(tenant, db, collection, runner.req.Filter)
if err == nil {
iterator, err = reader.KeyIterator(iKeys)
} else {
if iterator, err = reader.ScanTable(table); err != nil {
return nil, ctx, err
}
filterFactory := filter.NewFactory(collection.QueryableFields)
var filters []filter.Filter
if filters, err = filterFactory.Factorize(runner.req.Filter); err != nil {
return nil, ctx, err
}
modifiedCount += modified

iterator, err = reader.FilteredRead(iterator, filter.NewWrappedFilter(filters))
}
if err != nil {
return nil, ctx, err
}

modifiedCount := int32(0)
var row Row
for iterator.Next(&row) {
key, err := keys.FromBinary(table, row.Key)
if err != nil {
return nil, ctx, err
}

merged, er := factory.MergeAndGet(row.Data.RawData)
if er != nil {
return nil, ctx, err
}

newData := internal.NewTableDataWithTS(row.Data.CreatedAt, ts, merged)
// as we have merged the data, it is safe to call replace
if err = tx.Replace(ctx, key, newData, true); ulog.E(err) {
return nil, ctx, err
}
modifiedCount++
}
return &Response{
status: UpdatedStatus,
updatedAt: ts,
Expand Down Expand Up @@ -434,20 +461,57 @@ func (runner *DeleteQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten
return nil, ctx, err
}

iKeys, err := runner.buildKeysUsingFilter(tenant, db, collection, runner.req.Filter)
table, err := runner.encoder.EncodeTableName(tenant.GetNamespace(), db, collection)
if err != nil {
return nil, ctx, err
}

var iterator Iterator
reader := NewDatabaseReader(ctx, tx)
if filter.None(runner.req.Filter) {
if iterator, err = reader.ScanTable(table); err != nil {
return nil, ctx, err
}
} else {
var iKeys []keys.Key
if iKeys, err = runner.buildKeysUsingFilter(tenant, db, collection, runner.req.Filter); err == nil {
iterator, err = reader.KeyIterator(iKeys)
} else {
if iterator, err = reader.ScanTable(table); err != nil {
return nil, ctx, err
}
filterFactory := filter.NewFactory(collection.QueryableFields)
var filters []filter.Filter
if filters, err = filterFactory.Factorize(runner.req.Filter); err != nil {
return nil, ctx, err
}

iterator, err = reader.FilteredRead(iterator, filter.NewWrappedFilter(filters))
}
}
if err != nil {
return nil, ctx, err
}

for _, key := range iKeys {
modifiedCount := int32(0)
var row Row
for iterator.Next(&row) {
key, err := keys.FromBinary(table, row.Key)
if err != nil {
return nil, ctx, err
}

if err = tx.Delete(ctx, key); ulog.E(err) {
return nil, ctx, err
}

modifiedCount++
}

return &Response{
status: DeletedStatus,
deletedAt: ts,
status: DeletedStatus,
deletedAt: ts,
modifiedCount: modifiedCount,
}, ctx, nil
}

Expand Down
6 changes: 3 additions & 3 deletions server/transaction/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type BaseTx interface {
Context() *SessionCtx
GetTxCtx() *api.TransactionCtx
Insert(ctx context.Context, key keys.Key, data *internal.TableData) error
Replace(ctx context.Context, key keys.Key, data *internal.TableData) error
Replace(ctx context.Context, key keys.Key, data *internal.TableData, isUpdate bool) error
Update(ctx context.Context, key keys.Key, apply func(*internal.TableData) (*internal.TableData, error)) (int32, error)
Delete(ctx context.Context, key keys.Key) error
Read(ctx context.Context, key keys.Key) (kv.Iterator, error)
Expand Down Expand Up @@ -179,15 +179,15 @@ func (s *TxSession) Insert(ctx context.Context, key keys.Key, data *internal.Tab
return s.kTx.Insert(ctx, key.Table(), kv.BuildKey(key.IndexParts()...), data)
}

func (s *TxSession) Replace(ctx context.Context, key keys.Key, data *internal.TableData) error {
func (s *TxSession) Replace(ctx context.Context, key keys.Key, data *internal.TableData, isUpdate bool) error {
s.Lock()
defer s.Unlock()

if err := s.validateSession(); err != nil {
return err
}

return s.kTx.Replace(ctx, key.Table(), kv.BuildKey(key.IndexParts()...), data)
return s.kTx.Replace(ctx, key.Table(), kv.BuildKey(key.IndexParts()...), data, isUpdate)
}

func (s *TxSession) Update(ctx context.Context, key keys.Key, apply func(*internal.TableData) (*internal.TableData, error)) (int32, error) {
Expand Down
2 changes: 1 addition & 1 deletion store/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type baseKeyValue struct {

type baseKV interface {
Insert(ctx context.Context, table []byte, key Key, data []byte) error
Replace(ctx context.Context, table []byte, key Key, data []byte) error
Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error
Delete(ctx context.Context, table []byte, key Key) error
DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
Read(ctx context.Context, table []byte, key Key) (baseIterator, error)
Expand Down
16 changes: 10 additions & 6 deletions store/kv/fdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func (d *fdbkv) Insert(ctx context.Context, table []byte, key Key, data []byte)
return err
}

func (d *fdbkv) Replace(ctx context.Context, table []byte, key Key, data []byte) error {
func (d *fdbkv) Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error {
_, err := d.txWithRetry(ctx, func(tr fdb.Transaction) (interface{}, error) {
return nil, (&ftx{d: d, tx: &tr}).Replace(ctx, table, key, data)
return nil, (&ftx{d: d, tx: &tr}).Replace(ctx, table, key, data, isUpdate)
})
return err
}
Expand Down Expand Up @@ -292,11 +292,11 @@ func (b *fbatch) Insert(ctx context.Context, table []byte, key Key, data []byte)
return b.tx.Insert(ctx, table, key, data)
}

func (b *fbatch) Replace(ctx context.Context, table []byte, key Key, data []byte) error {
func (b *fbatch) Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error {
if err := b.flushBatch(ctx, key, nil, data); err != nil {
return err
}
return b.tx.Replace(ctx, table, key, data)
return b.tx.Replace(ctx, table, key, data, isUpdate)
}

func (b *fbatch) Delete(ctx context.Context, table []byte, key Key) error {
Expand Down Expand Up @@ -401,12 +401,16 @@ func (t *ftx) Insert(ctx context.Context, table []byte, key Key, data []byte) er
return err
}

func (t *ftx) Replace(ctx context.Context, table []byte, key Key, data []byte) error {
func (t *ftx) Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error {
listener := GetEventListener(ctx)
k := getFDBKey(table, key)

t.tx.Set(k, data)
listener.OnSet(ReplaceEvent, table, k, data)
if isUpdate {
listener.OnSet(UpdateEvent, table, k, data)
} else {
listener.OnSet(ReplaceEvent, table, k, data)
}

log.Debug().Str("table", string(table)).Interface("key", key).Msg("tx Replace")

Expand Down
18 changes: 9 additions & 9 deletions store/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Future fdb.FutureByteSlice

type KV interface {
Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
Replace(ctx context.Context, table []byte, key Key, data *internal.TableData) error
Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error
Delete(ctx context.Context, table []byte, key Key) error
DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
Read(ctx context.Context, table []byte, key Key) (Iterator, error)
Expand Down Expand Up @@ -199,18 +199,18 @@ func (m *KeyValueStoreImplWithMetrics) Insert(ctx context.Context, table []byte,
return
}

func (k *KeyValueStoreImpl) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData) error {
func (k *KeyValueStoreImpl) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error {
enc, err := internal.Encode(data)
if err != nil {
return err
}

return k.fdbkv.Replace(ctx, table, key, enc)
return k.fdbkv.Replace(ctx, table, key, enc, isUpdate)
}

func (m *KeyValueStoreImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData) (err error) {
func (m *KeyValueStoreImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) (err error) {
m.measure(ctx, "Replace", func() error {
err = m.kv.Replace(ctx, table, key, data)
err = m.kv.Replace(ctx, table, key, data, isUpdate)
return err
})
return
Expand Down Expand Up @@ -433,18 +433,18 @@ func (m *TxImplWithMetrics) Insert(ctx context.Context, table []byte, key Key, d
return
}

func (tx *TxImpl) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData) error {
func (tx *TxImpl) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error {
enc, err := internal.Encode(data)
if err != nil {
return err
}

return tx.ftx.Replace(ctx, table, key, enc)
return tx.ftx.Replace(ctx, table, key, enc, isUpdate)
}

func (m *TxImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData) (err error) {
func (m *TxImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) (err error) {
m.measure(ctx, "Replace", func() error {
err = m.tx.Replace(ctx, table, key, data)
err = m.tx.Replace(ctx, table, key, data, isUpdate)
return err
})
return
Expand Down
6 changes: 3 additions & 3 deletions store/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func testKeyValueStoreBasic(t *testing.T, kv KeyValueStore) {

// replace individual record
replacedValue2 := internal.NewTableData([]byte("value2+2"))
err = kv.Replace(ctx, table, BuildKey("p1", 2), replacedValue2)
err = kv.Replace(ctx, table, BuildKey("p1", 2), replacedValue2, false)
require.NoError(t, err)

it, err = kv.Read(ctx, table, BuildKey("p1", 2))
Expand Down Expand Up @@ -276,7 +276,7 @@ func benchKV(t *testing.T, kv baseKVStore) {
require.NoError(t, err)

key, doc := createDocument(t)
err = tx.Replace(ctx, table, BuildKey(key), doc)
err = tx.Replace(ctx, table, BuildKey(key), doc, false)
require.NoError(t, err)
require.NoError(t, tx.Commit(ctx))
atomic.AddInt64(&ops, 1)
Expand Down Expand Up @@ -322,7 +322,7 @@ func testKVBasic(t *testing.T, kv baseKVStore) {
require.Equal(t, []baseKeyValue{{Key: BuildKey("p1", int64(2)), FDBKey: getFDBKey(table, BuildKey("p1", int64(2))), Value: []byte("value2")}}, v)

// replace individual record
err = kv.Replace(ctx, table, BuildKey("p1", 2), []byte("value2+2"))
err = kv.Replace(ctx, table, BuildKey("p1", 2), []byte("value2+2"), false)
require.NoError(t, err)

it, err = kv.Read(ctx, table, BuildKey("p1", 2))
Expand Down
2 changes: 1 addition & 1 deletion store/kv/noop_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type NoopKV struct{}
func (n *NoopKV) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error {
return nil
}
func (n *NoopKV) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData) error {
func (n *NoopKV) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error {
return nil
}
func (n *NoopKV) Delete(ctx context.Context, table []byte, key Key) error { return nil }
Expand Down
Loading

0 comments on commit 716715b

Please sign in to comment.