Commit 9ed8bb5

refactor: query execution logic, use metadata versioning for all schema updates, some bug fixes

himank committed May 16, 2022
1 parent 1b8c83a commit 9ed8bb5
Showing 24 changed files with 1,222 additions and 1,086 deletions.
86 changes: 53 additions & 33 deletions cmd/consistency/consistency.go
@@ -3,6 +3,8 @@ package main
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/rs/zerolog/log"
clientConfig "github.com/tigrisdata/tigris-client-go/config"
@@ -12,40 +14,15 @@ import (

type Workload interface {
Setup(client driver.Driver) error
Start(client driver.Driver) error
Start(client driver.Driver) (int64, error)
Check(client driver.Driver) (bool, error)
Type() string
}

func CreateWorkloads() []Workload {
var workload []Workload
workload = append(workload, &workload2.DDLWorkload{
Threads: 16,
Database: "test1",
Collections: []string{"c1"},
Schemas: [][]byte{
[]byte(`{
"title": "c1",
"properties": {
"F1": {
"type": "integer"
},
"F2": {
"type": "string"
}
},
"primary_key": ["F1"]
}`)},
})

workload = append(workload, &workload2.InsertOnlyWorkload{
Threads: 64,
Records: 64,
Database: "test1",
Collections: []string{"c1", "c2"},
// first is integer primary key, second is string primary key
Schemas: [][]byte{
[]byte(`{
func collectionsForLoadTest() ([]string, [][]byte) {
// first is integer primary key, second is string primary key
return []string{"c1", "c2"}, [][]byte{
[]byte(`{
"title": "c1",
"properties": {
"F1": {
@@ -69,7 +46,7 @@ func CreateWorkloads() {
},
"primary_key": ["F1"]
}`),
[]byte(`{
[]byte(`{
"title": "c2",
"properties": {
"F1": {
@@ -93,13 +70,52 @@ func CreateWorkloads() {
},
"primary_key": ["F2"]
}`),
},
}
}

func CreateWorkloads() []Workload {
collections, schemas := collectionsForLoadTest()
var workload []Workload
workload = append(workload, &workload2.DDLWorkload{
Threads: 1,
Database: "test1",
Collections: []string{collections[0]},
Schemas: [][]byte{schemas[0]},
})

workload = append(workload, &workload2.InsertOnlyWorkload{
Threads: 64,
Records: 64,
Database: "test1",
Collections: collections,
// first is integer primary key, second is string primary key
Schemas: schemas,
})

workload = append(workload, &workload2.ReplaceOnlyWorkload{
Threads: 32,
Records: 32,
Database: "test1",
Collections: collections,
// first is integer primary key, second is string primary key
Schemas: schemas,
})

workload = append(workload, &workload2.SmallConciseWorkload{
Threads: 16,
Records: 10,
Database: "test1",
Collections: collections,
// first is integer primary key, second is string primary key
Schemas: schemas,
})

return workload
}

func main() {
rand.Seed(time.Now().Unix())

driver.DefaultProtocol = driver.HTTP
client, err := driver.NewDriver(context.TODO(), &clientConfig.Driver{
URL: fmt.Sprintf("http://%s:%d", "localhost", 8081),
@@ -115,9 +131,13 @@ func main() {
log.Panic().Err(err).Msg("workload setup failed")
}

if err = w.Start(client); err != nil {
start := time.Now()
records, err := w.Start(client)
if err != nil {
log.Panic().Err(err).Msg("workload start failed")
}
log.Debug().Msgf("load generated in %v, total records %d", time.Since(start), records)

var success bool
success, err = w.Check(client)
if err != nil {
12 changes: 7 additions & 5 deletions cmd/consistency/workload/queue.go
@@ -26,29 +26,31 @@ func (q *Queue) Get(collectionName string) *QueueDocuments {
return q.collectionToDocument[collectionName]
}

func (q *Queue) Insert(collectionName string, doc *Document) {
func (q *Queue) Add(collectionName string, doc *Document) {
q.Lock()
defer q.Unlock()

qd := q.collectionToDocument[collectionName]
qd.Insert(doc)
qd.Add(doc)
}

type QueueDocuments struct {
sync.Mutex

Collection string
Documents []*Document
Documents map[int64]*Document
}

func NewQueueDocuments(collection string) *QueueDocuments {
return &QueueDocuments{
Collection: collection,
Documents: make(map[int64]*Document),
}
}

func (q *QueueDocuments) Insert(doc *Document) {
func (q *QueueDocuments) Add(doc *Document) {
q.Lock()
defer q.Unlock()
q.Documents = append(q.Documents, doc)

q.Documents[doc.F1] = doc
}
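
The Documents field of QueueDocuments now holds a map keyed by the document's F1 value instead of a slice. Since reflect.DeepEqual compares maps by key/value pairs regardless of insertion order, the Check paths no longer need to sort before comparing (see workload_insert.go below, where the sort.Slice calls are dropped). A minimal standalone sketch of that property, using a simplified stand-in for the workload's Document type:

package main

import (
	"fmt"
	"reflect"
)

// Document is a simplified stand-in for the workload's Document type;
// only the F1 key field matters for this sketch.
type Document struct {
	F1 int64
	F2 string
}

func main() {
	// Slices compare element by element, so ordering matters.
	a := []*Document{{F1: 1, F2: "x"}, {F1: 2, F2: "y"}}
	b := []*Document{{F1: 2, F2: "y"}, {F1: 1, F2: "x"}}
	fmt.Println(reflect.DeepEqual(a, b)) // false: same documents, different order

	// Maps keyed by F1 compare by key/value pairs, independent of the
	// order in which documents were added.
	m1 := map[int64]*Document{1: {F1: 1, F2: "x"}, 2: {F1: 2, F2: "y"}}
	m2 := map[int64]*Document{2: {F1: 2, F2: "y"}, 1: {F1: 1, F2: "x"}}
	fmt.Println(reflect.DeepEqual(m1, m2)) // true
}
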
2 changes: 1 addition & 1 deletion cmd/consistency/workload/workload_ddl.go
@@ -39,7 +39,7 @@ func (w *DDLWorkload) Setup(client driver.Driver) error {
return nil
}

func (w *DDLWorkload) Start(client driver.Driver) error { return nil }
func (w *DDLWorkload) Start(client driver.Driver) (int64, error) { return 0, nil }

func (w *DDLWorkload) Check(client driver.Driver) (bool, error) {
var wg sync.WaitGroup
19 changes: 5 additions & 14 deletions cmd/consistency/workload/workload_insert.go
@@ -3,7 +3,6 @@ package workload
import (
"context"
"reflect"
"sort"
"sync"

"github.com/hashicorp/go-multierror"
@@ -53,7 +52,7 @@ func (w *InsertOnlyWorkload) Setup(client driver.Driver) error {
return tx.Commit(context.TODO())
}

func (w *InsertOnlyWorkload) Start(client driver.Driver) error {
func (w *InsertOnlyWorkload) Start(client driver.Driver) (int64, error) {
var insertErr error
var wg sync.WaitGroup
for i := int16(0); i < w.Threads; i++ {
@@ -71,21 +70,20 @@ func (w *InsertOnlyWorkload) Start(client driver.Driver) error {

for k := 0; k < len(w.Collections); k++ {
if _, err := client.UseDatabase(w.Database).Insert(context.TODO(), w.Collections[k], []driver.Document{serialized}); err != nil {
insertErr = multierror.Append(insertErr, errors.Wrapf(err, "insert to collection failed '%s' '%s'", w.Database, w.Collections[j]))
insertErr = multierror.Append(insertErr, errors.Wrapf(err, "insert to collection failed '%s' '%s'", w.Database, w.Collections[k]))
return
}

w.WorkloadData.Insert(w.Collections[k], doc)
w.WorkloadData.Add(w.Collections[k], doc)
//log.Debug().Msgf("inserted document '%s' '%s' '%v'\n", w.Database, w.Collections[k], doc)
}
id++
}
}(uniqueIdentifier)
}

wg.Wait()

return insertErr
return w.Records * int64(w.Threads), insertErr
}

func (w *InsertOnlyWorkload) Check(client driver.Driver) (bool, error) {
@@ -104,7 +102,7 @@ func (w *InsertOnlyWorkload) Check(client driver.Driver) (bool, error) {
return false, errors.Wrapf(err, "deserializing document failed")
}
//log.Debug().Msgf("read document '%s' '%s' '%v'", w.Database, collection, document)
queueDoc.Insert(document)
queueDoc.Add(document)
}

if it.Err() != nil {
@@ -116,13 +114,6 @@ func (w *InsertOnlyWorkload) Check(client driver.Driver) (bool, error) {
log.Debug().Msgf("consistency issue for collection '%s' '%s', ignoring further checks totalDocumentsInserted: %d, totalDocsRead: %d", w.Database, collection, len(existing.Documents), len(queueDoc.Documents))
}

sort.Slice(existing.Documents, func(i, j int) bool {
return existing.Documents[i].F1 < existing.Documents[j].F1
})
sort.Slice(queueDoc.Documents, func(i, j int) bool {
return queueDoc.Documents[i].F1 < queueDoc.Documents[j].F1
})

isSuccess = reflect.DeepEqual(existing.Documents, queueDoc.Documents)
if !isSuccess {
log.Debug().Msgf("consistency issue for collection '%s' '%s', ignoring further checks", w.Database, collection)
133 changes: 133 additions & 0 deletions cmd/consistency/workload/workload_replace.go
@@ -0,0 +1,133 @@
package workload

import (
"context"
"reflect"
"sync"

"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/tigrisdata/tigris-client-go/driver"
)

type ReplaceOnlyWorkload struct {
Threads int16
Records int64
Database string
Collections []string
Schemas [][]byte
WorkloadData *Queue
}

func (w *ReplaceOnlyWorkload) Type() string {
return "replace_only_workload"
}

func (w *ReplaceOnlyWorkload) Setup(client driver.Driver) error {
w.WorkloadData = NewQueue(w.Collections)

// cleanup first
err := client.DropDatabase(context.TODO(), w.Database)
if err != nil {
log.Err(err).Msgf("dropped database failed, ignoring error '%s'", w.Database)
}

err = client.CreateDatabase(context.TODO(), w.Database)
if err != nil {
log.Err(err).Msgf("created database failed ignoring error '%s'", w.Database)
}

tx, err := client.BeginTx(context.TODO(), w.Database)
if err != nil {
return errors.Wrapf(err, "begin tx failed for db '%s'", w.Database)
}

for i := 0; i < len(w.Schemas); i++ {
if err = tx.CreateOrUpdateCollection(context.TODO(), w.Collections[i], w.Schemas[i]); err != nil {
return errors.Wrapf(err, "CreateOrUpdateCollection failed for db '%s' coll '%s'", w.Database, w.Collections[i])
}
}

return tx.Commit(context.TODO())
}

func (w *ReplaceOnlyWorkload) Start(client driver.Driver) (int64, error) {
var replaceErr error
var wg sync.WaitGroup
for i := int16(0); i < w.Threads; i++ {
wg.Add(1)
uniqueIdentifier := w.Records + int64(i)*w.Records
go func(id int64) {
defer wg.Done()
for j := int64(0); j < w.Records; j++ {
doc := NewDocument(1)
serialized, err := Serialize(doc)
if err != nil {
replaceErr = multierror.Append(replaceErr, err)
return
}

for k := 0; k < len(w.Collections); k++ {
if _, err := client.UseDatabase(w.Database).Replace(context.TODO(), w.Collections[k], []driver.Document{serialized}); err != nil {
replaceErr = multierror.Append(replaceErr, errors.Wrapf(err, "replace to collection failed '%s' '%s'", w.Database, w.Collections[k]))
return
}

w.WorkloadData.Add(w.Collections[k], doc)
//log.Debug().Msgf("replaced document '%s' '%s' '%v'\n", w.Database, w.Collections[k], doc)
}
id++
}
}(uniqueIdentifier)
}

wg.Wait()

return w.Records * int64(w.Threads), replaceErr
}

func (w *ReplaceOnlyWorkload) Check(client driver.Driver) (bool, error) {
isSuccess := false
for _, collection := range w.Collections {
it, err := client.UseDatabase(w.Database).Read(context.TODO(), collection, driver.Filter(`{}`), nil)
if err != nil {
return false, errors.Wrapf(err, "read to collection failed '%s' '%s'", w.Database, collection)
}

queueDoc := NewQueueDocuments(collection)
var doc driver.Document
for it.Next(&doc) {
document, err := Deserialize(doc)
if err != nil {
return false, errors.Wrapf(err, "deserializing document failed")
}
//log.Debug().Msgf("read document '%s' '%s' '%v'", w.Database, collection, document)
queueDoc.Add(document)
}

if it.Err() != nil {
return false, it.Err()
}

existing := w.WorkloadData.Get(collection)
if len(existing.Documents) != len(queueDoc.Documents) {
log.Debug().Msgf("consistency issue for collection '%s' '%s', ignoring further checks totalDocumentsInserted: %d, totalDocsRead: %d", w.Database, collection, len(existing.Documents), len(queueDoc.Documents))
}

isSuccess = reflect.DeepEqual(existing.Documents, queueDoc.Documents)
if !isSuccess {
for _, doc := range existing.Documents {
log.Debug().Msgf("existing document %v", doc)
}
for _, doc := range queueDoc.Documents {
log.Debug().Msgf("found document %v", doc)
}

log.Debug().Msgf("consistency issue for collection '%s' '%s', ignoring further checks %v %v", w.Database, collection, existing.Documents, queueDoc.Documents)
}
return isSuccess, nil
}

return true, nil
}