-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathVulnAbstractHttpServerTransport.java
390 lines (347 loc) · 18 KB
/
VulnAbstractHttpServerTransport.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
public abstract class AbstractHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
// Logger shared by this class.
private static final Logger logger = LogManager.getLogger(AbstractHttpServerTransport.class);
// Settings handed to the constructor; also consulted when the publish port is resolved.
protected final Settings settings;
// Built from the settings via HttpHandlingSettings.fromSettings and passed to every DefaultRestChannel created here.
public final HttpHandlingSettings handlingSettings;
// Used to resolve the bind and publish host addresses in bindServer().
protected final NetworkService networkService;
// Passed to every DefaultRestChannel created for an incoming request.
protected final BigArrays bigArrays;
// Source of the ThreadContext used when creating response channels and dispatching requests.
protected final ThreadPool threadPool;
// Receives every request (good or bad) from dispatchRequest().
protected final Dispatcher dispatcher;
// Passed to RestRequest factory methods when building REST requests.
private final NamedXContentRegistry xContentRegistry;
// Value of SETTING_HTTP_PORT; iterated over when binding each host address.
protected final PortsRange port;
// Value of SETTING_HTTP_MAX_CONTENT_LENGTH; reported (in bytes) through info().
protected final ByteSizeValue maxContentLength;
// HTTP bind hosts, or the global network bind hosts when the HTTP setting is empty.
private final String[] bindHosts;
// HTTP publish hosts, or the global network publish hosts when the HTTP setting is empty.
private final String[] publishHosts;
// Assigned at the end of bindServer(); null until then (info() checks for this).
private volatile BoundTransportAddress boundAddress;
// Incremented once per call to serverAcceptedChannel().
private final AtomicLong totalChannelsAccepted = new AtomicLong();
// Accepted channels; each one is removed again by its close listener, and the set is cleared in doStop().
private final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
// Bound server channels; bindAddress() and doStop() modify this set while synchronized on it.
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
 * Stores the collaborators and reads the HTTP-related values out of {@code settings}.
 *
 * @param settings         settings from which handling settings, hosts, port range and max content length are read
 * @param networkService   used later to resolve bind and publish addresses
 * @param bigArrays        handed to response channels
 * @param threadPool       supplies the thread context
 * @param xContentRegistry handed to the REST request factories
 * @param dispatcher       target of all dispatched requests
 */
protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
                                      NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
    super(settings);
    this.settings = settings;
    this.networkService = networkService;
    this.bigArrays = bigArrays;
    this.threadPool = threadPool;
    this.xContentRegistry = xContentRegistry;
    this.dispatcher = dispatcher;
    this.handlingSettings = HttpHandlingSettings.fromSettings(settings);

    // network.bind_host cannot be declared as the setting's fallback because the setting already falls back to
    // http.host, so the global value is substituted by hand when the HTTP-specific list is empty
    List<String> effectiveBindHosts = SETTING_HTTP_BIND_HOST.get(settings);
    if (effectiveBindHosts.isEmpty()) {
        effectiveBindHosts = NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings);
    }
    this.bindHosts = effectiveBindHosts.toArray(Strings.EMPTY_ARRAY);

    // same reasoning as above, applied to network.publish_host
    List<String> effectivePublishHosts = SETTING_HTTP_PUBLISH_HOST.get(settings);
    if (effectivePublishHosts.isEmpty()) {
        effectivePublishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings);
    }
    this.publishHosts = effectivePublishHosts.toArray(Strings.EMPTY_ARRAY);

    this.port = SETTING_HTTP_PORT.get(settings);
    this.maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
}
/**
 * @return the address assigned by {@link #bindServer()}, or {@code null} if none has been assigned yet
 */
@Override
public BoundTransportAddress boundAddress() {
    return boundAddress;
}
/**
 * @return an {@link HttpInfo} holding the bound address and the maximum content length in bytes,
 *         or {@code null} while no bound address is available
 */
@Override
public HttpInfo info() {
    final BoundTransportAddress current = boundAddress();
    return current == null ? null : new HttpInfo(current, maxContentLength.getBytes());
}
/**
 * @return statistics built from the number of currently tracked channels and the total number accepted so far
 */
