Skip to content

Commit

Permalink
Merge pull request #257 from tigrisdata/main
Browse files Browse the repository at this point in the history
Release alpha
  • Loading branch information
himank authored May 17, 2022
2 parents ebf8207 + 98cf798 commit db52902
Show file tree
Hide file tree
Showing 46 changed files with 2,568 additions and 2,006 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/go-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ jobs:
run: |
echo "need_rebuild=true" >> $GITHUB_ENV
- name: Create cache directory
run: mkdir -p /home/runner/.cache/go-build

- name: Pull build docker images
if: env.need_rebuild
run: |
Expand Down
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from 830440 to 56ba26
29 changes: 29 additions & 0 deletions api/server/v1/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,32 @@ func (x *DescribeDatabaseResponse) MarshalJSON() ([]byte, error) {

return json.Marshal(&resp)
}

func (x *StreamResponse) MarshalJSON() ([]byte, error) {
type event struct {
TxId []byte `json:"tx_id"`
Collection string `json:"collection"`
Op string `json:"op"`
Key []byte `json:"key,omitempty"`
LKey []byte `json:"lkey,omitempty"`
RKey []byte `json:"rkey,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Last bool `json:"last"`
}

resp := struct {
Event event `json:"event,omitempty"`
}{
Event: event{
TxId: x.Event.TxId,
Collection: x.Event.Collection,
Op: x.Event.Op,
Key: x.Event.Key,
LKey: x.Event.Lkey,
RKey: x.Event.Rkey,
Data: x.Event.Data,
Last: x.Event.Last,
},
}
return json.Marshal(resp)
}
20 changes: 12 additions & 8 deletions cdc/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ import (
)

type Tx struct {
Id []byte
Ops []Op
}

type Op struct {
Type string
Op string
Table []byte
Key []byte `json:",omitempty"`
LKey []byte `json:",omitempty"`
RKey []byte `json:",omitempty"`
Data []byte `json:",omitempty"`
Last bool
}

func (tx *Tx) addOp(entry Op) {
Expand All @@ -42,18 +44,18 @@ type TxListener struct {
tx *Tx
}

func (l *TxListener) OnSet(opType string, table []byte, key []byte, data []byte) {
func (l *TxListener) OnSet(op string, table []byte, key []byte, data []byte) {
l.tx.addOp(Op{
Type: opType,
Op: op,
Table: table,
Key: key,
Data: data,
})
}

func (l *TxListener) OnClearRange(opType string, table []byte, lKey []byte, rKey []byte) {
func (l *TxListener) OnClearRange(op string, table []byte, lKey []byte, rKey []byte) {
l.tx.addOp(Op{
Type: opType,
Op: op,
Table: table,
LKey: lKey,
RKey: rKey,
Expand All @@ -69,6 +71,8 @@ func (l *TxListener) OnCommit(fdbTx *fdb.Transaction) error {
return nil
}

l.tx.Ops[len(l.tx.Ops)-1].Last = true

json, err := jsoniter.Marshal(l.tx)
if err != nil {
return err
Expand All @@ -79,13 +83,13 @@ func (l *TxListener) OnCommit(fdbTx *fdb.Transaction) error {
return err
}

data := internal.NewTableDataWithEncoding(json, internal.JsonEncoding)
bytes, err := internal.Encode(data)
td := internal.NewTableDataWithEncoding(json, internal.JsonEncoding)
enc, err := internal.Encode(td)
if err != nil {
return err
}

fdbTx.SetVersionstampedKey(key, bytes)
fdbTx.SetVersionstampedKey(key, enc)

return nil
}
Expand Down
2 changes: 2 additions & 0 deletions cdc/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (s *Streamer) read() error {
return nil, err
}

tx.Id = kv.Key

if len(s.Txs) < cap(s.Txs) {
s.lastKey = kv.Key
s.Txs <- tx
Expand Down
88 changes: 54 additions & 34 deletions cmd/consistency/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/rs/zerolog/log"
clientConfig "github.com/tigrisdata/tigris-client-go/config"
Expand All @@ -12,40 +14,15 @@ import (

type Workload interface {
Setup(client driver.Driver) error
Start(client driver.Driver) error
Start(client driver.Driver) (int64, error)
Check(client driver.Driver) (bool, error)
Type() string
}

func CreateWorkloads() []Workload {
var workload []Workload
workload = append(workload, &workload2.DDLWorkload{
Threads: 16,
Database: "test1",
Collections: []string{"c1"},
Schemas: [][]byte{
[]byte(`{
"title": "c1",
"properties": {
"F1": {
"type": "integer"
},
"F2": {
"type": "string"
}
},
"primary_key": ["F1"]
}`)},
})

workload = append(workload, &workload2.InsertOnlyWorkload{
Threads: 64,
Records: 64,
Database: "test1",
Collections: []string{"c1", "c2"},
// first is integer primary key, second is string primary key
Schemas: [][]byte{
[]byte(`{
func collectionsForLoadTest() ([]string, [][]byte) {
// first is integer primary key, second is string primary key
return []string{"c1", "c2"}, [][]byte{
[]byte(`{
"title": "c1",
"properties": {
"F1": {
Expand All @@ -69,7 +46,7 @@ func CreateWorkloads() []Workload {
},
"primary_key": ["F1"]
}`),
[]byte(`{
[]byte(`{
"title": "c2",
"properties": {
"F1": {
Expand All @@ -93,15 +70,54 @@ func CreateWorkloads() []Workload {
},
"primary_key": ["F2"]
}`),
},
}
}

func CreateWorkloads() []Workload {
collections, schemas := collectionsForLoadTest()
var workload []Workload
workload = append(workload, &workload2.DDLWorkload{
Threads: 1,
Database: "test1",
Collections: []string{collections[0]},
Schemas: [][]byte{schemas[0]},
})

workload = append(workload, &workload2.InsertOnlyWorkload{
Threads: 64,
Records: 64,
Database: "test1",
Collections: collections,
// first is integer primary key, second is string primary key
Schemas: schemas,
})

workload = append(workload, &workload2.ReplaceOnlyWorkload{
Threads: 32,
Records: 32,
Database: "test1",
Collections: collections,
// first is integer primary key, second is string primary key
Schemas: schemas,
})

workload = append(workload, &workload2.SmallConciseWorkload{
Threads: 16,
Records: 10,
Database: "test1",
Collections: collections,
// first is integer primary key, second is string primary key
Schemas: schemas,
})

return workload
}

func main() {
rand.Seed(time.Now().Unix())

driver.DefaultProtocol = driver.HTTP
client, err := driver.NewDriver(context.TODO(), &clientConfig.Config{
client, err := driver.NewDriver(context.TODO(), &clientConfig.Driver{
URL: fmt.Sprintf("http://%s:%d", "localhost", 8081),
})
if err != nil {
Expand All @@ -115,9 +131,13 @@ func main() {
log.Panic().Err(err).Msg("workload setup failed")
}

if err = w.Start(client); err != nil {
start := time.Now()
records, err := w.Start(client)
if err != nil {
log.Panic().Err(err).Msg("workload start failed")
}
log.Debug().Msgf("load generated in %v, total records %d", time.Since(start), records)

var success bool
success, err = w.Check(client)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions cmd/consistency/workload/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,31 @@ func (q *Queue) Get(collectionName string) *QueueDocuments {
return q.collectionToDocument[collectionName]
}

func (q *Queue) Insert(collectionName string, doc *Document) {
func (q *Queue) Add(collectionName string, doc *Document) {
q.Lock()
defer q.Unlock()

qd := q.collectionToDocument[collectionName]
qd.Insert(doc)
qd.Add(doc)
}

type QueueDocuments struct {
sync.Mutex

Collection string
Documents []*Document
Documents map[int64]*Document
}

func NewQueueDocuments(collection string) *QueueDocuments {
return &QueueDocuments{
Collection: collection,
Documents: make(map[int64]*Document),
}
}

func (q *QueueDocuments) Insert(doc *Document) {
func (q *QueueDocuments) Add(doc *Document) {
q.Lock()
defer q.Unlock()
q.Documents = append(q.Documents, doc)

q.Documents[doc.F1] = doc
}
2 changes: 1 addition & 1 deletion cmd/consistency/workload/workload_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (w *DDLWorkload) Setup(client driver.Driver) error {
return nil
}

func (w *DDLWorkload) Start(client driver.Driver) error { return nil }
func (w *DDLWorkload) Start(client driver.Driver) (int64, error) { return 0, nil }

func (w *DDLWorkload) Check(client driver.Driver) (bool, error) {
var wg sync.WaitGroup
Expand Down
19 changes: 5 additions & 14 deletions cmd/consistency/workload/workload_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package workload
import (
"context"
"reflect"
"sort"
"sync"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -53,7 +52,7 @@ func (w *InsertOnlyWorkload) Setup(client driver.Driver) error {
return tx.Commit(context.TODO())
}

func (w *InsertOnlyWorkload) Start(client driver.Driver) error {
func (w *InsertOnlyWorkload) Start(client driver.Driver) (int64, error) {
var insertErr error
var wg sync.WaitGroup
for i := int16(0); i < w.Threads; i++ {
Expand All @@ -71,21 +70,20 @@ func (w *InsertOnlyWorkload) Start(client driver.Driver) error {

for k := 0; k < len(w.Collections); k++ {
if _, err := client.UseDatabase(w.Database).Insert(context.TODO(), w.Collections[k], []driver.Document{serialized}); err != nil {
insertErr = multierror.Append(insertErr, errors.Wrapf(err, "insert to collection failed '%s' '%s'", w.Database, w.Collections[j]))
insertErr = multierror.Append(insertErr, errors.Wrapf(err, "insert to collection failed '%s' '%s'", w.Database, w.Collections[k]))
return
}

w.WorkloadData.Insert(w.Collections[k], doc)
w.WorkloadData.Add(w.Collections[k], doc)
//log.Debug().Msgf("inserted document '%s' '%s' '%v'\n", w.Database, w.Collections[k], doc)
}
id++
}
}(uniqueIdentifier)
}

wg.Wait()

return insertErr
return w.Records * int64(w.Threads), insertErr
}

func (w *InsertOnlyWorkload) Check(client driver.Driver) (bool, error) {
Expand All @@ -104,7 +102,7 @@ func (w *InsertOnlyWorkload) Check(client driver.Driver) (bool, error) {
return false, errors.Wrapf(err, "deserialzing document failed")
}
//log.Debug().Msgf("read document '%s' '%s' '%v'", w.Database, collection, document)
queueDoc.Insert(document)
queueDoc.Add(document)
}

if it.Err() != nil {
Expand All @@ -116,13 +114,6 @@ func (w *InsertOnlyWorkload) Check(client driver.Driver) (bool, error) {
log.Debug().Msgf("consistency issue for collection '%s' '%s', ignoring further checks totalDocumentsInserted: %d, totalDocsRead: %d", w.Database, collection, len(existing.Documents), len(queueDoc.Documents))
}

sort.Slice(existing.Documents, func(i, j int) bool {
return existing.Documents[i].F1 < existing.Documents[j].F1
})
sort.Slice(queueDoc.Documents, func(i, j int) bool {
return queueDoc.Documents[i].F1 < queueDoc.Documents[j].F1
})

isSuccess = reflect.DeepEqual(existing.Documents, queueDoc.Documents)
if !isSuccess {
log.Debug().Msgf("consistency issue for collection '%s' '%s', ignoring further checks", w.Database, collection)
Expand Down
Loading

0 comments on commit db52902

Please sign in to comment.