forked from kedacore/http-add-on
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathproxy_handlers.go
134 lines (127 loc) · 4.42 KB
/
proxy_handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/interceptor/config"
kedanet "github.com/kedacore/http-add-on/pkg/net"
"github.com/kedacore/http-add-on/pkg/routing"
)
// forwardingConfig bundles the settings newForwardingHandler needs: how long
// to wait for a backend before forwarding, and how to tune the outbound
// http.Transport.
type forwardingConfig struct {
	// waitTimeout is the deadline applied to the context passed to the
	// wait function before a request is forwarded.
	waitTimeout time.Duration

	// respHeaderTimeout is copied to http.Transport.ResponseHeaderTimeout.
	respHeaderTimeout time.Duration

	// forceAttemptHTTP2 is copied to http.Transport.ForceAttemptHTTP2.
	forceAttemptHTTP2 bool

	// maxIdleConns is copied to http.Transport.MaxIdleConns.
	maxIdleConns int

	// idleConnTimeout is copied to http.Transport.IdleConnTimeout.
	idleConnTimeout time.Duration

	// tlsHandshakeTimeout is copied to http.Transport.TLSHandshakeTimeout.
	tlsHandshakeTimeout time.Duration

	// expectContinueTimeout is copied to http.Transport.ExpectContinueTimeout.
	expectContinueTimeout time.Duration

	// serviceUnavailableRetry is handed unchanged to forwardRequest.
	// NOTE(review): presumably a retry budget for 503 responses — confirm
	// against forwardRequest, which is defined outside this file section.
	serviceUnavailableRetry int
}
// newForwardingConfigFromTimeouts builds a forwardingConfig by copying the
// corresponding fields out of t. t is dereferenced unconditionally, so it
// must not be nil.
func newForwardingConfigFromTimeouts(t *config.Timeouts) forwardingConfig {
	var cfg forwardingConfig

	// Bound on waiting for the backend before forwarding.
	cfg.waitTimeout = t.DeploymentReplicas

	// Outbound transport tuning.
	cfg.respHeaderTimeout = t.ResponseHeader
	cfg.forceAttemptHTTP2 = t.ForceHTTP2
	cfg.maxIdleConns = t.MaxIdleConns
	cfg.idleConnTimeout = t.IdleConnTimeout
	cfg.tlsHandshakeTimeout = t.TLSHandshakeTimeout
	cfg.expectContinueTimeout = t.ExpectContinueTimeout

	// Passed through to the request forwarder.
	cfg.serviceUnavailableRetry = t.ServiceUnavailableRetry

	return cfg
}
// newForwardingHandler returns an http.Handler that resolves every incoming
// request to a backend through routingTable and forwards the request to it.
//
// For each request the handler:
//   - extracts the host with getHost and replies 400 if that fails;
//   - looks the host up in routingTable and replies 404 if it is unknown;
//   - calls waitFunc with the target's namespace and deployment, using a
//     context bounded by fwdCfg.waitTimeout, and replies 502 if it fails;
//   - chooses the target URL: if the Host header carries a port, the request
//     goes to http://<service>.<namespace>:<port>; otherwise the URL comes
//     from targetSvcURL. A URL that cannot be built yields a 500;
//   - sets the X-KEDA-HTTP-Cold-Start response header ("true" when waitFunc
//     reported zero replicas, "false" otherwise) and calls forwardRequest.
//
// One http.Transport, configured from dialCtxFunc and fwdCfg, is created up
// front and shared by all requests served by the returned handler.
func newForwardingHandler(
	lggr logr.Logger,
	routingTable *routing.Table,
	dialCtxFunc kedanet.DialContextFunc,
	waitFunc forwardWaitFunc,
	targetSvcURL routing.ServiceURLFunc,
	fwdCfg forwardingConfig,
) http.Handler {
	roundTripper := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialCtxFunc,
		ForceAttemptHTTP2:     fwdCfg.forceAttemptHTTP2,
		MaxIdleConns:          fwdCfg.maxIdleConns,
		IdleConnTimeout:       fwdCfg.idleConnTimeout,
		TLSHandshakeTimeout:   fwdCfg.tlsHandshakeTimeout,
		ExpectContinueTimeout: fwdCfg.expectContinueTimeout,
		ResponseHeaderTimeout: fwdCfg.respHeaderTimeout,
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		host, err := getHost(r)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			if _, err := fmt.Fprint(w, "Host not found in request"); err != nil {
				lggr.Error(err, "could not write error response to client")
			}
			return
		}
		// Every log line below carries the resolved host.
		lggr := lggr.WithValues("host", host)

		routingTarget, err := routingTable.Lookup(host)
		if err != nil {
			w.WriteHeader(http.StatusNotFound)
			if _, err := fmt.Fprintf(w, "Host %s not found", r.Host); err != nil {
				lggr.Error(err, "could not send error message to client")
			}
			return
		}

		// Give the backend at most waitTimeout before giving up.
		waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout)
		defer done()
		replicas, err := waitFunc(
			waitFuncCtx,
			routingTarget.Namespace,
			routingTarget.Deployment,
		)
		if err != nil {
			lggr.Error(err, "wait function failed, not forwarding request")
			w.WriteHeader(http.StatusBadGateway)
			if _, err := fmt.Fprintf(w, "error on backend (%s)", err); err != nil {
				lggr.Error(err, "could not write error response to client")
			}
			return
		}

		var targetURL *url.URL
		// The port, if any, follows the LAST colon of the Host header. A "]"
		// after that colon means the colon sits inside a bracketed IPv6
		// literal (e.g. "[::1]") and the header carries no port at all.
		if i := strings.LastIndex(r.Host, ":"); i != -1 && !strings.Contains(r.Host[i:], "]") {
			// The Host header names a port: route to <service>.<namespace>:<port>.
			targetPort := r.Host[i+1:]
			targetHost := fmt.Sprintf("http://%s.%s:%s", routingTarget.Service, routingTarget.Namespace, targetPort)
			if targetURL, err = url.Parse(targetHost); err != nil {
				lggr.Error(err, "forwarding failed")
				w.WriteHeader(http.StatusInternalServerError)
				if _, err := fmt.Fprintf(w, "error parsing host:port: %s to URL", targetHost); err != nil {
					lggr.Error(err, "could not write error response to client")
				}
				return
			}
		} else {
			// No port in the Host header: use the target stored in the routing table.
			targetURL, err = targetSvcURL(*routingTarget)
			if err != nil {
				lggr.Error(err, "forwarding failed")
				w.WriteHeader(http.StatusInternalServerError)
				if _, err := fmt.Fprint(w, "error getting backend service URL"); err != nil {
					lggr.Error(err, "could not write error response to client")
				}
				return
			}
		}

		// Zero replicas reported by the wait function marks a cold start.
		isColdStart := "false"
		if replicas == 0 {
			isColdStart = "true"
		}
		w.Header().Add("X-KEDA-HTTP-Cold-Start", isColdStart)

		// "host" is already attached to lggr via WithValues, so it is not
		// repeated in the key/value list here.
		lggr.Info("dispatching request.", "target_url", targetURL, "isColdStart", isColdStart)
		forwardRequest(lggr, w, r, roundTripper, targetURL, fwdCfg.serviceUnavailableRetry)
	})
}