@Override
public HttpStats stats() {
    final int trackedChannels = httpChannels.size();
    final long acceptedSoFar = totalChannelsAccepted.get();
    return new HttpStats(trackedChannels, acceptedSoFar);
}
/**
 * Binds to every resolved bind host and starts accepting incoming connections, then resolves the
 * publish address and port and records the resulting {@link BoundTransportAddress}.
 *
 * @throws BindHttpException      if the bind hosts cannot be resolved, or a bind or publish-port resolution fails
 * @throws BindTransportException if the publish address cannot be resolved
 */
protected void bindServer() {
    final InetAddress[] bindTargets;
    try {
        bindTargets = networkService.resolveBindHostAddresses(bindHosts);
    } catch (IOException e) {
        throw new BindHttpException("Failed to resolve host [" + Arrays.toString(bindHosts) + "]", e);
    }

    final List<TransportAddress> bound = new ArrayList<>(bindTargets.length);
    for (int i = 0; i < bindTargets.length; i++) {
        bound.add(bindAddress(bindTargets[i]));
    }

    final InetAddress publishInetAddress;
    try {
        publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts);
    } catch (Exception e) {
        throw new BindTransportException("Failed to resolve publish address", e);
    }

    final int resolvedPublishPort = resolvePublishPort(settings, bound, publishInetAddress);
    final InetSocketAddress publishSocket = new InetSocketAddress(publishInetAddress, resolvedPublishPort);
    this.boundAddress = new BoundTransportAddress(bound.toArray(new TransportAddress[0]), new TransportAddress(publishSocket));
    logger.info("{}", boundAddress);
}
/**
 * Binds a server channel for {@code hostAddress} by offering the ports of the configured range to
 * {@link #bind(InetSocketAddress)}.
 *
 * @param hostAddress the local address to bind
 * @return the address the new server channel reports as its local address
 * @throws BindHttpException if the port iteration reports failure; the last recorded bind exception is the cause
 */
private TransportAddress bindAddress(final InetAddress hostAddress) {
    final AtomicReference<Exception> failure = new AtomicReference<>();
    final AtomicReference<InetSocketAddress> localAddress = new AtomicReference<>();
    final boolean bound = port.iterate(candidatePort -> {
        try {
            // binding and registering happen under the same monitor that doStop() takes
            synchronized (httpServerChannels) {
                final HttpServerChannel serverChannel = bind(new InetSocketAddress(hostAddress, candidatePort));
                httpServerChannels.add(serverChannel);
                localAddress.set(serverChannel.getLocalAddress());
            }
            return true;
        } catch (Exception e) {
            failure.set(e);
            return false;
        }
    });
    if (bound == false) {
        throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", failure.get());
    }

    final InetSocketAddress socketAddress = localAddress.get();
    if (logger.isDebugEnabled()) {
        logger.debug("Bound http to address {{}}", NetworkAddress.format(socketAddress));
    }
    return new TransportAddress(socketAddress);
}
/**
 * Binds a server channel to the given socket address.
 * <p>
 * Called from {@code bindAddress} once per candidate port while the caller is synchronized on the set of
 * server channels. Any exception thrown here is recorded by the caller and makes its port callback
 * return {@code false} (presumably so that the next port in the range is tried; confirm against
 * {@code PortsRange#iterate}).
 *
 * @param hostAddress the socket address (host and port) to bind
 * @return the bound server channel; the caller reads its local address
 * @throws Exception if the address cannot be bound
 */
protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception;
/**
 * Closes and forgets all server channels, then all accepted HTTP channels, and finally delegates to
 * {@link #stopInternal()}. Exceptions raised while closing are logged and do not abort the shutdown.
 */
@Override
protected void doStop() {
    synchronized (httpServerChannels) {
        if (httpServerChannels.isEmpty() == false) {
            try {
                // close a snapshot, blocking until done
                final List<HttpServerChannel> serverChannelSnapshot = new ArrayList<>(httpServerChannels);
                CloseableChannel.closeChannels(serverChannelSnapshot, true);
            } catch (Exception e) {
                logger.warn("exception while closing channels", e);
            } finally {
                httpServerChannels.clear();
            }
        }
    }

    try {
        final List<HttpChannel> channelSnapshot = new ArrayList<>(httpChannels);
        CloseableChannel.closeChannels(channelSnapshot, true);
    } catch (Exception e) {
        logger.warn("unexpected exception while closing http channels", e);
    }
    httpChannels.clear();

    stopInternal();
}
/**
 * No-op: this base class performs no work on close. Channels are closed in {@code doStop()}.
 */
