-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathhandlers.go
126 lines (115 loc) · 3.94 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
/*
type rpcHandler func(context.Context, interface{}, network.Stream) error
// adapted from prysm's handler router - https://github.com/prysmaticlabs/prysm/blob/4e28192541625e2e7828928430dbc72eb6c075c4/beacon-chain/sync/rpc.go#L109
func setHandler(h host.Host, baseTopic string, handler rpcHandler) {
encoding := &encoder.SszNetworkEncoder{}
topic := baseTopic + encoding.ProtocolSuffix()
h.SetStreamHandler(protocol.ID(topic), func(stream network.Stream) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic occurred: %v", r)
log.Printf("%s", debug.Stack())
}
}()
// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
// Close here because we may have only written a partial
// response.
defer func() {
_err := stream.Reset()
_ = _err
}()
base, ok := p2p.RPCTopicMappings[baseTopic]
if !ok {
log.Printf("ERROR: Could not retrieve base message for topic %s", baseTopic)
return
}
bb := base
t := reflect.TypeOf(base)
// Copy Base
base = reflect.New(t)
if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 {
if err := metadataHandler(context.Background(), base, stream); err != nil {
if err != types.ErrWrongForkDigestVersion {
log.Printf("ERROR: Could not handle p2p RPC: %v", err)
}
}
return
}
// Given we have an input argument that can be pointer or the actual object, this gives us
// a way to check for its reflect.Kind and based on the result, we can decode
// accordingly.
if t.Kind() == reflect.Ptr {
msg, ok := reflect.New(t.Elem()).Interface().(ssz.Unmarshaler)
if !ok {
log.Printf("ERROR: message of %T ptr does not support marshaller interface. topic=%s", bb, baseTopic)
return
}
if err := encoding.DecodeWithMaxLength(stream, msg); err != nil {
log.Printf("ERROR: could not decode stream message: %v", err)
return
}
if err := handler(context.Background(), msg, stream); err != nil {
if err != types.ErrWrongForkDigestVersion {
log.Printf("ERROR: Could not handle p2p RPC: %v", err)
}
}
} else {
nTyp := reflect.New(t)
msg, ok := nTyp.Interface().(ssz.Unmarshaler)
if !ok {
log.Printf("ERROR: message of %T does not support marshaller interface", msg)
return
}
if err := handler(context.Background(), msg, stream); err != nil {
if err != types.ErrWrongForkDigestVersion {
log.Printf("ERROR: Could not handle p2p RPC: %v", err)
}
}
}
})
}
func dummyMetadata() metadata.Metadata {
metaData := ðpb.MetaDataV1{
SeqNumber: 0,
Attnets: bitfield.NewBitvector64(),
Syncnets: bitfield.Bitvector4{byte(0x00)},
}
return wrapper.WrappedMetadataV1(metaData)
}
// pingHandler reads the incoming ping rpc message from the peer.
func pingHandler(_ context.Context, _ interface{}, stream network.Stream) error {
encoding := &encoder.SszNetworkEncoder{}
defer closeStream(stream)
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
m := dummyMetadata()
sq := consensustypes.SSZUint64(m.SequenceNumber())
if _, err := encoding.EncodeWithMaxLength(stream, &sq); err != nil {
return fmt.Errorf("%w: pingHandler stream write", err)
}
return nil
}
// metadataHandler spoofs a valid looking metadata message
func metadataHandler(_ context.Context, _ interface{}, stream network.Stream) error {
encoding := &encoder.SszNetworkEncoder{}
defer closeStream(stream)
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
// write a dummy metadata message to satify the client handshake
m := dummyMetadata()
if _, err := encoding.EncodeWithMaxLength(stream, m); err != nil {
return fmt.Errorf("%w: metadata stream write", err)
}
return nil
}
func closeStream(stream network.Stream) {
if err := stream.Close(); err != nil {
log.Println(err)
}
}
*/