-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathosc.go
184 lines (164 loc) · 4.31 KB
/
osc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package osc
import (
"bytes"
"context"
"encoding/binary"
"net"
"strings"
"github.com/pkg/errors"
)
const (
// MessageChar is the first character of any valid OSC message.
// It is declared as the untyped rune constant '/'.
MessageChar = '/'
)
// Typetag constants.
// Every constant in this group is a single byte.
// NOTE(review): going by the names, TypetagPrefix (',') introduces a
// typetag string and each remaining constant tags one argument type
// (int, float, string, blob, false, true). The code that interprets
// these bytes is not part of this section — confirm against it.
const (
TypetagPrefix byte = ','
TypetagInt byte = 'i'
TypetagFloat byte = 'f'
TypetagString byte = 's'
TypetagBlob byte = 'b'
TypetagFalse byte = 'F'
TypetagTrue byte = 'T'
)
var (
// byteOrder is set to big-endian (binary.BigEndian).
// NOTE(review): its uses are outside this section — presumably the
// encoding and decoding of numeric values; confirm against callers.
byteOrder = binary.BigEndian
)
// Packet is an OSC packet.
// An OSC packet consists of its contents, a contiguous block
// of binary data, and its size, the number of 8-bit bytes
// that comprise the contents. The size of an OSC packet
// is always a multiple of 4.
type Packet interface {
// Bytes returns the packet as a byte slice.
// NOTE(review): presumably the binary contents described above;
// confirm against the implementations.
Bytes() []byte
// Equal reports whether the packet equals other.
// NOTE(review): the exact notion of equality is up to each implementation.
Equal(other Packet) bool
}
// ToBytes encodes s in OSC string form: the bytes of s, followed by a
// null terminator, followed by as many null bytes as are needed to make
// the total length a multiple of 4.
// The empty string is a special case: it yields an empty, non-nil slice
// with no terminator and no padding.
func ToBytes(s string) []byte {
	if s == "" {
		return []byte{}
	}
	terminated := append([]byte(s), 0)
	return Pad(terminated)
}
// Pad appends null bytes to b until its length is a multiple of 4 and
// returns the resulting slice. A slice whose length is already a multiple
// of 4 (an empty or nil slice included) is returned unchanged.
// Because the padding is added with append, it may be written into spare
// capacity of b's backing array.
func Pad(b []byte) []byte {
	for len(b)%4 != 0 {
		b = append(b, 0)
	}
	return b
}
// ReadString reads a null-terminated, null-padded OSC string from the
// start of data.
//
// The first return value is the string itself: everything before the
// first null byte, or all of data when it contains no null byte.
//
// The second return value is the number of bytes the string occupies in
// padded form: the string plus one null terminator, rounded up to a
// multiple of 4. A missing terminator or missing padding is counted as if
// it were present, so this value is always a multiple of 4 and may exceed
// len(data).
//
// An empty data yields ("", 0). ReadString never writes to data or to its
// backing array.
func ReadString(data []byte) (string, int64) {
	if len(data) == 0 {
		return "", 0
	}
	end := bytes.IndexByte(data, 0)
	if end == -1 {
		// No terminator: the whole slice is the string.
		end = len(data)
	}
	// Count the terminator, then round up to the next multiple of 4.
	// The size is computed arithmetically instead of appending padding to
	// a sub-slice of data, because such an append would overwrite the
	// bytes that follow the string in the caller's buffer.
	consumed := (end + 1 + 3) &^ 3
	return string(data[:end]), int64(consumed)
}
// ReadBlob reads a blob of the given length from the start of data.
//
// length is first clamped to the range [0, len(data)], so a negative or
// oversized length (for example one decoded from a malformed packet)
// cannot cause an out-of-range slice. The clamped length is then rounded
// up to a multiple of 4.
//
// The returned slice has that rounded length. When data is long enough it
// is simply the leading bytes of data and shares data's backing array.
// When data is too short, the result is a fresh copy of data extended
// with null bytes, so data and its backing array are never written to.
//
// The second return value is the length of the returned slice.
func ReadBlob(length int32, data []byte) ([]byte, int64) {
	n := int(length)
	if n < 0 {
		n = 0
	}
	if n > len(data) {
		n = len(data)
	}
	// Round up to the next multiple of 4.
	padded := (n + 3) &^ 3
	if padded <= len(data) {
		return data[:padded], int64(padded)
	}
	// data ends before the padding does: supply the missing null bytes in
	// a copy rather than appending into the caller's spare capacity.
	out := make([]byte, padded)
	copy(out, data)
	return out, int64(padded)
}
// Incoming represents incoming data.
// It pairs a buffer of received bytes with the address they came from.
type Incoming struct {
// Data is the buffer the bytes were read into.
Data []byte
// Sender is the address reported by the read that filled Data.
Sender net.Addr
}
// netWriter is the set of methods needed to send bytes to a network
// address and to size the write buffer.
// NOTE(review): the method set matches methods of *net.UDPConn; which
// concrete types are used with it is not visible in this section.
type netWriter interface {
// SetWriteBuffer takes a size in bytes and returns an error.
// NOTE(review): presumably sets the size of the underlying write buffer, as the name says.
SetWriteBuffer(bytes int) error
// WriteTo takes a byte slice and a destination address and returns
// a count and an error.
// NOTE(review): presumably the number of bytes written, as with net.PacketConn.
WriteTo([]byte, net.Addr) (int, error)
}
// checkDispatcher validates a dispatcher before it is used for serving.
// It returns ErrNilDispatcher for a nil dispatcher. When the dispatcher
// is a PatternMatching, every key it is ranged over is passed to
// ValidateAddress and the first validation error is returned.
// Any other dispatcher is accepted as is.
func checkDispatcher(dispatcher Dispatcher) error {
	if dispatcher == nil {
		return ErrNilDispatcher
	}
	handlers, isPatternMatching := dispatcher.(PatternMatching)
	if !isPatternMatching {
		// Only PatternMatching dispatchers expose addresses to validate.
		return nil
	}
	for address := range handlers {
		if err := ValidateAddress(address); err != nil {
			return err
		}
	}
	return nil
}
// readSender knows how to read bytes and return the net.Addr
// of the sender of the bytes.
type readSender interface {
// CloseChan returns a channel that serve receives from; once a
// receive succeeds, serve stops and returns nil.
CloseChan() <-chan struct{}
// Context returns the context that bounds serving: when it is done,
// serve returns the context's error.
Context() context.Context
// read reads into the given buffer and returns a count, the address
// of the sender, and any error.
// NOTE(review): the count is presumably the number of bytes read.
read([]byte) (int, net.Addr, error)
}
// serve starts numWorkers worker goroutines plus one read loop and then
// blocks until serving should stop.
//
// The dispatcher is validated with checkDispatcher first; a validation
// error is returned before any goroutine is started. Every worker gets
// its own data channel and shares the dispatcher, the exactMatch flag,
// the error channel and the ready channel.
//
// serve returns
//   - the first error received on the error channel, wrapped with
//     "error serving udp",
//   - nil once a receive from r.CloseChan() succeeds, or
//   - r.Context().Err() once the context is done.
func serve(r readSender, numWorkers int, exactMatch bool, dispatcher Dispatcher) error {
	if err := checkDispatcher(dispatcher); err != nil {
		return err
	}

	errs := make(chan error)
	idle := make(chan worker, numWorkers)

	for started := 0; started < numWorkers; started++ {
		go worker{
			DataChan:   make(chan Incoming),
			Dispatcher: dispatcher,
			ErrChan:    errs,
			Ready:      idle,
			ExactMatch: exactMatch,
		}.run()
	}
	go workerLoop(r, idle, errs)

	// Stop serving on the first error, when the connection is closed,
	// or when the context is canceled.
	select {
	case err := <-errs:
		return errors.Wrap(err, "error serving udp")
	case <-r.CloseChan():
		return nil
	case <-r.Context().Done():
		return r.Context().Err()
	}
}
// workerLoop reads packets from r and hands each one to the next ready
// worker. It is meant to run in its own goroutine.
//
// Each iteration reads into a fresh bufSize buffer, because ownership of
// the buffer passes to the worker that receives it. A read error that
// mentions "use of closed network connection" ends the loop silently;
// any other read error is reported on errChan and also ends the loop.
//
// Every blocking channel operation also watches r.CloseChan() and
// r.Context().Done(). Without that, the goroutine would block forever on
// the error send, on waiting for a worker, or on the hand-off once the
// receiver of errChan has gone away or the workers have stopped.
//
// NOTE(review): the byte count returned by r.read is discarded, so
// workers receive the whole bufSize buffer rather than only the bytes
// read. Left as is because the packet parsing is outside this section.
func workerLoop(r readSender, ready chan worker, errChan chan error) {
	closed := r.CloseChan()
	// A nil channel never becomes ready in a select, so a reader without
	// a context simply has no cancellation signal.
	var done <-chan struct{}
	if ctx := r.Context(); ctx != nil {
		done = ctx.Done()
	}

	for {
		data := make([]byte, bufSize)

		_, sender, err := r.read(data)
		if err != nil {
			// Tried non-blocking select on closeChan right before ReadFromUDP
			// but that didn't stop us from reading a closed connection. [briansorahan]
			if strings.Contains(err.Error(), "use of closed network connection") {
				return
			}
			// Report the error unless serving has already stopped.
			select {
			case errChan <- err:
			case <-closed:
			case <-done:
			}
			return
		}

		// Get the next worker.
		var next worker
		select {
		case next = <-ready:
		case <-closed:
			return
		case <-done:
			return
		}

		// Assign them the data we just read.
		select {
		case next.DataChan <- Incoming{Data: data, Sender: sender}:
		case <-closed:
			return
		case <-done:
			return
		}
	}
}