Skip to content

Commit

Permalink
Callbacks for Connection initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
tippl committed Apr 7, 2024
1 parent ccff406 commit 6905e83
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void connect(
final InetAddress address = remoteAddresses[i];
final boolean last = i == remoteAddresses.length - 1;
final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
onBeforeSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout);
}
Expand Down Expand Up @@ -221,6 +222,7 @@ public void connect(
}
socket.connect(remoteAddress, TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0);
conn.bind(socket);
onAfterSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(conn), endpointHost,
conn.getLocalAddress(), conn.getRemoteAddress());
Expand All @@ -229,11 +231,16 @@ public void connect(
final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(endpointHost.getSchemeName()) : null;
if (tlsSocketStrategy != null) {
final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
onBeforeTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(conn), tlsName);
}
final Socket upgradedSocket = tlsSocketStrategy.upgrade(socket, tlsName.getHostName(), tlsName.getPort(), attachment, context);
conn.bind(upgradedSocket);
onAfterTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgraded to TLS", ConnPoolSupport.getId(conn), tlsName);
}
}
return;
} catch (final RuntimeException ex) {
Expand Down Expand Up @@ -278,14 +285,31 @@ public void upgrade(
final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(newProtocol) : null;
if (tlsSocketStrategy != null) {
final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
onBeforeTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgrading to TLS {}:{}", ConnPoolSupport.getId(conn), tlsName.getHostName(), tlsName.getPort());
}
final SSLSocket upgradedSocket = tlsSocketStrategy.upgrade(socket, tlsName.getHostName(), tlsName.getPort(), attachment, context);
conn.bind(upgradedSocket);
onAfterTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgraded to TLS {}:{}", ConnPoolSupport.getId(conn), tlsName.getHostName(), tlsName.getPort());
}
} else {
throw new UnsupportedSchemeException(newProtocol + " protocol is not supported");
}
}

protected void onBeforeSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}

protected void onAfterSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}

protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}

protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public PoolingHttpClientConnectionManager(
}

@Internal
protected PoolingHttpClientConnectionManager(
public PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,26 @@
*/
public class PoolingHttpClientConnectionManagerBuilder {

private HttpConnectionFactory<ManagedHttpClientConnection> connectionFactory;
private TlsSocketStrategy tlsSocketStrategy;
private SchemePortResolver schemePortResolver;
private DnsResolver dnsResolver;
private PoolConcurrencyPolicy poolConcurrencyPolicy;
private PoolReusePolicy poolReusePolicy;
private Resolver<HttpRoute, SocketConfig> socketConfigResolver;
private Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
private Resolver<HttpHost, TlsConfig> tlsConfigResolver;
protected HttpConnectionFactory<ManagedHttpClientConnection> connectionFactory;
protected TlsSocketStrategy tlsSocketStrategy;
protected SchemePortResolver schemePortResolver;
protected DnsResolver dnsResolver;
protected PoolConcurrencyPolicy poolConcurrencyPolicy;
protected PoolReusePolicy poolReusePolicy;
protected Resolver<HttpRoute, SocketConfig> socketConfigResolver;
protected Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
protected Resolver<HttpHost, TlsConfig> tlsConfigResolver;

private boolean systemProperties;
protected boolean systemProperties;

private int maxConnTotal;
private int maxConnPerRoute;
protected int maxConnTotal;
protected int maxConnPerRoute;

public static PoolingHttpClientConnectionManagerBuilder create() {
return new PoolingHttpClientConnectionManagerBuilder();
}

PoolingHttpClientConnectionManagerBuilder() {
protected PoolingHttpClientConnectionManagerBuilder() {
super();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
Expand All @@ -59,7 +60,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {
@Internal
public class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {

private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncClientConnectionOperator.class);

Expand Down Expand Up @@ -105,6 +107,7 @@ public Future<ManagedAsyncClientConnection> connect(
final InetAddress remoteAddress = endpointHost.getAddress();
final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT;

onBeforeSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout);
}
Expand All @@ -121,6 +124,7 @@ public Future<ManagedAsyncClientConnection> connect(
@Override
public void completed(final IOSession session) {
final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
onAfterSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(connection), endpointHost,
connection.getLocalAddress(), connection.getRemoteAddress());
Expand All @@ -131,6 +135,7 @@ public void completed(final IOSession session) {
final Timeout socketTimeout = connection.getSocketTimeout();
final Timeout handshakeTimeout = tlsConfig.getHandshakeTimeout();
final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
onBeforeTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName);
}
Expand All @@ -145,6 +150,10 @@ public void completed(final IOSession session) {
public void completed(final TransportSecurityLayer transportSecurityLayer) {
connection.setSocketTimeout(socketTimeout);
future.completed(connection);
onAfterTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgraded to TLS", ConnPoolSupport.getId(connection), tlsName);
}
}

});
Expand Down Expand Up @@ -214,4 +223,16 @@ public void completed(final TransportSecurityLayer transportSecurityLayer) {
}
}

protected void onBeforeSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}

protected void onAfterSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}

protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}

protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public PoolingAsyncClientConnectionManager(
}

@Internal
protected PoolingAsyncClientConnectionManager(
public PoolingAsyncClientConnectionManager(
final AsyncClientConnectionOperator connectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,25 @@
*/
public class PoolingAsyncClientConnectionManagerBuilder {

private TlsStrategy tlsStrategy;
private SchemePortResolver schemePortResolver;
private DnsResolver dnsResolver;
private PoolConcurrencyPolicy poolConcurrencyPolicy;
private PoolReusePolicy poolReusePolicy;
protected TlsStrategy tlsStrategy;
protected SchemePortResolver schemePortResolver;
protected DnsResolver dnsResolver;
protected PoolConcurrencyPolicy poolConcurrencyPolicy;
protected PoolReusePolicy poolReusePolicy;

private boolean systemProperties;
protected boolean systemProperties;

private int maxConnTotal;
private int maxConnPerRoute;
protected int maxConnTotal;
protected int maxConnPerRoute;

private Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
private Resolver<HttpHost, TlsConfig> tlsConfigResolver;
protected Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
protected Resolver<HttpHost, TlsConfig> tlsConfigResolver;

public static PoolingAsyncClientConnectionManagerBuilder create() {
return new PoolingAsyncClientConnectionManagerBuilder();
}

PoolingAsyncClientConnectionManagerBuilder() {
protected PoolingAsyncClientConnectionManagerBuilder() {
super();
}

Expand Down

0 comments on commit 6905e83

Please sign in to comment.