Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching registry fetch errors forever leads to PODs ending needing re… #1

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
20 changes: 20 additions & 0 deletions avroregistry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package avroregistry

import (
"fmt"
)

// UnavailableError reports an error when the schema registry is unavailable.
type UnavailableError struct {
Cause error
}

// Error implements the error interface.
func (m *UnavailableError) Error() string {
return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause)
}

// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors.
func (e *UnavailableError) Unwrap() error {
return e.Cause
}
34 changes: 34 additions & 0 deletions avroregistry/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package avroregistry_test

import (
"errors"
"testing"

qt "github.com/frankban/quicktest"

"github.com/heetch/avro/avroregistry"
)

func TestUnavailableError_Unwrap(t *testing.T) {
c := qt.New(t)
var ErrExpect = errors.New("error")

err := &avroregistry.UnavailableError{
Cause: ErrExpect,
}

c.Assert(errors.Is(err, ErrExpect), qt.IsTrue)

var newErr *avroregistry.UnavailableError
c.Assert(errors.As(err, &newErr), qt.IsTrue)
}

func TestUnavailableError_Error(t *testing.T) {
c := qt.New(t)

err := &avroregistry.UnavailableError{
Cause: errors.New("ECONNREFUSED"),
}

c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED")
}
28 changes: 20 additions & 8 deletions avroregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,39 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
if !attempt.More() || !isTemporaryError(err) {
return err
return &UnavailableError{err}
}
continue
}
err = unmarshalResponse(req, resp, result)
if err == nil {
return nil
}
if !attempt.More() {
return err
}
if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 {
// It's not a 5xx error. We want to retry on 5xx
if apiErr, ok := err.(*apiError); ok {
// We want to retry on 5xx
// errors, because the Confluent Avro registry
// can occasionally return them as a matter of
// course (and there could also be an
// unavailable service that we're reaching
// through a proxy).
if apiErr.StatusCode/100 == 5 {
err = &UnavailableError{apiErr}
} else {
return apiErr
}
} else {
// some 5XX response body cannot be decoded
// hence an *apiError is not returned
if resp.StatusCode/100 == 5 {
err = &UnavailableError{err}
}
}

if !attempt.More() {
return err
}
}

if attempt.Stopped() {
return ctx.Err()
}
Expand All @@ -228,13 +240,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil {
return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err)
return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err)
}
return nil
}
var apiErr apiError
if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil {
return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err)
return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err)
}
apiErr.StatusCode = resp.StatusCode
return &apiErr
Expand Down
31 changes: 28 additions & 3 deletions avroregistry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) {
c.Assert(id1, qt.Equals, id)
}

func TestSchemaRegistryUnavailableError(t *testing.T) {
c := qt.New(t)
ctx := context.Background()

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

}))

// close the server
testServer.Close()

registry, err := avroregistry.New(avroregistry.Params{
ServerURL: testServer.URL,
RetryStrategy: noRetry,
})
c.Assert(err, qt.IsNil)

type R struct {
X int
}

_, err = registry.Register(ctx, randomString(), schemaOf(nil, R{}))
c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*")
}

func TestRegisterWithEmptyStruct(t *testing.T) {
c := qt.New(t)

Expand Down Expand Up @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) {
c.Assert(err, qt.Equals, nil)
t0 := time.Now()
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`)
if d := time.Since(t0); d < 30*time.Millisecond {
c.Errorf("retry duration too small, want >=30ms got %v", d)
}
Expand Down Expand Up @@ -315,7 +340,7 @@ func TestRetryOn500(t *testing.T) {
// an error.
failCount = 5
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`)
}

