Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching registry fetch errors forever leads to PODs ending needing re… #127

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions gotype.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/actgardner/gogen-avro/v10/schema"

Expand All @@ -29,7 +30,8 @@ var globalNames = new(Names)
// In fact it just holds an error so that we can cache errors.
type errorSchema struct {
schema.AvroType
err error
err error
invalidAfter time.Time
}

// TypeOf returns the Avro type for the Go type of x.
Expand All @@ -40,30 +42,30 @@ type errorSchema struct {
// Otherwise TypeOf(T) is derived according to
// the following rules:
//
// - int, int64 and uint32 encode as "long"
// - int32, int16, uint16, int8 and uint8 encode as "int"
// - float32 encodes as "float"
// - float64 encodes as "double"
// - string encodes as "string"
// - Null{} encodes as "null"
// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"}
// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"}
// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"}
// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N}
// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name.
// - []T encodes as {"type": "array", "items": TypeOf(T)}
// - map[string]T encodes as {"type": "map", "values": TypeOf(T)}
// - *T encodes as ["null", TypeOf(T)]
// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...}
// where the fields are encoded as described below.
// - interface types are disallowed.
// - int, int64 and uint32 encode as "long"
// - int32, int16, uint16, int8 and uint8 encode as "int"
// - float32 encodes as "float"
// - float64 encodes as "double"
// - string encodes as "string"
// - Null{} encodes as "null"
// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"}
// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"}
// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"}
// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N}
// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name.
// - []T encodes as {"type": "array", "items": TypeOf(T)}
// - map[string]T encodes as {"type": "map", "values": TypeOf(T)}
// - *T encodes as ["null", TypeOf(T)]
// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...}
// where the fields are encoded as described below.
// - interface types are disallowed.
//
// Struct fields are encoded as follows:
//
// - unexported struct fields are ignored
// - the field name is taken from the Go field name, or from a "json" tag for the field if present.
// - the default value for the field is the zero value for the type.
// - anonymous struct fields are disallowed (this restriction may be lifted in the future).
// - unexported struct fields are ignored
// - the field name is taken from the Go field name, or from a "json" tag for the field if present.
// - the default value for the field is the zero value for the type.
// - anonymous struct fields are disallowed (this restriction may be lifted in the future).
func TypeOf(x interface{}) (*Type, error) {
return globalNames.TypeOf(x)
}
Expand Down
15 changes: 7 additions & 8 deletions singledecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"sync"
"time"
)

// DecodingRegistry is used by SingleDecoder to find information
Expand Down Expand Up @@ -111,21 +112,19 @@ func (c *SingleDecoder) getProgram(ctx context.Context, vt reflect.Type, wID int

var err error
if wType != nil {
if es, ok := wType.avroType.(errorSchema); ok {
if es, ok := wType.avroType.(errorSchema); ok && (es.invalidAfter.IsZero() || time.Now().Before(es.invalidAfter)) {
return nil, es.err
}
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I'm missing something but aren't we discarding the "normal" case here where wType is not nil and does not contain a schema error?
Or is that case already covered by checking c.programs in the beginning of the func call?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct. The c.programs in the beginning is the normal case where a compiled decoder for the schema is ready.

When a complied decoder is not ready, we need to fetch it. This could fail due to network errors, busy servers etc. In these cases we need to retry the fetch later, if not the state will be bad until the program is restarted. This is where invalidAfter comes into play. The next time the schema is requested, we find wType, and this will be an error with invalidAfter, so we can decide if we are retry to retry or not. Some errors are permanent, for example if we have a response from the registry, but we cannot compile a decoder for it. In this case, invalidAfter will be IsZero(), and we will never retry. To fix this state, a new schema must be uploaded, or the program restarted.

I have added some comments to make this more clear.

// We haven't seen the writer schema before, so try to fetch it.
wType, err = c.registry.SchemaForID(ctx, wID)
// TODO look at the SchemaForID error
// and return an error without caching it if it's temporary?
// See https://github.com/heetch/avro/issues/39
}

// We haven't seen the writer schema before or enough time has passed since fetching failed, so try to fetch it.
wType, err = c.registry.SchemaForID(ctx, wID)

c.mu.Lock()
defer c.mu.Unlock()
if err != nil {
c.writerTypes[wID] = &Type{
avroType: errorSchema{err: err},
avroType: errorSchema{err: err, invalidAfter: time.Now().Add(1 * time.Minute)},
hennikul marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, err
}
Expand Down