Skip to content

Commit

Permalink
use object pools instead of ThreadLocals; closes #197
Browse files Browse the repository at this point in the history
  • Loading branch information
unixoid committed May 27, 2018
1 parent 314fca7 commit 4643d7a
Show file tree
Hide file tree
Showing 22 changed files with 332 additions and 150 deletions.
4 changes: 4 additions & 0 deletions commons/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
</dependency>
<dependency>
<groupId>org.vibur</groupId>
<artifactId>vibur-object-pool</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openehealth.ipf.commons.core;

import groovy.lang.Closure;
import lombok.extern.slf4j.Slf4j;
import org.vibur.objectpool.ConcurrentPool;
import org.vibur.objectpool.PoolObjectFactory;
import org.vibur.objectpool.PoolService;
import org.vibur.objectpool.util.ConcurrentLinkedQueueCollection;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.util.function.Function;

/**
* Pool for DOM document builders (which are not thread-safe).
*
* @author Dmytro Rud
* @since 3.5.1
*/
@Slf4j
public class DomBuildersPool {
public static final String POOL_SIZE_PROPERTY = DomBuildersPool.class.getName() + ".POOLSIZE";
private static final int DEFAULT_POOL_SIZE = 100;

private DomBuildersPool() {
throw new IllegalStateException("Utility class, not instantiable");
}

private static final PoolService<DocumentBuilder> POOL;
static {
int poolSize = Integer.getInteger(POOL_SIZE_PROPERTY, -1);
POOL = new ConcurrentPool<>(new ConcurrentLinkedQueueCollection<>(), new DocumentBuilderPoolFactory(),
0, (poolSize > 0) ? poolSize : DEFAULT_POOL_SIZE, false);

}


/**
* Returns a document builder instance.
*/
public static DocumentBuilder take() {
return POOL.take();
}

/**
* Returns a document builder (previously gained via {@link #take()}) to the pool.
* This method MUST be called as soon as the use of the document builder is finished.
*
* @param documentBuilder document builder, <code>null</code> values are safe.
*/
public static void restore (DocumentBuilder documentBuilder) {
if (documentBuilder != null) {
documentBuilder.reset();
POOL.restore(documentBuilder);
log.debug("Returned document builder {} to the pool", documentBuilder);
}
}

/**
* Takes a document builder object from the pool, uses it to execute the given operation,
* and returns it to the pool.
*
* @param operation operation to execute using a document builder
* @param <R> operation return type
* @return result of the execution of the operation
*/
public static <R> R use(Function<DocumentBuilder, R> operation) {
DocumentBuilder builder = null;
try {
builder = take();
return operation.apply(builder);
} finally {
restore(builder);
}
}

/**
* Takes a document builder object from the pool, uses it to execute the given Groovy closure,
* and returns it to the pool.
*
* @param closure closure to execute using a document builder
* @param <R> closure return type
* @return result of the execution of the closure
*/
public static <R> R use(Closure<R> closure) {
return use(closure::call);
}


private static class DocumentBuilderPoolFactory implements PoolObjectFactory<DocumentBuilder> {
@Override
public DocumentBuilder create() {
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
DocumentBuilder documentBuilder = factory.newDocumentBuilder();
log.debug("Created a new document builder {}", documentBuilder);
return documentBuilder;
} catch (ParserConfigurationException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean readyToTake(DocumentBuilder obj) {
return true;
}

@Override
public boolean readyToRestore(DocumentBuilder obj) {
return true;
}

@Override
public void destroy(DocumentBuilder obj) {
// nop
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
/**
* Thread local for DOM document builders.
* @author Dmytro Rud
*
* @deprecated since IPF 3.5.1, use {@link DomBuildersPool} instead.
*/
@Deprecated
public class DomBuildersThreadLocal extends ThreadLocal<DocumentBuilder> {

public DocumentBuilder initialValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public class RequestDetailProvider extends InterceptorAdapter {

private static ThreadLocal<RequestDetails> requestDetails = new ThreadLocal<>();
private static final ThreadLocal<RequestDetails> requestDetails = new ThreadLocal<>();

public static RequestDetails getRequestDetails() {
return requestDetails.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.openehealth.ipf.commons.ihe.core.atna.AuditStrategy;
import org.openehealth.ipf.commons.ihe.hl7v3.audit.Hl7v3AuditDataset;
import org.openehealth.ipf.commons.ihe.ws.JaxWsRequestClientFactory;
import org.openehealth.ipf.commons.ihe.ws.WsSecurityInformation;
import org.openehealth.ipf.commons.ihe.ws.correlation.AsynchronyCorrelator;
import org.openehealth.ipf.commons.ihe.ws.cxf.databinding.plainxml.PlainXmlDataBinding;
import org.openehealth.ipf.commons.ihe.ws.cxf.payload.InNamespaceMergeInterceptor;
Expand Down Expand Up @@ -60,10 +61,11 @@ public Hl7v3ClientFactory(
InterceptorProvider customInterceptors,
List<AbstractFeature> features,
Map<String, Object> properties,
AsynchronyCorrelator<Hl7v3AuditDataset> correlator)
AsynchronyCorrelator<Hl7v3AuditDataset> correlator,
WsSecurityInformation securityInformation)
{
super(wsTransactionConfiguration, serviceUrl, auditStrategy, auditContext,
customInterceptors, features, properties, correlator);
customInterceptors, features, properties, correlator, securityInformation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.openehealth.ipf.commons.ihe.core.atna.AuditStrategy;
import org.openehealth.ipf.commons.ihe.hl7v3.audit.Hl7v3AuditDataset;
import org.openehealth.ipf.commons.ihe.ws.JaxWsClientFactory;
import org.openehealth.ipf.commons.ihe.ws.WsSecurityInformation;
import org.openehealth.ipf.commons.ihe.ws.cxf.audit.AuditResponseInterceptor;
import org.openehealth.ipf.commons.ihe.ws.cxf.databinding.plainxml.PlainXmlDataBinding;

Expand All @@ -42,10 +43,11 @@ public Hl7v3DeferredResponderFactory(
AuditContext auditContext,
InterceptorProvider customInterceptors,
List<AbstractFeature> features,
Map<String, Object> properties)
Map<String, Object> properties,
WsSecurityInformation securityInformation)
{
super(wsTransactionConfiguration, serviceUrl, auditStrategy, auditContext,
customInterceptors, features, properties, null);
customInterceptors, features, properties, null, securityInformation);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
import org.openehealth.ipf.commons.ihe.ws.utils.SoapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vibur.objectpool.ConcurrentPool;
import org.vibur.objectpool.PoolObjectFactory;
import org.vibur.objectpool.PoolService;
import org.vibur.objectpool.util.ConcurrentLinkedQueueCollection;

import javax.xml.namespace.QName;
import javax.xml.ws.Binding;
Expand All @@ -44,7 +48,6 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

Expand All @@ -56,7 +59,10 @@
public class JaxWsClientFactory<AuditDatasetType extends WsAuditDataset> {
private static final Logger LOG = LoggerFactory.getLogger(JaxWsClientFactory.class);

protected final ThreadLocal<Object> threadLocalPort = new ThreadLocal<>();
public static final String POOL_SIZE_PROPERTY = JaxWsClientFactory.class.getName() + ".POOLSIZE";
private static final int DEFAULT_POOL_SIZE = 100;

protected final PoolService<Object> clientPool;
protected final WsTransactionConfiguration<AuditDatasetType> wsTransactionConfiguration;
protected final String serviceUrl;
protected final InterceptorProvider customInterceptors;
Expand All @@ -65,6 +71,7 @@ public class JaxWsClientFactory<AuditDatasetType extends WsAuditDataset> {
protected final AuditStrategy<AuditDatasetType> auditStrategy;
protected final AuditContext auditContext;
protected final AsynchronyCorrelator<AuditDatasetType> correlator;
protected final WsSecurityInformation securityInformation;

/**
* Constructs the factory.
Expand All @@ -83,7 +90,9 @@ public JaxWsClientFactory(
InterceptorProvider customInterceptors,
List<AbstractFeature> features,
Map<String, Object> properties,
AsynchronyCorrelator<AuditDatasetType> correlator) {
AsynchronyCorrelator<AuditDatasetType> correlator,
WsSecurityInformation securityInformation)
{
requireNonNull(wsTransactionConfiguration, "wsTransactionConfiguration");
this.wsTransactionConfiguration = wsTransactionConfiguration;
this.serviceUrl = serviceUrl;
Expand All @@ -93,35 +102,20 @@ public JaxWsClientFactory(
this.features = features;
this.properties = properties;
this.correlator = correlator;
this.securityInformation = securityInformation;

int poolSize = Integer.getInteger(POOL_SIZE_PROPERTY, -1);
clientPool = new ConcurrentPool<>(new ConcurrentLinkedQueueCollection<>(), new PortFactory(),
0, (poolSize > 0) ? poolSize : DEFAULT_POOL_SIZE, false);
}

/**
* Returns a client stub for the web-service.
*
* @param securityInformationSupplier Conduit-related security information or null if no security shall be set
* @return the client stub
*/
public synchronized Object getClient(Supplier<WsSecurityInformation> securityInformationSupplier) {
if (threadLocalPort.get() == null) {
URL wsdlURL = getClass().getClassLoader().getResource(wsTransactionConfiguration.getWsdlLocation());
Service service = Service.create(wsdlURL, wsTransactionConfiguration.getServiceName());
Object port = service.getPort(wsTransactionConfiguration.getSei());
Client client = ClientProxy.getClient(port);
configureBinding(port);
configureInterceptors(client);
configureProperties(client);
WsSecurityInformation securityInformation = securityInformationSupplier.get();
if (securityInformation != null) {
securityInformation.configureHttpConduit((HTTPConduit) client.getConduit());
}
threadLocalPort.set(port);
LOG.debug("Created client adapter for: {}", wsTransactionConfiguration.getServiceName());
}
return threadLocalPort.get();
}

public synchronized Object getClient() {
return getClient(() -> null);
return clientPool.take();
}

/**
Expand Down Expand Up @@ -204,4 +198,52 @@ private void configureBinding(Object port) {
SOAPBinding soapBinding = (SOAPBinding) binding;
soapBinding.setMTOMEnabled(wsTransactionConfiguration.isMtom());
}

/**
* Returns a client stub (previously gained via {@link #getClient()}) to the pool.
* This method MUST be called as soon as the use of the port stub is finished.
*
* @param client client stub, <code>null</code> values are safe.
*/
public void restoreClient(Object client) {
if (client != null) {
clientPool.restore(client);
LOG.debug("Returned client stub {} to the pool", client);
}
}


class PortFactory implements PoolObjectFactory<Object> {
@Override
public Object create() {
URL wsdlURL = getClass().getClassLoader().getResource(wsTransactionConfiguration.getWsdlLocation());
Service service = Service.create(wsdlURL, wsTransactionConfiguration.getServiceName());
Object port = service.getPort(wsTransactionConfiguration.getSei());
Client client = ClientProxy.getClient(port);
configureBinding(port);
configureInterceptors(client);
configureProperties(client);
if (securityInformation != null) {
securityInformation.configureHttpConduit((HTTPConduit) client.getConduit());
}
LOG.debug("Created client stub {} for {}", port, wsTransactionConfiguration.getServiceName());
return port;
}

@Override
public boolean readyToTake(Object o) {
return true;
}

@Override
public boolean readyToRestore(Object o) {
return true;
}

@Override
public void destroy(Object o) {
// nop
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ public JaxWsRequestClientFactory(
InterceptorProvider customInterceptors,
List<AbstractFeature> features,
Map<String, Object> properties,
AsynchronyCorrelator<AuditDatasetType> correlator)
AsynchronyCorrelator<AuditDatasetType> correlator,
WsSecurityInformation securityInformation)
{
super(wsTransactionConfiguration, serviceUrl, auditStrategy, auditContext,
customInterceptors, features, properties, correlator);
customInterceptors, features, properties, correlator, securityInformation);
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<spring-version>4.3.14.RELEASE</spring-version>
<spring-boot-version>1.5.10.RELEASE</spring-boot-version>
<vertx-version>3.5.0</vertx-version>
<vibur-version>22.2</vibur-version>
<xmlsec-version>2.1.0</xmlsec-version>

<github.global.server>github</github.global.server>
Expand Down Expand Up @@ -718,6 +719,12 @@
<version>${vertx-version}</version>
</dependency>

<dependency>
<groupId>org.vibur</groupId>
<artifactId>vibur-object-pool</artifactId>
<version>${vibur-version}</version>
</dependency>

<!-- All IPF dependencies -->

<dependency>
Expand Down
Loading

0 comments on commit 4643d7a

Please sign in to comment.