Skip to content

Commit

Permalink
Merge pull request #13 from BenSlabbert/fix/9-bulk-push-pop-causes-pr…
Browse files Browse the repository at this point in the history
…ogram-hang

fix: bulk push/pop looping forever
  • Loading branch information
alexisvisco authored Nov 18, 2019
2 parents 43d2742 + 8073daa commit a1b0a26
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 56 deletions.
6 changes: 3 additions & 3 deletions sonic/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
)

type connection struct {
reader *bufio.Reader
conn net.Conn
reader *bufio.Reader
conn net.Conn
cmdMaxBytes int
closed bool
closed bool
}

func newConnection(d *driver) (*connection, error) {
Expand Down
107 changes: 54 additions & 53 deletions sonic/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"unicode/utf8"
)

Expand Down Expand Up @@ -100,11 +99,11 @@ func NewIngester(host string, port int, password string) (Ingestable, error) {
func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) {
//
patterns := []struct {
Pattern string
Replacement string
Pattern string
Replacement string
}{{"\\", "\\\\"},
{"\n", "\\n"},
{"\"", "\\\""}}
{"\"", "\\\""}}
for _, v := range patterns {
text = strings.Replace(text, v.Pattern, v.Replacement, -1)
}
Expand Down Expand Up @@ -136,7 +135,7 @@ func (i ingesterChannel) Push(collection, bucket, object, text string) (err erro
// whereas slicing a string simply returns a new string header backed by the same array as the original
// (taking constant time).
func splitText(longString string, maxLen int) []string {
splits := []string{}
var splits []string

var l, r int
for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen {
Expand All @@ -154,39 +153,41 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in
parallelRoutines = 1
}

errs = make([]IngestBulkError, 0)
errMutex := &sync.Mutex{}

// chunk array into N (parallelRoutines) parts
divided := divideIngestBulkRecords(records, parallelRoutines)

// dispatch each records array into N goroutines
group := sync.WaitGroup{}
group.Add(len(divided))
bulkErrorChan := make(chan []IngestBulkError)
defer close(bulkErrorChan)

for _, r := range divided {
go func(recs []IngestBulkRecord) {
conn, _ := newConnection(i.driver)
r := r
go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) {
errs := make([]IngestBulkError, 0)
newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password)

for _, rec := range recs {
if conn == nil {
addBulkError(&errs, rec, ErrClosed, errMutex)
}
err := i.Push(collection, bucket, rec.Object, rec.Text)
if err != nil {
addBulkError(&errs, rec, err, errMutex)
if newIngester == nil {
addBulkError(&errs, rec, ErrClosed)
continue
}
// sonic should sent OK
_, err = conn.read()
err := newIngester.Push(collection, bucket, rec.Object, rec.Text)
if err != nil {
addBulkError(&errs, rec, err, errMutex)
addBulkError(&errs, rec, err)
}
}
conn.close()
group.Done()
}(r)

if newIngester != nil {
_ = newIngester.Quit()
}
bulkErrorChan <- errs
}(i.driver, collection, bucket, r, bulkErrorChan)
}
group.Wait()

errs = make([]IngestBulkError, 0)
for range divided {
errs = append(errs, <-bulkErrorChan...)
}

return errs
}

Expand All @@ -209,42 +210,44 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int
parallelRoutines = 1
}

errs = make([]IngestBulkError, 0)
errMutex := &sync.Mutex{}

// chunk array into N (parallelRoutines) parts
divided := divideIngestBulkRecords(records, parallelRoutines)

// dispatch each records array into N goroutines
group := sync.WaitGroup{}
group.Add(len(divided))
bulkErrorChan := make(chan []IngestBulkError)
defer close(bulkErrorChan)

for _, r := range divided {
go func(recs []IngestBulkRecord) {
conn, _ := newConnection(i.driver)
r := r
go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) {
errs := make([]IngestBulkError, 0)
newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password)

for _, rec := range recs {
if conn == nil {
addBulkError(&errs, rec, ErrClosed, errMutex)
}
err := conn.write(fmt.Sprintf(
"%s %s %s %s \"%s\"",
pop, collection, bucket, rec.Object, rec.Text),
)
if err != nil {
addBulkError(&errs, rec, err, errMutex)
if newIngester == nil {
addBulkError(&errs, rec, ErrClosed)
continue
}
// sonic should sent OK
_, err = conn.read()

err := newIngester.Pop(collection, bucket, rec.Object, rec.Text)

if err != nil {
addBulkError(&errs, rec, err, errMutex)
addBulkError(&errs, rec, err)
}
}
conn.close()
group.Done()
}(r)

if newIngester != nil {
_ = newIngester.Quit()
}

bulkErrorChan <- errs
}(i.driver, collection, bucket, r, bulkErrorChan)
}

errs = make([]IngestBulkError, 0)
for range divided {
errs = append(errs, <-bulkErrorChan...)
}
group.Wait()

return errs
}

Expand Down Expand Up @@ -328,8 +331,6 @@ func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [
return divided
}

func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) {
mutex.Lock()
defer mutex.Unlock()
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error) {
*e = append(*e, IngestBulkError{record.Object, err})
}
40 changes: 40 additions & 0 deletions sonic/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,27 @@ import (
var records = make([]IngestBulkRecord, 0)
var ingester, err = NewIngester("localhost", 1491, "SecretPassword")

func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) {
if err != nil {
return
}

cpus := 2 * runtime.NumCPU()

for n := 0; n < b.N; n++ {
e := ingester.FlushBucket("test", "test2XMaxCpus")
if e != nil {
b.Log(e)
b.Fail()
}
be := ingester.BulkPush("test", "test2XMaxCpus", cpus, records)
if len(be) > 0 {
b.Log(be, e)
b.Fail()
}
}
}

func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
if err != nil {
return
Expand Down Expand Up @@ -50,6 +71,25 @@ func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
}
}

func BenchmarkIngesterChannel_BulkPop10(b *testing.B) {
if err != nil {
return
}

for n := 0; n < b.N; n++ {
e := ingester.FlushBucket("test", "popTest10")
if e != nil {
b.Log(e)
b.Fail()
}
be := ingester.BulkPop("test", "popTest10", 10, records)
if len(be) > 0 {
b.Log(be, err)
b.Fail()
}
}
}

func BenchmarkIngesterChannel_Push(b *testing.B) {
if err != nil {
return
Expand Down

0 comments on commit a1b0a26

Please sign in to comment.