From 0f1f92306060ed62e27d43055e7ae05dd7c7a387 Mon Sep 17 00:00:00 2001 From: Kylin Date: Thu, 10 Jul 2014 17:29:04 +0800 Subject: [PATCH] add remoting test cocde --- .../java/org/jboss/logging/LoggerTest.java | 14 ++ logging/pom.xml | 22 +++ logging/quickstart/pom.xml | 45 +++++ .../org/jboss/logging/sample/HelloWorld.java | 20 ++ .../logging/sample/LocalSystemConfig.java | 32 ++++ .../org/jboss/logging/sample/LoggerTest.java | 16 ++ pom.xml | 2 + remoting/pom.xml | 22 +++ remoting/quickstart/pom.xml | 64 +++++++ .../sample/CharSequenceReceiver.java | 78 ++++++++ .../remoting3/sample/CharSequenceSender.java | 80 ++++++++ .../jboss/remoting3/test/ConnectionTest.java | 168 ++++++++++++++++ .../test/OutboundMessageCountTest.java | 181 ++++++++++++++++++ .../java/org/jboss/remoting3/test/Runner.java | 10 + 14 files changed, 754 insertions(+) create mode 100644 domain/domain-deployment/src/main/java/org/jboss/logging/LoggerTest.java create mode 100644 logging/pom.xml create mode 100644 logging/quickstart/pom.xml create mode 100644 logging/quickstart/src/main/java/org/jboss/logging/sample/HelloWorld.java create mode 100644 logging/quickstart/src/main/java/org/jboss/logging/sample/LocalSystemConfig.java create mode 100644 logging/quickstart/src/main/java/org/jboss/logging/sample/LoggerTest.java create mode 100644 remoting/pom.xml create mode 100644 remoting/quickstart/pom.xml create mode 100644 remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceReceiver.java create mode 100644 remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceSender.java create mode 100644 remoting/quickstart/src/main/java/org/jboss/remoting3/test/ConnectionTest.java create mode 100644 remoting/quickstart/src/main/java/org/jboss/remoting3/test/OutboundMessageCountTest.java create mode 100644 remoting/quickstart/src/main/java/org/jboss/remoting3/test/Runner.java diff --git a/domain/domain-deployment/src/main/java/org/jboss/logging/LoggerTest.java b/domain/domain-deployment/src/main/java/org/jboss/logging/LoggerTest.java new file mode 100644 index 0000000..7ce3fb3 --- /dev/null +++ b/domain/domain-deployment/src/main/java/org/jboss/logging/LoggerTest.java @@ -0,0 +1,14 @@ +package org.jboss.logging; + +import org.jboss.as.host.controller.HostControllerLogger; + +public class LoggerTest { + + public static void main(String[] args) { + + System.out.println(HostControllerLogger.class.getPackage().getName()); + + HostControllerLogger.ROOT_LOGGER.tracef("trying to reconnect to %s current-state (%s) required-state (%s)", "jo004006", "SERVER_STARTED", "SERVER_STARTED"); + } + +} diff --git a/logging/pom.xml b/logging/pom.xml new file mode 100644 index 0000000..3799c3d --- /dev/null +++ b/logging/pom.xml @@ -0,0 +1,22 @@ + + + 4.0.0 + + org.wildfly.logging + wildfly-logging + 1.0 + Parent + Parent of WildFly logging + pom + + https://github.com/kylinsoong + + + UTF-8 + + + + quickstart + + + diff --git a/logging/quickstart/pom.xml b/logging/quickstart/pom.xml new file mode 100644 index 0000000..a14bd8e --- /dev/null +++ b/logging/quickstart/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + org.jboss.logging + jboss-logging-quickstart + 1.0 + jar + JBoss logging quickstart + JBoss logging quickstart + + https://github.com/kylinsoong + + + 3.1.2.GA-redhat-1 + 1.6 + 1.6 + 2.3.1 + UTF-8 + + + + + org.jboss.logging + jboss-logging + ${logging.version} + provided + + + + + ${artifactId} + + + maven-compiler-plugin + ${compiler.plugin.version} + + ${maven.compiler.source} + ${maven.compiler.target} + + + + + diff --git a/logging/quickstart/src/main/java/org/jboss/logging/sample/HelloWorld.java b/logging/quickstart/src/main/java/org/jboss/logging/sample/HelloWorld.java new file mode 100644 index 0000000..4c0bb67 --- /dev/null +++ b/logging/quickstart/src/main/java/org/jboss/logging/sample/HelloWorld.java @@ -0,0 +1,20 @@ +package org.jboss.logging.sample; + +import org.jboss.logging.Logger; + +public class HelloWorld { + + private static final Logger LOGGER = Logger.getLogger(HelloWorld.class); + + public static void main(String[] args) { + + LOGGER.trace("TRACE Message"); + LOGGER.debug("DEBUG Message"); + LOGGER.info("INFO Message"); + LOGGER.error("Error Message"); + LOGGER.fatal("FATAL Message"); + + LOGGER.error("Configuration file not found."); + } + +} diff --git a/logging/quickstart/src/main/java/org/jboss/logging/sample/LocalSystemConfig.java b/logging/quickstart/src/main/java/org/jboss/logging/sample/LocalSystemConfig.java new file mode 100644 index 0000000..3039c2e --- /dev/null +++ b/logging/quickstart/src/main/java/org/jboss/logging/sample/LocalSystemConfig.java @@ -0,0 +1,32 @@ +package org.jboss.logging.sample; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Properties; + +import org.jboss.logging.Logger; + +public class LocalSystemConfig { + + private static final Logger LOGGER = Logger.getLogger(LocalSystemConfig.class); + + public Properties openCustomProperties(String configname)throws FileNotFoundException { + Properties props = new Properties(); + try { + LOGGER.info("Loading custom configuration from " + configname); + props.load(new FileInputStream(configname)); + } catch (IOException e) { + LOGGER.error("Custom configuration file (" + configname + ") not found. Using defaults."); + throw new FileNotFoundException(configname); + } + + return props; + } + + public static void main(String[] args) throws FileNotFoundException { + + new LocalSystemConfig().openCustomProperties("XXOO"); + } + +} diff --git a/logging/quickstart/src/main/java/org/jboss/logging/sample/LoggerTest.java b/logging/quickstart/src/main/java/org/jboss/logging/sample/LoggerTest.java new file mode 100644 index 0000000..841397d --- /dev/null +++ b/logging/quickstart/src/main/java/org/jboss/logging/sample/LoggerTest.java @@ -0,0 +1,16 @@ +package org.jboss.logging.sample; + +import org.jboss.logging.Logger; + +public class LoggerTest { + + public static void main(String[] args) { + + System.setProperty("org.jboss.logging.provider", "jboss"); + + Logger logger = Logger.getLogger(LoggerTest.class, "com.kylin.test"); + + logger.info("logger test"); + } + +} diff --git a/pom.xml b/pom.xml index 00355f1..b3da678 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,8 @@ undertow perf domain + remoting + logging diff --git a/remoting/pom.xml b/remoting/pom.xml new file mode 100644 index 0000000..79190ab --- /dev/null +++ b/remoting/pom.xml @@ -0,0 +1,22 @@ + + + 4.0.0 + + org.wildfly.remoting + wildfly-remoting + 1.0 + Parent + Parent of WildFly remoting + pom + + https://github.com/kylinsoong + + + UTF-8 + + + + quickstart + + + diff --git a/remoting/quickstart/pom.xml b/remoting/quickstart/pom.xml new file mode 100644 index 0000000..ce38a33 --- /dev/null +++ b/remoting/quickstart/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + org.jboss.remoting3 + jboss-remoting-quickstart + 1.0 + jar + JBoss Remoting 3 quickstart + JBoss Remoting 3 quickstart + + https://github.com/kylinsoong + + + 3.0.7.GA-redhat-1 + 1.6 + 1.6 + 2.3.1 + UTF-8 + + + + + org.jboss.remoting3 + jboss-remoting + 3.2.16.GA-redhat-1 + + + + org.jboss.xnio + xnio-nio + ${xnio.version} + + + + + ${artifactId} + + + maven-compiler-plugin + ${compiler.plugin.version} + + ${maven.compiler.source} + ${maven.compiler.target} + + + + maven-jar-plugin + + + + org.jboss.remoting3.test.Runner + + + ${project.version} + ${project.artifactId} + + + + + + + diff --git a/remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceReceiver.java b/remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceReceiver.java new file mode 100644 index 0000000..b30e9f0 --- /dev/null +++ b/remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceReceiver.java @@ -0,0 +1,78 @@ +package org.jboss.remoting3.sample; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.jboss.remoting3.Channel; +import org.jboss.remoting3.Endpoint; +import org.jboss.remoting3.MessageInputStream; +import org.jboss.remoting3.OpenListener; +import org.jboss.remoting3.Remoting; +import org.jboss.remoting3.remote.RemoteConnectionProviderFactory; +import org.jboss.remoting3.security.SimpleServerAuthenticationProvider; +import org.jboss.remoting3.spi.NetworkServerProvider; +import org.xnio.IoUtils; +import org.xnio.OptionMap; +import org.xnio.Options; +import org.xnio.Sequence; +import org.xnio.channels.AcceptingChannel; +import org.xnio.channels.ConnectedStreamChannel; + +public class CharSequenceReceiver { + + private static final int THREAD_POOL_SIZE = 100; + private static final int BUFFER_SIZE = 8192; + private static byte[] buffer; + + protected final Endpoint serverEndpoint; + + private AcceptingChannel server; + + public CharSequenceReceiver() throws IOException { + serverEndpoint = Remoting.createEndpoint("connection-test-server", OptionMap.create(Options.WORKER_TASK_CORE_THREADS, THREAD_POOL_SIZE, Options.WORKER_TASK_MAX_THREADS, THREAD_POOL_SIZE)); + serverEndpoint.addConnectionProvider("remote", new RemoteConnectionProviderFactory(), OptionMap.create(Options.SSL_ENABLED, Boolean.FALSE)); + final NetworkServerProvider networkServerProvider = serverEndpoint.getConnectionProviderInterface("remote", NetworkServerProvider.class); + SimpleServerAuthenticationProvider provider = new SimpleServerAuthenticationProvider(); + provider.addUser("bob", "test", "pass".toCharArray()); + server = networkServerProvider.createServer(new InetSocketAddress("localhost", 30123), OptionMap.create(Options.SASL_MECHANISMS, Sequence.of("CRAM-MD5")), provider, null); + System.out.println("Server Created, " + server.getLocalAddress()); + + serverEndpoint.registerService("test", new OpenListener(){ + + public void channelOpened(Channel channel) { + channel.receiveMessage(new Channel.Receiver(){ + + public void handleError(Channel channel, IOException error) { + + } + + public void handleEnd(Channel channel) { +// System.out.println(channel.getConnection().getRemoteEndpointName() + " ended"); + } + + public void handleMessage(Channel channel, MessageInputStream message) { + try { + channel.receiveMessage(this); + buffer = new byte[BUFFER_SIZE]; + while (message.read(buffer) > -1); + System.out.println(" Receive: " + new String(buffer)); + } catch (Exception e) { + e.printStackTrace(); + } finally { + IoUtils.safeClose(message); + } + } + }); + } + + public void registrationTerminated() { + + }}, OptionMap.EMPTY); + } + + + public static void main(String[] args) throws IOException { + new CharSequenceReceiver(); + } + +} diff --git a/remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceSender.java b/remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceSender.java new file mode 100644 index 0000000..2deb2f2 --- /dev/null +++ b/remoting/quickstart/src/main/java/org/jboss/remoting3/sample/CharSequenceSender.java @@ -0,0 +1,80 @@ +package org.jboss.remoting3.sample; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; + +import org.jboss.remoting3.Channel; +import org.jboss.remoting3.Connection; +import org.jboss.remoting3.Endpoint; +import org.jboss.remoting3.MessageOutputStream; +import org.jboss.remoting3.Registration; +import org.jboss.remoting3.Remoting; +import org.jboss.remoting3.remote.RemoteConnectionProviderFactory; +import org.xnio.IoFuture; +import org.xnio.IoUtils; +import org.xnio.OptionMap; +import org.xnio.Options; + +public class CharSequenceSender { + + private static final Integer THREAD_POOL_SIZE = 100; + + protected final Endpoint clientEndpoint; + + protected final Registration clientReg; + + protected final Connection conn; + + public CharSequenceSender() throws IOException { + clientEndpoint = Remoting.createEndpoint("connection-test-client", OptionMap.create(Options.WORKER_TASK_CORE_THREADS, THREAD_POOL_SIZE, Options.WORKER_TASK_MAX_THREADS, THREAD_POOL_SIZE)); + clientReg = clientEndpoint.addConnectionProvider("remote", new RemoteConnectionProviderFactory(), OptionMap.create(Options.SSL_ENABLED, Boolean.FALSE)); + conn = clientEndpoint.connect("remote", new InetSocketAddress("localhost", 0), new InetSocketAddress("localhost", 30123), OptionMap.EMPTY, "bob", "test", "pass".toCharArray()).get(); + System.out.println("Connection created, " + conn.getEndpoint().getName()); + send(); + } + + + private void send() { + System.out.println("enter 'quit' or 'exit' exit the ChatDemo"); + + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + + while (true) { + try { + String line = in.readLine().toLowerCase(); + if (line.startsWith("quit") || line.startsWith("exit")) { + break; + } + + MessageOutputStream stream = null; + Channel channel = null; + + try { + final IoFuture future = conn.openChannel("test", OptionMap.EMPTY); + channel = future.get(); + stream = channel.writeMessage(); + stream.write(new String(line.getBytes(), "UTF-8").getBytes()); + } catch (Exception e) { + throw new RuntimeException("send char sequence error", e); + } finally { + stream.close(); + IoUtils.safeClose(channel); + } + + } catch (Exception e) { + throw new RuntimeException("send char sequence error", e); + } + } + + IoUtils.safeClose(clientReg); + IoUtils.safeClose(clientEndpoint); + } + + + public static void main(String[] args) throws IOException { + new CharSequenceSender(); + } + +} diff --git a/remoting/quickstart/src/main/java/org/jboss/remoting3/test/ConnectionTest.java b/remoting/quickstart/src/main/java/org/jboss/remoting3/test/ConnectionTest.java new file mode 100644 index 0000000..a2448f6 --- /dev/null +++ b/remoting/quickstart/src/main/java/org/jboss/remoting3/test/ConnectionTest.java @@ -0,0 +1,168 @@ +package org.jboss.remoting3.test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import org.jboss.remoting3.Channel; +import org.jboss.remoting3.Connection; +import org.jboss.remoting3.Endpoint; +import org.jboss.remoting3.MessageInputStream; +import org.jboss.remoting3.MessageOutputStream; +import org.jboss.remoting3.OpenListener; +import org.jboss.remoting3.Registration; +import org.jboss.remoting3.Remoting; +import org.jboss.remoting3.remote.RemoteConnectionProviderFactory; +import org.jboss.remoting3.security.SimpleServerAuthenticationProvider; +import org.jboss.remoting3.spi.NetworkServerProvider; +import org.xnio.IoFuture; +import org.xnio.IoUtils; +import org.xnio.OptionMap; +import org.xnio.Options; +import org.xnio.Sequence; +import org.xnio.XnioWorker; +import org.xnio.channels.AcceptingChannel; +import org.xnio.channels.ConnectedStreamChannel; + +public class ConnectionTest { + + private static final int THREAD_POOL_SIZE = 100; + private static final int CHANNEL_COUNT = 10; + private static final int CONNECTION_COUNT = 10; + private static final int BUFFER_SIZE = 8192; + private static final byte[] junkBuffer = new byte[BUFFER_SIZE]; + private static final int MESSAGE_COUNT = 8; + + protected final Endpoint clientEndpoint; + protected final Endpoint serverEndpoint; + private final Registration clientReg; + private final Registration serverReg; + + private AcceptingChannel server; + + public ConnectionTest() throws IOException { + clientEndpoint = Remoting.createEndpoint("connection-test-client", OptionMap.create(Options.WORKER_TASK_CORE_THREADS, THREAD_POOL_SIZE, Options.WORKER_TASK_MAX_THREADS, THREAD_POOL_SIZE)); + serverEndpoint = Remoting.createEndpoint("connection-test-server", OptionMap.create(Options.WORKER_TASK_CORE_THREADS, THREAD_POOL_SIZE, Options.WORKER_TASK_MAX_THREADS, THREAD_POOL_SIZE)); + clientReg = clientEndpoint.addConnectionProvider("remote", new RemoteConnectionProviderFactory(), OptionMap.create(Options.SSL_ENABLED, Boolean.FALSE)); + serverReg = serverEndpoint.addConnectionProvider("remote", new RemoteConnectionProviderFactory(), OptionMap.create(Options.SSL_ENABLED, Boolean.FALSE)); + + final NetworkServerProvider networkServerProvider = serverEndpoint.getConnectionProviderInterface("remote", NetworkServerProvider.class); + SimpleServerAuthenticationProvider provider = new SimpleServerAuthenticationProvider(); + provider.addUser("bob", "test", "pass".toCharArray()); + server = networkServerProvider.createServer(new InetSocketAddress("localhost", 30123), OptionMap.create(Options.SASL_MECHANISMS, Sequence.of("CRAM-MD5")), provider, null); + } + + private void destroy() { + IoUtils.safeClose(server); + IoUtils.safeClose(clientReg); + IoUtils.safeClose(serverReg); + IoUtils.safeClose(clientEndpoint); + IoUtils.safeClose(serverEndpoint); + } + + private void testManyChannelsLotsOfData() throws Exception { + final XnioWorker clientWorker = clientEndpoint.getXnioWorker(); + final XnioWorker serverWorker = serverEndpoint.getXnioWorker(); + final Queue problems = new ConcurrentLinkedQueue(); + final CountDownLatch serverChannelCount = new CountDownLatch(CHANNEL_COUNT * CONNECTION_COUNT); + final CountDownLatch clientChannelCount = new CountDownLatch(CHANNEL_COUNT * CONNECTION_COUNT); + serverEndpoint.registerService("test", new OpenListener(){ + + public void channelOpened(Channel channel) { + channel.receiveMessage(new Channel.Receiver() { + + public void handleMessage(Channel channel, MessageInputStream message) { + try { + channel.receiveMessage(this); + while (message.read(junkBuffer) > -1); + System.out.println(junkBuffer); + } catch (Exception e) { + e.printStackTrace(); + problems.add(e); + } finally { + IoUtils.safeClose(message); + } + } + + public void handleError(Channel channel, IOException error) { + problems.add(error); + error.printStackTrace(); + serverChannelCount.countDown(); + } + + public void handleEnd(Channel channel) { + serverChannelCount.countDown(); + } + }); + } + + public void registrationTerminated() { + + }}, OptionMap.EMPTY); + + final AtomicReferenceArray connections = new AtomicReferenceArray(CONNECTION_COUNT); + for (int h = 0; h < CONNECTION_COUNT; h ++) { + final Connection connection = clientEndpoint.connect("remote", new InetSocketAddress("localhost", 0), new InetSocketAddress("localhost", 30123), OptionMap.EMPTY, "bob", "test", "pass".toCharArray()).get(); + connections.set(h, connection); + for (int i = 0; i < CHANNEL_COUNT; i ++) { + clientWorker.execute(new Runnable(){ + + public void run() { + final Random random = new Random(); + final IoFuture future = connection.openChannel("test", OptionMap.EMPTY); + try { + final Channel channel = future.get(); + try { + final byte[] bytes = new byte[BUFFER_SIZE]; + for (int j = 0; j < MESSAGE_COUNT; j++) { + final MessageOutputStream stream = channel.writeMessage(); + try { + for (int k = 0; k < 100; k++) { + random.nextBytes(bytes); + stream.write(bytes, 0, random.nextInt(BUFFER_SIZE - 1) + 1); + } + stream.close(); + } finally { + IoUtils.safeClose(stream); + } + stream.close(); + } + } finally { + IoUtils.safeClose(channel); + } + } catch (IOException e) { + e.printStackTrace(); + problems.add(e); + } finally { + clientChannelCount.countDown(); + } + }}); + } + } + + Thread.sleep(500); + serverChannelCount.await(); + clientChannelCount.await(); + for (int h = 0; h < CONNECTION_COUNT; h ++) { + Connection conn = connections.get(h); + System.out.println(conn); + conn.close(); + } + } + + + public static void main(String[] args) throws Exception { + ConnectionTest test = new ConnectionTest(); + + test.testManyChannelsLotsOfData(); + + test.destroy(); + + System.out.println("DONE"); + } + +} diff --git a/remoting/quickstart/src/main/java/org/jboss/remoting3/test/OutboundMessageCountTest.java b/remoting/quickstart/src/main/java/org/jboss/remoting3/test/OutboundMessageCountTest.java new file mode 100644 index 0000000..b8e7b7c --- /dev/null +++ b/remoting/quickstart/src/main/java/org/jboss/remoting3/test/OutboundMessageCountTest.java @@ -0,0 +1,181 @@ +package org.jboss.remoting3.test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +import org.jboss.remoting3.Channel; +import org.jboss.remoting3.Connection; +import org.jboss.remoting3.Endpoint; +import org.jboss.remoting3.MessageInputStream; +import org.jboss.remoting3.MessageOutputStream; +import org.jboss.remoting3.OpenListener; +import org.jboss.remoting3.Registration; +import org.jboss.remoting3.Remoting; +import org.jboss.remoting3.RemotingOptions; +import org.jboss.remoting3.remote.RemoteConnectionProviderFactory; +import org.jboss.remoting3.security.SimpleServerAuthenticationProvider; +import org.jboss.remoting3.spi.NetworkServerProvider; +import org.xnio.FutureResult; +import org.xnio.IoFuture; +import org.xnio.IoUtils; +import org.xnio.OptionMap; +import org.xnio.Options; +import org.xnio.Sequence; +import org.xnio.channels.AcceptingChannel; +import org.xnio.channels.ConnectedStreamChannel; + +public class OutboundMessageCountTest { + + private static final int MAX_OUTBOUND_MESSAGES = 20; + + private Endpoint endpoint; + private Registration registration; + private AcceptingChannel streamServer; + + private Registration serviceRegistration; + private Connection connection; + + private Channel clientChannel; + private Channel serverChannel; + + private static final int BUFFER_SIZE = 8192; + private static byte[] buffer; + + public OutboundMessageCountTest() throws IOException, URISyntaxException { + endpoint = Remoting.createEndpoint("test", OptionMap.EMPTY); + registration = endpoint.addConnectionProvider("remote", new RemoteConnectionProviderFactory(), OptionMap.create(Options.SSL_ENABLED, Boolean.FALSE)); + NetworkServerProvider networkServerProvider = endpoint.getConnectionProviderInterface("remote", NetworkServerProvider.class); + SimpleServerAuthenticationProvider provider = new SimpleServerAuthenticationProvider(); + provider.addUser("bob", "test", "pass".toCharArray()); + streamServer = networkServerProvider.createServer(new InetSocketAddress("::1", 30123), OptionMap.create(Options.SASL_MECHANISMS, Sequence.of("CRAM-MD5")), provider, null); + + final FutureResult passer = new FutureResult(); + serviceRegistration = endpoint.registerService("org.jboss.test", new OpenListener(){ + + public void channelOpened(Channel channel) { + passer.setResult(channel); + channel.receiveMessage(new Channel.Receiver(){ + + public void handleError(Channel channel, IOException error) { + + } + + public void handleEnd(Channel channel) { +// System.out.println(channel.getConnection().getRemoteEndpointName() + " ended"); + } + + public void handleMessage(Channel channel, MessageInputStream message) { + try { + channel.receiveMessage(this); + buffer = new byte[BUFFER_SIZE]; + while (message.read(buffer) > -1); + System.out.println(" Receive: " + new String(buffer)); + } catch (Exception e) { + e.printStackTrace(); + } finally { + IoUtils.safeClose(message); + } + } + }); + } + + public void registrationTerminated() { + + }}, OptionMap.EMPTY); + + IoFuture futureConnection = endpoint.connect(new URI("remote://[::1]:30123"), OptionMap.EMPTY, "bob", "test", "pass".toCharArray()); + connection = futureConnection.get(); + final OptionMap channelCreationOptions = OptionMap.create(RemotingOptions.MAX_OUTBOUND_MESSAGES, MAX_OUTBOUND_MESSAGES); + IoFuture futureChannel = connection.openChannel("org.jboss.test", channelCreationOptions); + clientChannel = futureChannel.get(); + serverChannel = passer.getIoFuture().get(); + } + + public void testOutboundMessageSend() throws Exception { + final int NUM_THREADS = 150; + final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + final Future[] futureFailures = new Future[NUM_THREADS]; + final Semaphore semaphore = new Semaphore(MAX_OUTBOUND_MESSAGES, true); + try { + // create and submit the tasks which will send out the messages + for (int i = 0; i < NUM_THREADS; i++) { + futureFailures[i] = executorService.submit(new MessageSender(this.clientChannel, semaphore)); + } + int failureCount = 0; + // wait for the tasks to complete and then collect any failures + for (int i = 0; i < NUM_THREADS; i++) { + final Throwable failure = futureFailures[i].get(); + if (failure == null) { + continue; + } + failureCount++; + System.out.println("Thread#" + i + " failed with exception " + failure); + } + } finally { + executorService.shutdown(); + } + } + + public void destroy(){ + IoUtils.safeClose(connection); + serviceRegistration.close(); + IoUtils.safeClose(streamServer); + IoUtils.safeClose(endpoint); + IoUtils.safeClose(registration); + } + + public static void main(String[] args) throws Exception { + OutboundMessageCountTest test = new OutboundMessageCountTest(); + test.testOutboundMessageSend(); + test.destroy(); + } + + private class MessageSender implements Callable { + + private Semaphore semaphore; + private Channel channel; + + MessageSender(final Channel channel, final Semaphore semaphore) { + this.semaphore = semaphore; + this.channel = channel; + } + + @Override + public Throwable call() throws Exception { + for (int i = 0; i < 3; i++) { + try { + // get a permit before trying to send a message to the channel + this.semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return e; + } + MessageOutputStream messageOutputStream = null; + try { + // now send a message + messageOutputStream = this.channel.writeMessage(); + messageOutputStream.write("hello".getBytes()); + } catch (IOException e) { + return e; + } finally { + // close the message + if (messageOutputStream != null) { + messageOutputStream.close(); + } + // release the permit for others to use + this.semaphore.release(); + } + } + // no failures, return null + return null; + } + } + +} diff --git a/remoting/quickstart/src/main/java/org/jboss/remoting3/test/Runner.java b/remoting/quickstart/src/main/java/org/jboss/remoting3/test/Runner.java new file mode 100644 index 0000000..ccc8f89 --- /dev/null +++ b/remoting/quickstart/src/main/java/org/jboss/remoting3/test/Runner.java @@ -0,0 +1,10 @@ +package org.jboss.remoting3.test; + +public class Runner { + + public static void main(String[] args) { + + System.out.println("Done"); + } + +}