func TestNoRetryOnNon5XXStatus(t *testing.T) {
Expand Down Expand Up @@ -367,7 +392,7 @@ func TestUnavailableError(t *testing.T) {
})
c.Assert(err, qt.Equals, nil)
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`)
}

var schemaEquivalenceTests = []struct {
Expand Down
46 changes: 24 additions & 22 deletions gotype.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/actgardner/gogen-avro/v10/schema"

Expand All @@ -29,7 +30,8 @@ var globalNames = new(Names)
// In fact it just holds an error so that we can cache errors.
type errorSchema struct {
schema.AvroType
err error
err error
invalidAfter time.Time
}

// TypeOf returns the Avro type for the Go type of x.
Expand All @@ -40,30 +42,30 @@ type errorSchema struct {
// Otherwise TypeOf(T) is derived according to
// the following rules:
//
// - int, int64 and uint32 encode as "long"
// - int32, int16, uint16, int8 and uint8 encode as "int"
// - float32 encodes as "float"
// - float64 encodes as "double"
// - string encodes as "string"
// - Null{} encodes as "null"
// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"}
// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"}
// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"}
// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N}
// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name.
// - []T encodes as {"type": "array", "items": TypeOf(T)}
// - map[string]T encodes as {"type": "map", "values": TypeOf(T)}
// - *T encodes as ["null", TypeOf(T)]
// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...}
// where the fields are encoded as described below.
// - interface types are disallowed.
// - int, int64 and uint32 encode as "long"
// - int32, int16, uint16, int8 and uint8 encode as "int"
// - float32 encodes as "float"
// - float64 encodes as "double"
// - string encodes as "string"
// - Null{} encodes as "null"
// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"}
// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"}
// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"}
// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N}
// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name.
// - []T encodes as {"type": "array", "items": TypeOf(T)}
// - map[string]T encodes as {"type": "map", "values": TypeOf(T)}
// - *T encodes as ["null", TypeOf(T)]
// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...}
// where the fields are encoded as described below.
// - interface types are disallowed.
//
// Struct fields are encoded as follows:
//
// - unexported struct fields are ignored
// - the field name is taken from the Go field name, or from a "json" tag for the field if present.
// - the default value for the field is the zero value for the type.
// - anonymous struct fields are disallowed (this restriction may be lifted in the future).
// - unexported struct fields are ignored
// - the field name is taken from the Go field name, or from a "json" tag for the field if present.
// - the default value for the field is the zero value for the type.
// - anonymous struct fields are disallowed (this restriction may be lifted in the future).
func TypeOf(x interface{}) (*Type, error) {
return globalNames.TypeOf(x)
}
Expand Down
17 changes: 8 additions & 9 deletions singledecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"sync"
"time"
)

// DecodingRegistry is used by SingleDecoder to find information
Expand Down Expand Up @@ -92,7 +93,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{
}
prog, err := c.getProgram(ctx, vt, wID)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal: %v", err)
return nil, fmt.Errorf("cannot unmarshal: %w", err)
}
return unmarshal(nil, body, prog, v)
}
Expand All @@ -111,21 +112,19 @@ func (c *SingleDecoder) getProgram(ctx context.Context, vt reflect.Type, wID int

var err error
if wType != nil {
if es, ok := wType.avroType.(errorSchema); ok {
if es, ok := wType.avroType.(errorSchema); ok && (es.invalidAfter.IsZero() || time.Now().Before(es.invalidAfter)) {
return nil, es.err
}
} else {
// We haven't seen the writer schema before, so try to fetch it.
wType, err = c.registry.SchemaForID(ctx, wID)
// TODO look at the SchemaForID error
// and return an error without caching it if it's temporary?
// See https://github.com/heetch/avro/issues/39
}

// We haven't seen the writer schema before or enough time has passed since fetching failed, so try to fetch it.
wType, err = c.registry.SchemaForID(ctx, wID)

c.mu.Lock()
defer c.mu.Unlock()
if err != nil {
c.writerTypes[wID] = &Type{
avroType: errorSchema{err: err},
avroType: errorSchema{err: err, invalidAfter: time.Now().Add(1 * time.Minute)},
}
return nil, err
}
Expand Down