forked from kedacore/http-add-on
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathrequest_forwarder_test.go
290 lines (268 loc) · 8.18 KB
/
request_forwarder_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package main
import (
"log"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/go-logr/logr"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/kedacore/http-add-on/interceptor/config"
kedanet "github.com/kedacore/http-add-on/pkg/net"
)
// newRoundTripper builds the http.RoundTripper used by the tests in this
// file: an *http.Transport whose DialContext is dialCtxFunc and whose
// ResponseHeaderTimeout is httpRespHeaderTimeout. All other transport fields
// keep their zero values.
func newRoundTripper(
	dialCtxFunc kedanet.DialContextFunc,
	httpRespHeaderTimeout time.Duration,
) http.RoundTripper {
	transport := new(http.Transport)
	transport.DialContext = dialCtxFunc
	transport.ResponseHeaderTimeout = httpRespHeaderTimeout
	return transport
}
// defaultTimeouts returns the baseline config.Timeouts for these tests:
// 100ms connect, 100ms keep-alive, 500ms response-header and 1s
// deployment-replicas timeouts. Individual tests override fields as needed.
func defaultTimeouts() config.Timeouts {
	const shortWait = 100 * time.Millisecond

	var timeouts config.Timeouts
	timeouts.Connect = shortWait
	timeouts.KeepAlive = shortWait
	timeouts.ResponseHeader = 500 * time.Millisecond
	timeouts.DeploymentReplicas = time.Second
	return timeouts
}
// retryDialContextFunc returns the kedanet.DialContextFunc produced by
// kedanet.DialContextWithRetry.
//
// The underlying dialer is created with kedanet.NewNetDialer from
// timeouts.Connect and timeouts.KeepAlive; backoff is passed through to
// DialContextWithRetry unchanged.
func retryDialContextFunc(
	timeouts config.Timeouts,
	backoff wait.Backoff,
) kedanet.DialContextFunc {
	connectTimeout, keepAlive := timeouts.Connect, timeouts.KeepAlive
	return kedanet.DialContextWithRetry(
		kedanet.NewNetDialer(connectTimeout, keepAlive),
		backoff,
	)
}
// reqAndRes creates the request/response pair the tests feed to
// forwardRequest: a body-less GET request for path and a fresh
// httptest.ResponseRecorder to capture what gets written. If the request
// cannot be built, both values are nil and the error from http.NewRequest is
// returned.
func reqAndRes(path string) (*httptest.ResponseRecorder, *http.Request, error) {
	request, err := http.NewRequest(http.MethodGet, path, nil)
	if err != nil {
		return nil, nil, err
	}
	return httptest.NewRecorder(), request, nil
}
// TestForwarderSuccess checks the happy path of forwardRequest: a request is
// forwarded to a live origin exactly once, with its path intact, and the
// origin's status code and body are written to the response recorder.
func TestForwarderSuccess(t *testing.T) {
	r := require.New(t)
	// this channel will be closed after the request was received, but
	// before the response was sent
	reqRecvCh := make(chan struct{})
	const respCode = 302
	const respBody = "TestForwardingHandler"
	originHdl := kedanet.NewTestHTTPHandlerWrapper(
		http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			close(reqRecvCh)
			w.WriteHeader(respCode)
			// This handler runs on a server goroutine, not the test
			// goroutine, so failures are reported with t.Errorf instead of
			// require (which calls t.FailNow).
			if _, err := w.Write([]byte(respBody)); err != nil {
				t.Errorf("writing origin response body: %v", err)
			}
		}),
	)
	testServer := httptest.NewServer(originHdl)
	defer testServer.Close()
	forwardURL, err := url.Parse(testServer.URL)
	r.NoError(err)

	const path = "/testfwd"
	res, req, err := reqAndRes(path)
	r.NoError(err)
	timeouts := defaultTimeouts()
	dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
	forwardRequest(
		logr.Discard(),
		res,
		req,
		newRoundTripper(dialCtxFunc, timeouts.ResponseHeader),
		forwardURL,
		2,
	)

	r.True(
		ensureSignalBeforeTimeout(reqRecvCh, 100*time.Millisecond),
		"request was not received within %s",
		100*time.Millisecond,
	)
	forwardedRequests := originHdl.IncomingRequests()
	r.Equal(1, len(forwardedRequests), "number of requests forwarded")
	forwardedRequest := forwardedRequests[0]
	r.Equal(path, forwardedRequest.URL.Path)
	// compare against the same constant the origin handler responds with
	r.Equal(
		respCode,
		res.Code,
		"Proxied status code was wrong. Response body was %s",
		res.Body.String(),
	)
	r.Equal(respBody, res.Body.String())
}
// Test to make sure that the request forwarder times out if headers aren't
// returned in time. The origin handler blocks until the test releases it, so
// the forwarder is expected to give up and answer with a 502.
func TestForwarderHeaderTimeout(t *testing.T) {
	r := require.New(t)
	// the origin will wait until this channel receives or is closed
	originWaitCh := make(chan struct{})
	hdl := kedanet.NewTestHTTPHandlerWrapper(
		// the request parameter is named req so it does not shadow the
		// require instance r declared above
		http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			<-originWaitCh
			w.WriteHeader(200)
		}),
	)
	srv, originURL, err := kedanet.StartTestServer(hdl)
	r.NoError(err)
	defer srv.Close()
	// Release the origin handler on every exit path. Deferred calls run in
	// LIFO order, so this runs before srv.Close() above; without it, a failed
	// require assertion would leave the handler blocked while the server is
	// being shut down.
	defer close(originWaitCh)

	timeouts := defaultTimeouts()
	timeouts.Connect = 10 * time.Millisecond
	timeouts.ResponseHeader = 10 * time.Millisecond
	backoff := timeouts.Backoff(2, 2, 1)
	dialCtxFunc := retryDialContextFunc(timeouts, backoff)
	res, req, err := reqAndRes("/testfwd")
	r.NoError(err)
	forwardRequest(
		logr.Discard(),
		res,
		req,
		newRoundTripper(dialCtxFunc, timeouts.ResponseHeader),
		originURL,
		2,
	)

	// the proxy has bailed out by now; the origin is told to stop by the
	// deferred close of originWaitCh once these assertions are done
	forwardedRequests := hdl.IncomingRequests()
	r.Equal(0, len(forwardedRequests))
	r.Equal(502, res.Code)
	r.Contains(res.Body.String(), "error on backend")
}
// Test to ensure that the request forwarder waits for an origin that is slow
// to respond, and then relays the origin's status code and body.
func TestForwarderWaitsForSlowOrigin(t *testing.T) {
	r := require.New(t)
	// the origin will wait until this channel receives or is closed
	originWaitCh := make(chan struct{})
	const originRespCode = 200
	const originRespBodyStr = "Hello World!"
	hdl := kedanet.NewTestHTTPHandlerWrapper(
		http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			<-originWaitCh
			w.WriteHeader(originRespCode)
			// This handler runs on a server goroutine, not the test
			// goroutine, so failures are reported with t.Errorf instead of
			// require (which calls t.FailNow).
			if _, err := w.Write([]byte(originRespBodyStr)); err != nil {
				t.Errorf("writing origin response body: %v", err)
			}
		}),
	)
	srv, originURL, err := kedanet.StartTestServer(hdl)
	r.NoError(err)
	defer srv.Close()

	// the origin is gonna wait this long, and we'll make the proxy
	// have a much longer timeout than this to account for timing issues
	const originDelay = 5 * time.Millisecond
	timeouts := config.Timeouts{
		Connect:   originDelay,
		KeepAlive: 2 * time.Second,
		// the handler is held for originDelay before it responds, so make
		// the forwarder wait several times longer than that
		ResponseHeader: originDelay * 4,
	}
	dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
	// release the origin handler after originDelay
	go func() {
		time.Sleep(originDelay)
		close(originWaitCh)
	}()

	const path = "/testfwd"
	res, req, err := reqAndRes(path)
	r.NoError(err)
	forwardRequest(
		logr.Discard(),
		res,
		req,
		newRoundTripper(dialCtxFunc, timeouts.ResponseHeader),
		originURL,
		2,
	)

	// wait for the goroutine above to finish, with a little cushion, and
	// fail loudly if the origin was never released
	r.True(
		ensureSignalBeforeTimeout(originWaitCh, originDelay*2),
		"origin was not released within %s",
		originDelay*2,
	)
	r.Equal(originRespCode, res.Code)
	r.Equal(originRespBodyStr, res.Body.String())
}
// TestForwarderConnectionRetryAndTimeout forwards a request to an address
// that is expected to have nothing listening on it, and checks that
// forwardRequest keeps retrying for at least the minimum total backoff
// duration before answering with a 502.
func TestForwarderConnectionRetryAndTimeout(t *testing.T) {
	r := require.New(t)
	noSuchURL, err := url.Parse("https://localhost:65533")
	r.NoError(err)

	timeouts := config.Timeouts{
		Connect:        10 * time.Millisecond,
		KeepAlive:      1 * time.Millisecond,
		ResponseHeader: 50 * time.Millisecond,
	}
	dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
	res, req, err := reqAndRes("/test")
	r.NoError(err)

	start := time.Now()
	forwardRequest(
		logr.Discard(),
		res,
		req,
		newRoundTripper(dialCtxFunc, timeouts.ResponseHeader),
		noSuchURL,
		2,
	)
	elapsed := time.Since(start)
	log.Printf("forwardRequest took %s", elapsed)

	// forwardRequest should not return before the dialer has used up its
	// retries.
	//
	// forwardRequest uses dialCtxFunc to establish network connections, and
	// dialCtxFunc retries according to timeouts.DefaultBackoff(). The lower
	// bound on the total time spent is taken from
	// kedanet.MinTotalBackoffDuration for that same backoff.
	// NOTE(review): the exact backoff schedule is defined by DefaultBackoff,
	// which is not visible here — confirm before quoting concrete durations.
	expectedForwardTimeout := kedanet.MinTotalBackoffDuration(timeouts.DefaultBackoff())
	// report the same elapsed value that is being compared, rather than a
	// fresh time.Since(start) taken later
	r.GreaterOrEqualf(
		elapsed,
		expectedForwardTimeout,
		"proxy returned after %s, expected not to return until %s",
		elapsed,
		expectedForwardTimeout,
	)
	r.Equal(
		502,
		res.Code,
		"unexpected code (response body was '%s')",
		res.Body.String(),
	)
	r.Contains(res.Body.String(), "error on backend")
}
// TestForwardRequestRedirectAndHeaders checks that forwardRequest relays a
// redirect response from the origin as-is: the 301 status, the Location,
// Content-Type and custom headers, and the response body must all reach the
// response recorder unchanged.
func TestForwardRequestRedirectAndHeaders(t *testing.T) {
	r := require.New(t)
	srv, srvURL, err := kedanet.StartTestServer(
		kedanet.NewTestHTTPHandlerWrapper(
			http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
				w.Header().Set("Content-Type", "text/html; charset=utf-8")
				w.Header().Set("X-Custom-Header", "somethingcustom")
				w.Header().Set("Location", "abc123.com")
				w.WriteHeader(301)
				// This handler runs on a server goroutine, not the test
				// goroutine, so failures are reported with t.Errorf instead
				// of require (which calls t.FailNow).
				if _, err := w.Write([]byte("Hello from srv")); err != nil {
					t.Errorf("writing origin response body: %v", err)
				}
			}),
		),
	)
	r.NoError(err)
	defer srv.Close()

	timeouts := defaultTimeouts()
	timeouts.Connect = 10 * time.Millisecond
	timeouts.ResponseHeader = 10 * time.Millisecond
	backoff := timeouts.Backoff(2, 2, 1)
	dialCtxFunc := retryDialContextFunc(timeouts, backoff)
	res, req, err := reqAndRes("/testfwd")
	r.NoError(err)
	forwardRequest(
		logr.Discard(),
		res,
		req,
		newRoundTripper(dialCtxFunc, timeouts.ResponseHeader),
		srvURL,
		2,
	)

	r.Equal(301, res.Code)
	r.Equal("abc123.com", res.Header().Get("Location"))
	r.Equal("text/html; charset=utf-8", res.Header().Get("Content-Type"))
	r.Equal("somethingcustom", res.Header().Get("X-Custom-Header"))
	r.Equal("Hello from srv", res.Body.String())
}