-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsingledecoder.go
146 lines (132 loc) · 4.28 KB
/
singledecoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package avro
import (
"context"
"fmt"
"reflect"
"sync"
)
// DecodingRegistry is consulted by SingleDecoder to resolve the schema
// identifier that accompanies each message.
//
// Schema IDs are scoped to one DecodingRegistry instance: inside a single
// instance, a given ID must always denote the same schema, but nothing is
// promised about what that ID means to any other instance.
type DecodingRegistry interface {
	// DecodeSchemaID splits data into its schema ID header and the bare
	// message body that remains once the schema information is removed.
	//
	// When data is not a valid message, DecodeSchemaID should return (0, nil).
	DecodeSchemaID(data []byte) (int64, []byte)

	// SchemaForID returns the schema registered under schemaID.
	SchemaForID(ctx context.Context, schemaID int64) (*Type, error)
}
// decoderSchemaPair is the key of SingleDecoder.programs. A compiled
// decode program depends on both the Go type being decoded into (t) and
// the writer schema ID (schemaID), so both form the key.
//
// NOTE: getProgram builds this key with a positional composite literal
// ({vt, wID}), so the field order is significant.
type decoderSchemaPair struct {
	// t is the Go type that values are decoded into.
	t reflect.Type
	// schemaID is the writer schema ID obtained from the DecodingRegistry.
	schemaID int64
}
// SingleDecoder decodes messages in Avro binary format.
// Each message includes a header or wrapper that indicates the schema
// used to encode the message.
//
// A DecodingRegistry is used to extract the schema ID from each message
// and to retrieve the writer schema for that ID.
//
// To encode or decode a stream of messages that all use the same
// schema, use StreamEncoder or StreamDecoder instead.
type SingleDecoder struct {
	// registry resolves schema IDs found in messages.
	registry DecodingRegistry
	// names translates Avro schemas of Go values decoded through Unmarshal.
	names *Names

	// mu protects the fields below.
	// We might be better off with a couple of sync.Maps here, but this is a bit easier on the brain.
	mu sync.RWMutex
	// writerTypes is a per-schema-ID cache consulted before asking the
	// registry for a writer schema. An entry whose avroType is an
	// errorSchema records an earlier failure; getProgram returns that
	// error instead of retrying.
	writerTypes map[int64]*Type
	// programs caches compiled decode programs, keyed by the Go type
	// decoded into and the writer schema ID.
	programs map[decoderSchemaPair]*decodeProgram
}
// NewSingleDecoder returns a SingleDecoder that consults r to determine
// the writer schema of every message it decodes.
//
// The Avro schemas of Go values decoded through Unmarshal are translated
// with names. Passing a nil names selects the global namespace.
func NewSingleDecoder(r DecodingRegistry, names *Names) *SingleDecoder {
	dec := &SingleDecoder{
		registry:    r,
		names:       names,
		writerTypes: make(map[int64]*Type),
		programs:    make(map[decoderSchemaPair]*decodeProgram),
	}
	if dec.names == nil {
		dec.names = globalNames
	}
	return dec
}
// Unmarshal unmarshals the given message into x, which must be a non-nil
// pointer. The schema ID header is stripped from data via the
// DecodingRegistry, and the remaining body is unmarshaled as with the
// Unmarshal function.
//
// It needs the context argument because it might end up
// fetching schema data over the network via the DecodingRegistry.
//
// Unmarshal returns the actual type that was decoded into.
func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{}) (*Type, error) {
	v := reflect.ValueOf(x)
	if v.Kind() != reflect.Ptr {
		return nil, fmt.Errorf("cannot decode into non-pointer value %T", x)
	}
	// A typed nil pointer passes the Kind check above, but its Elem is the
	// zero reflect.Value, whose Type method panics. Report it as an error.
	if v.IsNil() {
		return nil, fmt.Errorf("cannot decode into nil pointer %T", x)
	}
	v = v.Elem()
	vt := v.Type()

	wID, body := c.registry.DecodeSchemaID(data)
	// (0, nil) is the DecodingRegistry's signal for an invalid message.
	if wID == 0 && body == nil {
		return nil, fmt.Errorf("cannot get schema ID from message")
	}
	prog, err := c.getProgram(ctx, vt, wID)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal: %w", err)
	}
	return unmarshal(nil, body, prog, v)
}
// getProgram returns the decode program for decoding messages written with
// schema ID wID into values of Go type vt, compiling and caching it on
// first use.
//
// The writer schema is taken from c.writerTypes or, failing that, fetched
// with c.registry.SchemaForID. A successful fetch is cached so that other
// Go types decoded against the same schema ID don't refetch it, and a
// failed fetch is cached as an errorSchema so it is not retried.
//
// A compilation failure is specific to the (vt, wID) pair, so it is
// returned without being cached. Caching it under wID alone would make
// every other Go type fail for that schema ID too.
func (c *SingleDecoder) getProgram(ctx context.Context, vt reflect.Type, wID int64) (*decodeProgram, error) {
	key := decoderSchemaPair{t: vt, schemaID: wID}

	c.mu.RLock()
	if prog := c.programs[key]; prog != nil {
		c.mu.RUnlock()
		return prog, nil
	}
	if debugging {
		// %v rather than %T: %T of a reflect.Type prints the reflect
		// implementation type, not vt itself.
		debugf("no hit found for program %v schemaID %v", vt, wID)
	}
	wType := c.writerTypes[wID]
	c.mu.RUnlock()

	var err error
	fetched := false
	if wType != nil {
		// A cached errorSchema records an earlier failed schema lookup.
		if es, ok := wType.avroType.(errorSchema); ok {
			return nil, es.err
		}
	} else {
		// We haven't seen the writer schema before, so try to fetch it.
		// This happens without holding c.mu since it may involve network I/O.
		wType, err = c.registry.SchemaForID(ctx, wID)
		// TODO look at the SchemaForID error
		// and return an error without caching it if it's temporary?
		// See https://github.com/heetch/avro/issues/39
		fetched = err == nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	if err != nil {
		c.writerTypes[wID] = &Type{
			avroType: errorSchema{err: err},
		}
		return nil, err
	}
	if fetched {
		c.writerTypes[wID] = wType
	}
	if prog := c.programs[key]; prog != nil {
		// Someone else got there first.
		return prog, nil
	}
	prog, err := compileDecoder(c.names, vt, wType)
	if err != nil {
		// Deliberately not cached under writerTypes: this error depends
		// on vt, and other Go types may still decode wID successfully.
		return nil, err
	}
	c.programs[key] = prog
	return prog, nil
}