package grpctunnel

import (
	"context"
	"io"
	"sync"

	"github.com/fullstorydev/grpchan"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"

	"github.com/jhump/grpctunnel/tunnelpb"
)
type reverseTunnelServerState int

const (
	stateActive = reverseTunnelServerState(iota)
	stateClosing
	stateClosed
)

// ReverseTunnelServer is a server that can run on the client side of a gRPC
// connection, handling requests sent over a reverse tunnel.
//
// Callers must first call NewReverseTunnelServer to create a new instance.
// Then callers register server handlers with the server and use the Serve
// method to actually create a reverse tunnel and handle requests.
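//
// A minimal sketch of the typical lifecycle (the names svr, tunnelClient,
// foo, and fooServiceImpl below are illustrative placeholders, not part of
// this package):
//
//	svr := NewReverseTunnelServer(tunnelClient)
//	foo.RegisterFooServiceServer(svr, fooServiceImpl{})
//	if _, err := svr.Serve(ctx); err != nil {
//		log.Println("reverse tunnel ended:", err)
//	}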
type ReverseTunnelServer struct {
	stub     tunnelpb.TunnelServiceClient
	opts     tunnelOpts
	handlers grpchan.HandlerMap

	mu        sync.Mutex
	instances map[tunnelpb.TunnelService_OpenReverseTunnelClient]struct{}
	wg        sync.WaitGroup
	state     reverseTunnelServerState
}

// NewReverseTunnelServer creates a new server that uses the given stub to
// create reverse tunnels.
func NewReverseTunnelServer(stub tunnelpb.TunnelServiceClient, opts ...TunnelOption) *ReverseTunnelServer {
	r := &ReverseTunnelServer{
		stub:      stub,
		handlers:  grpchan.HandlerMap{},
		instances: map[tunnelpb.TunnelService_OpenReverseTunnelClient]struct{}{},
	}
	for _, opt := range opts {
		opt.apply(&r.opts)
	}
	return r
}

// RegisterService implements grpc.ServiceRegistrar. This allows you to use this
// server when calling registration functions generated by the Go gRPC plugin
// for protobuf. For example:
//
//	revTunnelSvr := NewReverseTunnelServer(tunnelClient)
//	foo.RegisterFooServiceServer(revTunnelSvr, myFooServiceImpl{})
//
// All services registered will be available for the other end of the tunnel to
// invoke.
func (s *ReverseTunnelServer) RegisterService(desc *grpc.ServiceDesc, srv interface{}) {
	s.handlers.RegisterService(desc, srv)
}

// Serve creates a new reverse tunnel and handles incoming RPC requests that
// arrive over that reverse tunnel. Since this is a reverse tunnel, RPC requests
// are initiated by the server, and this end (the client) processes the requests
// and sends responses.
//
// The boolean return value indicates whether the tunnel was created or not. If
// false, the returned error indicates the reason the tunnel could not be
// created. If true, the tunnel was created and requests could be serviced.
// In this case, the returned error indicates the reason the tunnel was stopped.
// This will be nil if the stream was closed by the other side of the tunnel
// (the server, acting as an RPC client, hanging up).
//
// Reasons for the tunnel ending abnormally include detection of invalid usage
// of the stream (RPC client sending references to invalid stream IDs or sending
// frames for a stream ID in improper order) or if the stream itself fails (for
// example, if there is a network disruption or the given context is cancelled).
//
// Callers may call this repeatedly, to create multiple, concurrent tunnels to
// the gRPC server associated with the stub used to create this reverse tunnel
// server.
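//
// A hedged sketch of maintaining several concurrent tunnels with a simple
// retry loop (the redial and backoff policy is illustrative, not part of
// this package):
//
//	for i := 0; i < 3; i++ {
//		go func() {
//			for ctx.Err() == nil {
//				if started, err := svr.Serve(ctx); !started {
//					log.Println("could not open tunnel:", err)
//					return
//				}
//				time.Sleep(time.Second) // crude backoff before redialing
//			}
//		}()
//	}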
func (s *ReverseTunnelServer) Serve(ctx context.Context, opts ...grpc.CallOption) (started bool, err error) {
	// TODO: validate options and maybe return an error
	ctx = metadata.AppendToOutgoingContext(ctx, grpctunnelNegotiateKey, grpctunnelNegotiateVal)
	stream, err := s.stub.OpenReverseTunnel(ctx, opts...)
	if err != nil {
		return false, err
	}
	respMD, err := stream.Header()
	if err != nil {
		return false, err
	}
	vals := respMD.Get(grpctunnelNegotiateKey)
	clientAcceptsSettings := len(vals) > 0 && vals[0] == grpctunnelNegotiateVal
	// TODO: we don't have a way to access outgoing metadata that gets added by
	// client interceptors that may be run by the stub.
	reqMD, _ := metadata.FromOutgoingContext(ctx)
	stream = &threadSafeOpenReverseTunnelClient{TunnelService_OpenReverseTunnelClient: stream}
	if err := s.addInstance(stream); err != nil {
		return false, err
	}
	defer s.wg.Done()
	err = serveTunnel(stream, reqMD, clientAcceptsSettings, &s.opts, s.handlers, s.isClosing)
	if err == context.Canceled && ctx.Err() == nil && s.isClosed() {
		// If we get back a cancelled error, but the given context is not
		// cancelled and this server is closed, then the cancellation was
		// caused by the server stopping. In that case, no need to report
		// an error to the caller.
		err = nil
	}
	return true, err
}

func (s *ReverseTunnelServer) addInstance(stream tunnelpb.TunnelService_OpenReverseTunnelClient) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state >= stateClosing {
		return status.Errorf(codes.Unavailable, "server is shutting down")
	}
	s.wg.Add(1)
	if s.instances == nil {
		s.instances = map[tunnelpb.TunnelService_OpenReverseTunnelClient]struct{}{}
	}
	s.instances[stream] = struct{}{}
	return nil
}

func (s *ReverseTunnelServer) isClosing() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state >= stateClosing
}

func (s *ReverseTunnelServer) isClosed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state >= stateClosed
}

// Stop shuts down the server immediately. On return, the server has stopped
// and any on-going operations have been cancelled.
func (s *ReverseTunnelServer) Stop() {
	defer s.wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == stateClosed {
		// already stopped
		return
	}
	s.state = stateClosed
	// immediately stop all instances
	for stream := range s.instances {
		_ = stream.CloseSend()
	}
}

// GracefulStop initiates graceful shutdown and waits for the server to stop.
// During graceful shutdown, no new operations will be allowed to start, but
// existing operations may proceed. The server stops after all existing
// operations complete.
//
// To give existing operations a time limit, the caller can also arrange to
// call Stop after some deadline, which forcibly terminates existing operations.
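//
// A minimal sketch of bounding graceful shutdown that way (the 30-second
// limit is an illustrative choice):
//
//	timer := time.AfterFunc(30*time.Second, svr.Stop)
//	svr.GracefulStop() // blocks until existing operations finish or Stop fires
//	timer.Stop()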
func (s *ReverseTunnelServer) GracefulStop() {
	defer s.wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state != stateActive {
		// already stopping
		return
	}
	s.state = stateClosing
	// we just set the flag to stop new streams, but allow
	// existing streams to finish
}

// threadSafeOpenReverseTunnelClient decorates a tunnel stream so that
// concurrent callers can safely send and receive: a gRPC stream otherwise
// permits at most one active sender and one active receiver at a time.
type threadSafeOpenReverseTunnelClient struct {
	tunnelpb.TunnelService_OpenReverseTunnelClient

	sendMu sync.Mutex
	closed bool
	recvMu sync.Mutex
}

func (h *threadSafeOpenReverseTunnelClient) CloseSend() error {
	h.sendMu.Lock()
	defer h.sendMu.Unlock()
	h.closed = true
	return h.TunnelService_OpenReverseTunnelClient.CloseSend()
}

func (h *threadSafeOpenReverseTunnelClient) Send(msg *tunnelpb.ServerToClient) error {
	return h.SendMsg(msg)
}

func (h *threadSafeOpenReverseTunnelClient) SendMsg(m interface{}) error {
	h.sendMu.Lock()
	defer h.sendMu.Unlock()
	if h.closed {
		// sends after CloseSend report io.EOF, mirroring gRPC stream behavior
		return io.EOF
	}
	return h.TunnelService_OpenReverseTunnelClient.SendMsg(m)
}

func (h *threadSafeOpenReverseTunnelClient) Recv() (*tunnelpb.ClientToServer, error) {
	h.recvMu.Lock()
	defer h.recvMu.Unlock()
	return h.TunnelService_OpenReverseTunnelClient.Recv()
}

func (h *threadSafeOpenReverseTunnelClient) RecvMsg(m interface{}) error {
	h.recvMu.Lock()
	defer h.recvMu.Unlock()
	return h.TunnelService_OpenReverseTunnelClient.RecvMsg(m)
}

// threadSafeOpenTunnelServer decorates the server side of a tunnel stream the
// same way, serializing concurrent sends and concurrent receives.
type threadSafeOpenTunnelServer struct {
	sendMu sync.Mutex
	recvMu sync.Mutex
	tunnelpb.TunnelService_OpenTunnelServer
}

func (h *threadSafeOpenTunnelServer) Send(msg *tunnelpb.ServerToClient) error {
	h.sendMu.Lock()
	defer h.sendMu.Unlock()
	return h.TunnelService_OpenTunnelServer.Send(msg)
}

func (h *threadSafeOpenTunnelServer) SendMsg(m interface{}) error {
	h.sendMu.Lock()
	defer h.sendMu.Unlock()
	return h.TunnelService_OpenTunnelServer.SendMsg(m)
}

func (h *threadSafeOpenTunnelServer) Recv() (*tunnelpb.ClientToServer, error) {
	h.recvMu.Lock()
	defer h.recvMu.Unlock()
	return h.TunnelService_OpenTunnelServer.Recv()
}

func (h *threadSafeOpenTunnelServer) RecvMsg(m interface{}) error {
	h.recvMu.Lock()
	defer h.recvMu.Unlock()
	return h.TunnelService_OpenTunnelServer.RecvMsg(m)
}