From 6f4456a491df177b8bb6e3fa42f7266378e595a3 Mon Sep 17 00:00:00 2001 From: Henning Kulander Date: Wed, 29 Nov 2023 14:29:49 +0100 Subject: [PATCH] Caching registry fetch errors forever leads to PODs ending needing restart to fix temporary errors like routing after redeploy. - Cache fetch errors for one minute to avoid overloading registry server. - Should fix context cancelled loops seen after redeploy. --- gotype.go | 46 ++++++++++++++++++++++++---------------------- singledecoder.go | 15 +++++++-------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/gotype.go b/gotype.go index d5e4f10..ea0670e 100644 --- a/gotype.go +++ b/gotype.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/actgardner/gogen-avro/v10/schema" @@ -29,7 +30,8 @@ var globalNames = new(Names) // In fact it just holds an error so that we can cache errors. type errorSchema struct { schema.AvroType - err error + err error + invalidAfter time.Time } // TypeOf returns the Avro type for the Go type of x. @@ -40,30 +42,30 @@ type errorSchema struct { // Otherwise TypeOf(T) is derived according to // the following rules: // -// - int, int64 and uint32 encode as "long" -// - int32, int16, uint16, int8 and uint8 encode as "int" -// - float32 encodes as "float" -// - float64 encodes as "double" -// - string encodes as "string" -// - Null{} encodes as "null" -// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"} -// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"} -// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"} -// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N} -// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name. -// - []T encodes as {"type": "array", "items": TypeOf(T)} -// - map[string]T encodes as {"type": "map", "values": TypeOf(T)} -// - *T encodes as ["null", TypeOf(T)] -// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...} -// where the fields are encoded as described below. -// - interface types are disallowed. +// - int, int64 and uint32 encode as "long" +// - int32, int16, uint16, int8 and uint8 encode as "int" +// - float32 encodes as "float" +// - float64 encodes as "double" +// - string encodes as "string" +// - Null{} encodes as "null" +// - time.Duration encodes as {"type": "long", "logicalType": "duration-nanos"} +// - time.Time encodes as {"type": "long", "logicalType": "timestamp-micros"} +// - github.com/google/uuid.UUID encodes as {"type": "string", "logicalType": "string"} +// - [N]byte encodes as {"type": "fixed", "name": "go.FixedN", "size": N} +// - a named type with underlying type [N]byte encodes as [N]byte but typeName(T) for the name. +// - []T encodes as {"type": "array", "items": TypeOf(T)} +// - map[string]T encodes as {"type": "map", "values": TypeOf(T)} +// - *T encodes as ["null", TypeOf(T)] +// - a named struct type encodes as {"type": "record", "name": typeName(T), "fields": ...} +// where the fields are encoded as described below. +// - interface types are disallowed. // // Struct fields are encoded as follows: // -// - unexported struct fields are ignored -// - the field name is taken from the Go field name, or from a "json" tag for the field if present. -// - the default value for the field is the zero value for the type. -// - anonymous struct fields are disallowed (this restriction may be lifted in the future). +// - unexported struct fields are ignored +// - the field name is taken from the Go field name, or from a "json" tag for the field if present. +// - the default value for the field is the zero value for the type. +// - anonymous struct fields are disallowed (this restriction may be lifted in the future). func TypeOf(x interface{}) (*Type, error) { return globalNames.TypeOf(x) } diff --git a/singledecoder.go b/singledecoder.go index 4e0e674..6b43570 100644 --- a/singledecoder.go +++ b/singledecoder.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sync" + "time" ) // DecodingRegistry is used by SingleDecoder to find information @@ -111,21 +112,19 @@ func (c *SingleDecoder) getProgram(ctx context.Context, vt reflect.Type, wID int var err error if wType != nil { - if es, ok := wType.avroType.(errorSchema); ok { + if es, ok := wType.avroType.(errorSchema); ok && (es.invalidAfter.IsZero() || time.Now().Before(es.invalidAfter)) { return nil, es.err } - } else { - // We haven't seen the writer schema before, so try to fetch it. - wType, err = c.registry.SchemaForID(ctx, wID) - // TODO look at the SchemaForID error - // and return an error without caching it if it's temporary? - // See https://github.com/heetch/avro/issues/39 } + + // We haven't seen the writer schema before or enough time has passed since fetching failed, so try to fetch it. + wType, err = c.registry.SchemaForID(ctx, wID) + c.mu.Lock() defer c.mu.Unlock() if err != nil { c.writerTypes[wID] = &Type{ - avroType: errorSchema{err: err}, + avroType: errorSchema{err: err, invalidAfter: time.Now().Add(1 * time.Minute)}, } return nil, err }