-
Notifications
You must be signed in to change notification settings - Fork 204
/
Copy pathdata_converter.go
158 lines (128 loc) · 4.32 KB
/
data_converter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package encryption
import (
"context"
"fmt"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"
)
// Payload metadata written by the encryption codec.
const (
	// MetadataEncodingEncrypted is the encoding metadata value ("binary/encrypted")
	// that marks a payload as encrypted.
	MetadataEncodingEncrypted = "binary/encrypted"

	// MetadataEncryptionKeyID is the metadata key ("encryption-key-id") under which
	// the ID of the encryption key is recorded.
	MetadataEncryptionKeyID = "encryption-key-id"
)
// DataConverter is a converter.DataConverter that runs payloads through the
// encryption Codec (and optionally a zlib codec) on top of a parent converter.
type DataConverter struct {
	// parent is the converter this one wraps. It is kept so that
	// WithWorkflowContext and WithContext can rebuild the codec chain with a
	// different key ID. This is only needed until CodecDataConverter supports
	// workflow.ContextAware itself.
	parent converter.DataConverter

	// DataConverter is the embedded codec-wrapped converter; its methods are
	// promoted and perform the actual conversion.
	converter.DataConverter

	// options holds the settings this converter was built with.
	options DataConverterOptions
}
// DataConverterOptions configures the converter built by NewEncryptionDataConverter.
type DataConverterOptions struct {
	// KeyID is the ID of the encryption key handed to the Codec.
	KeyID string

	// Compress enables zlib compression of payloads before they are encrypted.
	Compress bool
}
// Codec implements converter.PayloadCodec using AES encryption.
type Codec struct {
	// KeyID is the ID of the key used by Encode; it is also written into the
	// metadata of every encrypted payload.
	KeyID string
}
// TODO: Implement workflow.ContextAware in CodecDataConverter

// WithWorkflowContext returns a converter whose encryption key ID comes from the
// CryptContext stored in ctx under PropagateKey. If ctx carries no such value,
// dc is returned unchanged.
//
// Note that you only need to implement this function if you need to vary the
// encryption KeyID per workflow.
func (dc *DataConverter) WithWorkflowContext(ctx workflow.Context) converter.DataConverter {
	cryptCtx, found := ctx.Value(PropagateKey).(CryptContext)
	if !found {
		return dc
	}

	// Let a context-aware parent derive its own per-context converter first.
	base := dc.parent
	if aware, isAware := base.(workflow.ContextAware); isAware {
		base = aware.WithWorkflowContext(ctx)
	}

	// dc.options is copied by value, so the receiver's options are not modified.
	opts := dc.options
	opts.KeyID = cryptCtx.KeyID
	return NewEncryptionDataConverter(base, opts)
}
// TODO: Implement workflow.ContextAware in CodecDataConverter

// WithContext returns a converter whose encryption key ID comes from the
// CryptContext stored in ctx under PropagateKey. If ctx carries no such value,
// dc is returned unchanged.
//
// Note that you only need to implement this function if you need to vary the
// encryption KeyID per workflow.
func (dc *DataConverter) WithContext(ctx context.Context) converter.DataConverter {
	cryptCtx, found := ctx.Value(PropagateKey).(CryptContext)
	if !found {
		return dc
	}

	// Let a context-aware parent derive its own per-context converter first.
	base := dc.parent
	if aware, isAware := base.(workflow.ContextAware); isAware {
		base = aware.WithContext(ctx)
	}

	// dc.options is copied by value, so the receiver's options are not modified.
	opts := dc.options
	opts.KeyID = cryptCtx.KeyID
	return NewEncryptionDataConverter(base, opts)
}
// getKey returns the key bytes to use for keyID.
//
// In production the key must be fetched from secure storage (such as a KMS).
// For testing, this sample hard-codes a single key and therefore ignores keyID.
func (e *Codec) getKey(keyID string) (key []byte) {
	const sampleKey = "test-key-test-key-test-key-test!"
	return []byte(sampleKey)
}
// NewEncryptionDataConverter wraps dataConverter in a codec chain that encrypts
// payloads with the key identified by options.KeyID and, when options.Compress
// is set, zlib-compresses them first.
func NewEncryptionDataConverter(dataConverter converter.DataConverter, options DataConverterOptions) *DataConverter {
	encryption := &Codec{KeyID: options.KeyID}
	codecs := []converter.PayloadCodec{encryption}

	// Compression only helps when it runs before encryption, because encrypted
	// data should by design not compress well. Codecs are applied last -> first,
	// so the zlib codec has to be listed after the encryption codec.
	if options.Compress {
		zlib := converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true})
		codecs = append(codecs, zlib)
	}

	wrapped := converter.NewCodecDataConverter(dataConverter, codecs...)
	return &DataConverter{
		parent:        dataConverter,
		DataConverter: wrapped,
		options:       options,
	}
}
// Encode implements converter.PayloadCodec.Encode.
//
// Every payload is marshaled in full, encrypted with the key for e.KeyID, and
// replaced by a new payload whose metadata marks it as encrypted and records
// the key ID. The input slice and its payloads are not modified.
//
// On failure Encode returns a nil slice together with a wrapped error. It never
// hands the unencrypted input back as its result, so a caller that mishandles
// the error cannot mistake plaintext for encoded output.
func (e *Codec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
	// The key depends only on e.KeyID, so look it up once instead of per payload.
	key := e.getKey(e.KeyID)

	result := make([]*commonpb.Payload, len(payloads))
	for i, p := range payloads {
		origBytes, err := p.Marshal()
		if err != nil {
			return nil, fmt.Errorf("marshaling payload %d: %w", i, err)
		}

		b, err := encrypt(origBytes, key)
		if err != nil {
			return nil, fmt.Errorf("encrypting payload %d: %w", i, err)
		}

		result[i] = &commonpb.Payload{
			Metadata: map[string][]byte{
				converter.MetadataEncoding: []byte(MetadataEncodingEncrypted),
				MetadataEncryptionKeyID:    []byte(e.KeyID),
			},
			Data: b,
		}
	}

	return result, nil
}
// Decode implements converter.PayloadCodec.Decode.
//
// Payloads whose encoding metadata is not MetadataEncodingEncrypted (including
// nil entries) are passed through unchanged. Encrypted payloads are decrypted
// with the key named in their MetadataEncryptionKeyID metadata and unmarshaled
// back into the original payload.
//
// On failure Decode returns a nil slice together with an error: a missing key
// ID, or a wrapped decrypt/unmarshal error identifying the payload index.
func (e *Codec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
	result := make([]*commonpb.Payload, len(payloads))
	for i, p := range payloads {
		// GetMetadata is nil-safe, so a nil entry is treated as "not encrypted"
		// instead of causing a nil-pointer panic.
		metadata := p.GetMetadata()

		// Only if it's encrypted
		if string(metadata[converter.MetadataEncoding]) != MetadataEncodingEncrypted {
			result[i] = p
			continue
		}

		keyID, ok := metadata[MetadataEncryptionKeyID]
		if !ok {
			return nil, fmt.Errorf("no encryption key id")
		}

		key := e.getKey(string(keyID))

		// p is non-nil here: a nil payload has no metadata and was passed through above.
		b, err := decrypt(p.Data, key)
		if err != nil {
			return nil, fmt.Errorf("decrypting payload %d: %w", i, err)
		}

		decoded := &commonpb.Payload{}
		if err := decoded.Unmarshal(b); err != nil {
			return nil, fmt.Errorf("unmarshaling decrypted payload %d: %w", i, err)
		}
		result[i] = decoded
	}

	return result, nil
}