@Override
protected void doClose() {
}
/**
 * Called to tear down internal resources.
 * <p>
 * Invoked as the last step of {@code doStop()}, after the server channels and the accepted HTTP
 * channels have been closed and their tracking sets cleared.
 */
protected abstract void stopInternal();
/**
 * Determines the port to publish. Resolution order:
 * <ol>
 *   <li>the configured publish port, when it is non-negative;</li>
 *   <li>the port of the first bound address that is a wildcard address or equals the publish address;</li>
 *   <li>the single port shared by all bound addresses.</li>
 * </ol>
 *
 * @param settings           settings holding the configured publish port
 * @param boundAddresses     addresses the server is bound to
 * @param publishInetAddress the resolved publish address
 * @return the publish port
 * @throws BindHttpException if none of the steps above yields a port
 */
// package private for tests
static int resolvePublishPort(Settings settings, List<TransportAddress> boundAddresses, InetAddress publishInetAddress) {
    final int configuredPort = SETTING_HTTP_PUBLISH_PORT.get(settings);
    if (configuredPort >= 0) {
        return configuredPort;
    }

    // one pass: return on the first matching address, and collect distinct ports for the fallback
    final IntSet distinctPorts = new IntHashSet();
    for (TransportAddress candidate : boundAddresses) {
        final InetAddress candidateInetAddress = candidate.address().getAddress();
        if (candidateInetAddress.isAnyLocalAddress() || candidateInetAddress.equals(publishInetAddress)) {
            return candidate.getPort();
        }
        distinctPorts.add(candidate.getPort());
    }

    // if no matching boundAddress found, check if there is a unique port for all bound addresses
    if (distinctPorts.size() == 1) {
        return distinctPorts.iterator().next().value;
    }

    throw new BindHttpException("Failed to auto-resolve http publish port, multiple bound addresses " + boundAddresses +
        " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " +
        "Please specify a unique port by setting " + SETTING_HTTP_PORT.getKey() + " or " + SETTING_HTTP_PUBLISH_PORT.getKey());
}
/**
 * Handles an exception raised on an HTTP channel. The channel is always closed; the only thing that
 * varies is whether and at which level the exception is logged.
 *
 * @param channel the channel on which the exception occurred
 * @param e       the exception
 */
protected void onException(HttpChannel channel, Exception e) {
    // once we are no longer started nothing is logged - we only make sure the channel's resources are released
    if (lifecycle.started()) {
        if (NetworkExceptionHelper.isCloseConnectionException(e)) {
            logger.trace(() -> new ParameterizedMessage(
                "close connection exception caught while handling client http traffic, closing connection {}", channel), e);
        } else if (NetworkExceptionHelper.isConnectException(e)) {
            logger.trace(() -> new ParameterizedMessage(
                "connect exception caught while handling client http traffic, closing connection {}", channel), e);
        } else if (e instanceof CancelledKeyException) {
            logger.trace(() -> new ParameterizedMessage(
                "cancelled key exception caught while handling client http traffic, closing connection {}", channel), e);
        } else {
            logger.warn(() -> new ParameterizedMessage(
                "caught exception while handling client http traffic, closing connection {}", channel), e);
        }
    }
    CloseableChannel.closeChannel(channel);
}
/**
 * Logs, at error level, an exception raised on a server channel. The channel is not closed here.
 *
 * @param channel the server channel on which the exception occurred
 * @param e       the exception
 */
protected void onServerException(HttpServerChannel channel, Exception e) {
    final ParameterizedMessage message =
        new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel);
    logger.error(message, e);
}
/**
 * Exception handler for exceptions that are not associated with a specific channel. The exception is
 * logged at warn level together with the name of the calling thread.
 *
 * @param exception the exception
 */
protected void onNonChannelException(Exception exception) {
    logger.warn(
        new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
        exception);
}
/**
 * Starts tracking a newly accepted channel: adds it to the channel set, bumps the accepted counter and
 * registers a close listener that removes it from the set again.
 *
 * @param httpChannel the accepted channel; expected not to have been registered before
 */
