diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java b/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
index 9aadb95..4f32621 100644
--- a/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
+++ b/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java
@@ -71,7 +71,10 @@
* @author Hiram Chirino
*/
public class CallbackConnection {
-
+ private boolean onRefillCalled =false;
+ public static final Task NOOP = Dispatch.NOOP;
+ private short nextMessageId = 1;
+
private static class Request {
private final MQTTFrame frame;
private final short id;
@@ -411,7 +414,6 @@ public void onFailure(Throwable value) {
}
}
- private boolean onRefillCalled =false;
public void onSessionEstablished(Transport transport) {
this.transport = transport;
if( suspendCount.get() > 0 ) {
@@ -741,7 +743,6 @@ private void send(Request request) {
}
}
- private short nextMessageId = 1;
private short getNextMessageId() {
short rc = nextMessageId;
nextMessageId++;
@@ -858,8 +859,6 @@ private void processFrame(MQTTFrame frame) {
}
}
- static public final Task NOOP = Dispatch.NOOP;
-
private void toReceiver(final PUBLISH publish) {
if( listener !=null ) {
try {
diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java b/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java
index 2e1b3ca..67584f1 100644
--- a/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java
+++ b/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java
@@ -45,44 +45,7 @@ public class MQTT {
private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("mqtt.thread.keep_alive", Integer.toString(1000)));
private static final long STACK_SIZE = Long.parseLong(System.getProperty("mqtt.thread.stack_size", Integer.toString(1024*512)));
private static ThreadPoolExecutor blockingThreadPool;
-
-
- public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
- if( blockingThreadPool == null ) {
- blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread rc = new Thread(null, r, "MQTT Task", STACK_SIZE);
- rc.setDaemon(true);
- return rc;
- }
- }) {
-
- @Override
- public void shutdown() {
- // we don't ever shutdown since we are shared..
- }
-
- @Override
- public List shutdownNow() {
- // we don't ever shutdown since we are shared..
- return Collections.emptyList();
- }
- };
- }
- return blockingThreadPool;
- }
- public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
- blockingThreadPool = pool;
- }
-
private static final URI DEFAULT_HOST = createDefaultHost();
- private static URI createDefaultHost() {
- try {
- return new URI("tcp://127.0.0.1:1883");
- } catch (URISyntaxException e) {
- return null;
- }
- }
protected URI host = DEFAULT_HOST;
protected URI localAddress;
@@ -127,6 +90,42 @@ public MQTT(MQTT other) {
this.tracer = other.tracer;
}
+ public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+ if( blockingThreadPool == null ) {
+ blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread rc = new Thread(null, r, "MQTT Task", STACK_SIZE);
+ rc.setDaemon(true);
+ return rc;
+ }
+ }) {
+
+ @Override
+ public void shutdown() {
+ // we don't ever shutdown since we are shared..
+ }
+
+ @Override
+ public List shutdownNow() {
+ // we don't ever shutdown since we are shared..
+ return Collections.emptyList();
+ }
+ };
+ }
+ return blockingThreadPool;
+ }
+ public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+ blockingThreadPool = pool;
+ }
+
+ private static URI createDefaultHost() {
+ try {
+ return new URI("tcp://127.0.0.1:1883");
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+
public CallbackConnection callbackConnection() {
if( !isCleanSession() && ( getClientId()==null || getClientId().length==0 )) {
throw new IllegalArgumentException("The client id MUST be configured when clean session is set to false");
diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java
index a5d300c..3b96d42 100644
--- a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java
+++ b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java
@@ -37,6 +37,25 @@ public class MQTTProtocolCodec extends AbstractProtocolCodec {
private int maxMessageLength = 1024*1024*100;
+ private final Action readHeader = new Action() {
+ public MQTTFrame apply() throws IOException {
+ int length = readLength();
+ if( length >= 0 ) {
+ if( length > maxMessageLength) {
+ throw new IOException("The maximum message length was exceeded");
+ }
+ byte header = readBuffer.get(readStart);
+ readStart = readEnd;
+ if( length > 0 ) {
+ nextDecodeAction = readBody(header, length);
+ } else {
+ return new MQTTFrame().header(header);
+ }
+ }
+ return null;
+ }
+ };
+
public MQTTProtocolCodec() {
this.bufferPools = BUFFER_POOLS;
}
@@ -76,25 +95,6 @@ protected Action initialDecodeAction() {
return readHeader;
}
- private final Action readHeader = new Action() {
- public MQTTFrame apply() throws IOException {
- int length = readLength();
- if( length >= 0 ) {
- if( length > maxMessageLength) {
- throw new IOException("The maximum message length was exceeded");
- }
- byte header = readBuffer.get(readStart);
- readStart = readEnd;
- if( length > 0 ) {
- nextDecodeAction = readBody(header, length);
- } else {
- return new MQTTFrame().header(header);
- }
- }
- return null;
- }
- };
-
private int readLength() throws IOException {
readEnd = readStart+2; // Header is at least 2 bytes..
int limit = readBuffer.position();
diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java
index 121ea7e..aab1dd6 100644
--- a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java
+++ b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java
@@ -38,14 +38,14 @@ public class PUBREL extends MessageSupport.HeaderBase implements Message, Acked
private short messageId;
- public byte messageType() {
- return TYPE;
- }
-
public PUBREL() {
qos(QoS.AT_LEAST_ONCE);
}
+ public byte messageType() {
+ return TYPE;
+ }
+
public PUBREL decode(MQTTFrame frame) throws ProtocolException {
assert(frame.buffers.length == 1);
header(frame.header());