Skip to content

Commit

Permalink
Support v3io stream consumer groups (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius authored Feb 16, 2020
1 parent 98824ee commit 1d2df5f
Show file tree
Hide file tree
Showing 23 changed files with 2,140 additions and 96 deletions.
4 changes: 3 additions & 1 deletion go.mod
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ go 1.12

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.1 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/nuclio/errors v0.0.1
github.com/nuclio/logger v0.0.0-20190303161055-fc1e4b16d127
github.com/nuclio/zap v0.0.2
github.com/pavius/zap v1.4.2-0.20180228181622-8d52692529b8 // indirect
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/rs/xid v1.1.0
github.com/stretchr/testify v1.3.0
github.com/tinylib/msgp v1.1.1 // indirect
github.com/valyala/fasthttp v1.2.0
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/capnproto/go-capnproto2 v2.17.0+incompatible h1:vPbYlc2CBNdjzOMzHfwo7TbFNRBDaRKitlWiRs1riTw=
github.com/capnproto/go-capnproto2 v2.17.0+incompatible/go.mod h1:T3/pxeK0qevFRlAASYZe90Ozs+JmlQTNY+VLc6+lJHw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -8,8 +6,6 @@ github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw=
Expand All @@ -24,13 +20,19 @@ github.com/nuclio/zap v0.0.2 h1:rY5PkMOl8CTkqRqIPuxziBiKK6Mq/8oEurfgRnNtqf0=
github.com/nuclio/zap v0.0.2/go.mod h1:SUxPsgePvlyjx6c5MtGdB50pf0IQThtlyLwISLboeuc=
github.com/pavius/zap v1.4.2-0.20180228181622-8d52692529b8 h1:WqLgmr/wj9TO5Sc6oYPQRAJBxuHE0NTeuVeFnT+FZVo=
github.com/pavius/zap v1.4.2-0.20180228181622-8d52692529b8/go.mod h1:6FWOCx06uh50GClv8S2cfk3asqTJs3qq3ZNRtLZE77I=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.1.0 h1:9Z322kTPrDR5GpxTH+1yl7As6tEHIH9aGsRccl20ELk=
github.com/rs/xid v1.1.0/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tinylib/msgp v1.1.1 h1:TnCZ3FIuKeaIy+F45+Cnp+caqdXGy4z74HvwXN+570Y=
github.com/tinylib/msgp v1.1.1/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
Expand Down
99 changes: 99 additions & 0 deletions pkg/common/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package common

import (
"math"
"math/rand"
"sync/atomic"
"time"
)

// Backoff is a time.Duration counter, starting at Min. After every call to
// the Duration method the current timing is multiplied by Factor, but it
// never exceeds Max.
//
// Backoff is not generally concurrent-safe, but the ForAttempt method can
// be used concurrently.
type Backoff struct {
attempt uint64
// Factor is the multiplying factor for each increment step
Factor float64
// Jitter eases contention by randomizing backoff steps
Jitter bool
// Min and Max are the minimum and maximum values of the counter
Min, Max time.Duration
}

// Duration returns the duration for the current attempt before incrementing
// the attempt counter. See ForAttempt.
func (b *Backoff) Duration() time.Duration {
d := b.ForAttempt(float64(atomic.AddUint64(&b.attempt, 1) - 1))
return d
}

const maxInt64 = float64(math.MaxInt64 - 512)

// ForAttempt returns the duration for a specific attempt. This is useful if
// you have a large number of independent Backoffs, but don't want use
// unnecessary memory storing the Backoff parameters per Backoff. The first
// attempt should be 0.
//
// ForAttempt is concurrent-safe.
func (b *Backoff) ForAttempt(attempt float64) time.Duration {
// Zero-values are nonsensical, so we use
// them to apply defaults
min := b.Min
if min <= 0 {
min = 100 * time.Millisecond
}
max := b.Max
if max <= 0 {
max = 10 * time.Second
}
if min >= max {
// short-circuit
return max
}
factor := b.Factor
if factor <= 0 {
factor = 2
}
//calculate this duration
minf := float64(min)
durf := minf * math.Pow(factor, attempt)
if b.Jitter {
durf = rand.Float64()*(durf-minf) + minf
}
//ensure float64 wont overflow int64
if durf > maxInt64 {
return max
}
dur := time.Duration(durf)
//keep within bounds
if dur < min {
return min
}
if dur > max {
return max
}
return dur
}

// Reset restarts the current attempt counter at zero.
func (b *Backoff) Reset() {
atomic.StoreUint64(&b.attempt, 0)
}

// Attempt returns the current attempt counter value.
func (b *Backoff) Attempt() float64 {
return float64(atomic.LoadUint64(&b.attempt))
}

// Copy returns a backoff with equals constraints as the original
func (b *Backoff) Copy() *Backoff {
return &Backoff{
Factor: b.Factor,
Jitter: b.Jitter,
Min: b.Min,
Max: b.Max,
}
}
142 changes: 142 additions & 0 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package common

import (
"context"
"reflect"
"runtime"
"time"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
)

func getFunctionName(fn interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// give either retryInterval or backoff
func RetryFunc(ctx context.Context,
loggerInstance logger.Logger,
attempts int,
retryInterval *time.Duration,
backoff *Backoff,
fn func(int) (bool, error)) error {

var err error
var retry bool

for attempt := 1; attempt <= attempts; attempt++ {
retry, err = fn(attempt)

// if there's no need to retry - we're done
if !retry {
return err
}

// are we out of time?
if ctx.Err() != nil {

loggerInstance.WarnWithCtx(ctx,
"Context error detected during retries",
"ctxErr", ctx.Err(),
"previousErr", err,
"function", getFunctionName(fn),
"attempt", attempt)

// return the error if one was provided
if err != nil {
return err
}

return ctx.Err()
}

if backoff != nil {
time.Sleep(backoff.Duration())
} else {
if retryInterval == nil {
return errors.New("Either retry interval or backoff must be given")
}
time.Sleep(*retryInterval)
}
}

// attempts exhausted and we're unsuccessful
// Return the original error for later checking
loggerInstance.WarnWithCtx(ctx,
"Failed final attempt to invoke function",
"function", getFunctionName(fn),
"err", err,
"attempts", attempts)

// this shouldn't happen
if err == nil {
loggerInstance.ErrorWithCtx(ctx,
"Failed final attempt to invoke function, but error is nil. This shouldn't happen",
"function", getFunctionName(fn),
"err", err,
"attempts", attempts)
return errors.New("Failed final attempt to invoke function without proper error supplied")
}
return err
}

func MakeRange(min, max int) []int {
a := make([]int, max-min+1)
for i := range a {
a[i] = min + i
}
return a
}

func IntSliceContainsInt(slice []int, number int) bool {
for _, intInSlice := range slice {
if intInSlice == number {
return true
}
}

return false
}

func IntSlicesEqual(slice1 []int, slice2 []int) bool {
if len(slice1) != len(slice2) {
return false
}

for intIndex := 0; intIndex < len(slice1); intIndex++ {
if slice1[intIndex] != slice2[intIndex] {
return false
}
}

return true
}

func Uint64SlicesEqual(slice1 []uint64, slice2 []uint64) bool {
if len(slice1) != len(slice2) {
return false
}

for intIndex := 0; intIndex < len(slice1); intIndex++ {
if slice1[intIndex] != slice2[intIndex] {
return false
}
}

return true
}

func StringSlicesEqual(slice1 []string, slice2 []string) bool {
if len(slice1) != len(slice2) {
return false
}

for stringIndex := 0; stringIndex < len(slice1); stringIndex++ {
if slice1[stringIndex] != slice2[stringIndex] {
return false
}
}

return true
}
6 changes: 6 additions & 0 deletions pkg/dataplane/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type Container interface {
// CreateStreamSync
CreateStreamSync(*CreateStreamInput) error

// DescribeStream
DescribeStream(*DescribeStreamInput, interface{}, chan *Response) (*Request, error)

// DescribeStreamSync
DescribeStreamSync(*DescribeStreamInput) (*Response, error)

// DeleteStream
DeleteStream(*DeleteStreamInput, interface{}, chan *Response) (*Request, error)

Expand Down
14 changes: 14 additions & 0 deletions pkg/dataplane/http/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,20 @@ func (c *container) CreateStreamSync(createStreamInput *v3io.CreateStreamInput)
return c.session.context.CreateStreamSync(createStreamInput)
}

// DescribeStream
func (c *container) DescribeStream(describeStreamInput *v3io.DescribeStreamInput,
context interface{},
responseChan chan *v3io.Response) (*v3io.Request, error) {
c.populateInputFields(&describeStreamInput.DataPlaneInput)
return c.session.context.DescribeStream(describeStreamInput, context, responseChan)
}

// DescribeStreamSync
func (c *container) DescribeStreamSync(describeStreamInput *v3io.DescribeStreamInput) (*v3io.Response, error) {
c.populateInputFields(&describeStreamInput.DataPlaneInput)
return c.session.context.DescribeStreamSync(describeStreamInput)
}

// DeleteStream
func (c *container) DeleteStream(deleteStreamInput *v3io.DeleteStreamInput, context interface{}, responseChan chan *v3io.Response) (*v3io.Request, error) {
c.populateInputFields(&deleteStreamInput.DataPlaneInput)
Expand Down
Loading

0 comments on commit 1d2df5f

Please sign in to comment.