protected void serverAcceptedChannel(HttpChannel httpChannel) {
    final boolean firstRegistration = httpChannels.add(httpChannel);
    assert firstRegistration : "Channel should only be added to http channel set once";
    totalChannelsAccepted.incrementAndGet();
    final ActionListener<Void> untrackOnClose = ActionListener.wrap(() -> httpChannels.remove(httpChannel));
    httpChannel.addCloseListener(untrackOnClose);
    logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel));
}
/**
 * Handles an incoming http request for which no error has been reported. Delegates to
 * {@code handleIncomingRequest} with a {@code null} exception.
 *
 * @param httpRequest that is incoming
 * @param httpChannel that received the http request
 */
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
handleIncomingRequest(httpRequest, httpChannel, null);
}
/**
 * Handles an incoming http request that has encountered an error. Delegates to
 * {@code handleIncomingRequest}, passing the exception along as the initial bad-request cause.
 *
 * @param httpRequest that is incoming
 * @param httpChannel that received the http request
 * @param exception that was encountered
 */
public void incomingRequestError(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
handleIncomingRequest(httpRequest, httpChannel, exception);
}
/**
 * Hands the request to the dispatcher while the thread context is stashed; the stashed context is
 * restored when the dispatcher call returns or throws.
 *
 * @param restRequest     the request to dispatch
 * @param channel         the channel on which the response is sent
 * @param badRequestCause {@code null} for a regular request, otherwise the reason the request is treated as bad
 */
// Visible for testing
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
    final ThreadContext context = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignored = context.stashContext()) {
        if (badRequestCause == null) {
            dispatcher.dispatchRequest(restRequest, channel, context);
        } else {
            dispatcher.dispatchBadRequest(restRequest, channel, context, badRequestCause);
        }
    }
}
/**
 * Turns an incoming http request into a REST request and a response channel, then dispatches them.
 * Failures while building either object are not thrown; they are folded into the bad-request cause
 * (via {@code ExceptionsHelper.useOrSuppress}) and a degraded request or channel is built instead.
 *
 * @param httpRequest that is incoming
 * @param httpChannel that received the http request
 * @param exception already associated with the request by the caller, or {@code null} if there is none
 */
private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
// starts as the caller-supplied exception and accumulates any failure encountered below
Exception badRequestCause = exception;
/*
* We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there
* are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we
* attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header,
* or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the
* underlying exception that caused us to treat the request as bad.
*/
final RestRequest restRequest;
{
// non-final temporary: lets the final local above be assigned exactly once after the try/catch
RestRequest innerRestRequest;
try {
innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
} catch (final RestRequest.ContentTypeHeaderException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
} catch (final RestRequest.BadParameterException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
}
restRequest = innerRestRequest;
}
/*
* We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid
* parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an
* IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of these
* parameter values.
*/
final RestChannel channel;
{
// same single-assignment pattern as for restRequest above
RestChannel innerChannel;
ThreadContext threadContext = threadPool.getThreadContext();
try {
innerChannel = new DefaultRestChannel(httpChannel, httpRequest, restRequest, bigArrays, handlingSettings, threadContext);
} catch (final IllegalArgumentException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
// the parameter-less request built here is only given to the fallback channel
final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
innerChannel = new DefaultRestChannel(httpChannel, httpRequest, innerRequest, bigArrays, handlingSettings, threadContext);
}
channel = innerChannel;
}
// NOTE(review): on the fallback-channel path the dispatcher still receives restRequest (the one built with
// parameters), not the parameter-less request the channel was created with - confirm this is intended
dispatchRequest(restRequest, channel, badRequestCause);
}
/**
 * Builds a REST request from a copy of {@code httpRequest} that has its Content-Type header removed.
 * If that still fails because of a bad parameter, the failure is added to {@code badRequestCause} as a
 * suppressed exception and the request is built without parameters.
 *
 * @param httpRequest     the incoming request whose Content-Type header was rejected
 * @param httpChannel     the channel that received the request
 * @param badRequestCause the cause collected so far; must not be {@code null}, may gain a suppressed exception
 * @return a REST request built without the Content-Type header
 */
private RestRequest requestWithoutContentTypeHeader(HttpRequest httpRequest, HttpChannel httpChannel, Exception badRequestCause) {
    final HttpRequest strippedRequest = httpRequest.removeHeader("Content-Type");
    RestRequest rebuilt;
    try {
        rebuilt = RestRequest.request(xContentRegistry, strippedRequest, httpChannel);
    } catch (final RestRequest.BadParameterException e) {
        badRequestCause.addSuppressed(e);
        rebuilt = RestRequest.requestWithoutParameters(xContentRegistry, strippedRequest, httpChannel);
    }
    return rebuilt;
}
}