From 3bd58be4f9351d9340549f82d3bbf343e2414528 Mon Sep 17 00:00:00 2001 From: Raj Sarkapally Date: Mon, 4 Jun 2018 11:17:52 -0700 Subject: [PATCH 1/3] TSDB read load balancing --- ArgusCore/pom.xml | 2 +- .../service/tsdb/AbstractTSDBService.java | 42 ++++++++++++++++--- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/ArgusCore/pom.xml b/ArgusCore/pom.xml index 9fe5bb767..3bba585d6 100644 --- a/ArgusCore/pom.xml +++ b/ArgusCore/pom.xml @@ -189,7 +189,7 @@ org.apache.httpcomponents httpcore - 4.4 + 4.4.9 com.fasterxml.jackson.core diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java index 5f8135662..4cf7f2440 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java @@ -54,7 +54,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.http.ConnectionReuseStrategy; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; @@ -65,11 +67,13 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,6 +152,8 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer Property.TSD_ENDPOINT_CONNECTION_TIMEOUT.getDefaultValue())); int socketTimeout = Integer.parseInt(config.getValue(Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getName(), Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getDefaultValue())); + int tsdbConnectionReuseCount=Integer.parseInt(config.getValue(Property.TSDB_READ_CONNECTION_REUSE_COUNT.getName(), + Property.TSDB_READ_CONNECTION_REUSE_COUNT.getDefaultValue())); _readEndPoints = Arrays.asList(config.getValue(Property.TSD_ENDPOINT_READ.getName(), Property.TSD_ENDPOINT_READ.getDefaultValue()).split(",")); @@ -176,16 +182,16 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer try { int index = 0; for (String readEndpoint : _readEndPoints) { - _readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readEndpoint)); + _readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount ,readEndpoint)); _readBackupEndPointsMap.put(readEndpoint, _readBackupEndPoints.get(index)); index ++; } for (String readBackupEndpoint : _readBackupEndPoints) { if (!readBackupEndpoint.isEmpty()) - _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readBackupEndpoint)); + _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, readBackupEndpoint)); } - _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout, _writeEndpoints); + _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, _writeEndpoints); _roundRobinIterator = Iterables.cycle(_writeEndpoints).iterator(); _executorService = Executors.newFixedThreadPool(connCount); @@ -386,7 +392,7 @@ private void put(List objects, String endpoint, HttpMethod method) throws } /* Helper to create the read and write clients. */ - protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, String...endpoints) throws MalformedURLException { + protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, int connectionReuseCount, String...endpoints) throws MalformedURLException { PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager(); connMgr.setMaxTotal(connCount); @@ -401,7 +407,7 @@ protected CloseableHttpClient getClient(int connCount, int connTimeout, int sock RequestConfig reqConfig = RequestConfig.custom().setConnectionRequestTimeout(connTimeout).setConnectTimeout(connTimeout).setSocketTimeout( socketTimeout).build(); - return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build(); + return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build(); } /* Converts a list of annotations into a list of annotation wrappers for use in serialization. Resulting list is sorted by target annotation @@ -623,7 +629,8 @@ public enum Property { TSD_CONNECTION_COUNT("service.property.tsdb.connection.count", "2"), TSD_RETRY_COUNT("service.property.tsdb.retry.count", "3"), /** The TSDB backup read endpoint. */ - TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"); + TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"), + TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "100"); private final String _name; private final String _defaultValue; @@ -833,5 +840,28 @@ public Map> getMetrics(List queries) { public List getAnnotations(List queries) { throw new UnsupportedOperationException("This method should be overriden by a specific implementation."); } + /** + * Used to close http connections after reusing the same connection for certain number of times + * @author rsarkapally + * + */ + class TSDBReadConnectionReuseStrategy implements ConnectionReuseStrategy{ + int connectionReuseCount; + AtomicInteger numOfTimesReused = new AtomicInteger(1); + public TSDBReadConnectionReuseStrategy(int connectionReuseCount) { + this.connectionReuseCount=connectionReuseCount; + } + + @Override + public boolean keepAlive(HttpResponse response, HttpContext context) { + HttpClientContext httpContext = (HttpClientContext) context; + _logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount()); + if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) { + numOfTimesReused.set(1); + return false; + } + return true; + } + } } /* Copyright (c) 2016, Salesforce.com, Inc. All rights reserved. */ From 46f4bcc31914369823c066fbd98fedbf4605a95c Mon Sep 17 00:00:00 2001 From: Raj Sarkapally Date: Tue, 5 Jun 2018 14:52:03 -0700 Subject: [PATCH 2/3] Increase default connection reuse count --- .../service/tsdb/AbstractTSDBService.java | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java index 4cf7f2440..d852e02b9 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java @@ -110,7 +110,7 @@ public class AbstractTSDBService extends DefaultService implements TSDBService { //~ Instance fields ****************************************************************************************************************************** private final ObjectMapper _mapper; protected final Logger _logger = LoggerFactory.getLogger(getClass()); - + private final String[] _writeEndpoints; protected final CloseableHttpClient _writeHttpClient; @@ -170,7 +170,7 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer _writeEndpoints = config.getValue(Property.TSD_ENDPOINT_WRITE.getName(), Property.TSD_ENDPOINT_WRITE.getDefaultValue()).split(","); RETRY_COUNT = Integer.parseInt(config.getValue(Property.TSD_RETRY_COUNT.getName(), - Property.TSD_RETRY_COUNT.getDefaultValue())); + Property.TSD_RETRY_COUNT.getDefaultValue())); for(String writeEndpoint : _writeEndpoints) { requireArgument((writeEndpoint != null) && (!writeEndpoint.isEmpty()), "Illegal write endpoint URL."); @@ -190,7 +190,7 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer if (!readBackupEndpoint.isEmpty()) _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, readBackupEndpoint)); } - + _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, _writeEndpoints); _roundRobinIterator = Iterables.cycle(_writeEndpoints).iterator(); @@ -324,21 +324,21 @@ public void putMetrics(List metrics) { } } - public void _retry(List objects, Iterator endPointIterator) { - for(int i=0;i void _retry(List objects, Iterator endPointIterator) { + for(int i=0;i annotations) { put(wrappers, endpoint + "/api/annotation/bulk", HttpMethod.POST); } catch(IOException ex) { _logger.warn("IOException while trying to push annotations", ex); - _retry(wrappers, _roundRobinIterator); + _retry(wrappers, _roundRobinIterator); } } } @@ -369,7 +369,7 @@ private ObjectMapper getMapper() { mapper.registerModule(module); return mapper; } - + /* Writes objects in chunks. */ private void put(List objects, String endpoint, HttpMethod method) throws IOException { if (objects != null) { @@ -406,8 +406,11 @@ protected CloseableHttpClient getClient(int connCount, int connTimeout, int sock RequestConfig reqConfig = RequestConfig.custom().setConnectionRequestTimeout(connTimeout).setConnectTimeout(connTimeout).setSocketTimeout( socketTimeout).build(); - - return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build(); + if(connectionReuseCount>0) { + return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build(); + }else { + return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build(); + } } /* Converts a list of annotations into a list of annotation wrappers for use in serialization. Resulting list is sorted by target annotation @@ -497,7 +500,7 @@ protected T toEntity(String content, TypeReference type) { throw new SystemException(ex); } } - + /* Helper method to convert a Java entity to a JSON string. */ protected String fromEntity(T type) { try { @@ -529,7 +532,7 @@ protected String extractResponse(HttpResponse response) { /* Execute a request given by type requestType. */ protected HttpResponse executeHttpRequest(HttpMethod requestType, String url, CloseableHttpClient client, StringEntity entity) throws IOException { - + HttpResponse httpResponse = null; if (entity != null) { @@ -630,7 +633,7 @@ public enum Property { TSD_RETRY_COUNT("service.property.tsdb.retry.count", "3"), /** The TSDB backup read endpoint. */ TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"), - TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "100"); + TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "2000"); private final String _name; private final String _defaultValue; @@ -851,11 +854,11 @@ class TSDBReadConnectionReuseStrategy implements ConnectionReuseStrategy{ public TSDBReadConnectionReuseStrategy(int connectionReuseCount) { this.connectionReuseCount=connectionReuseCount; } - + @Override public boolean keepAlive(HttpResponse response, HttpContext context) { HttpClientContext httpContext = (HttpClientContext) context; - _logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount()); + _logger.error("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount()); if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) { numOfTimesReused.set(1); return false; From 3d2fc963ebe2e9a1675320179824269bb3c9852d Mon Sep 17 00:00:00 2001 From: Raj Sarkapally Date: Tue, 5 Jun 2018 14:55:12 -0700 Subject: [PATCH 3/3] set connection reuse log to debug --- .../salesforce/dva/argus/service/tsdb/AbstractTSDBService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java index d852e02b9..cc0478aa6 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java @@ -858,7 +858,7 @@ public TSDBReadConnectionReuseStrategy(int connectionReuseCount) { @Override public boolean keepAlive(HttpResponse response, HttpContext context) { HttpClientContext httpContext = (HttpClientContext) context; - _logger.error("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount()); + _logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount()); if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) { numOfTimesReused.set(1); return false;