Skip to content

Commit

Permalink
Refactored and cleaned up encoder and decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
amichair committed May 22, 2016
1 parent dcc0ce0 commit 8f87ac8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
public class FIXMessageDecoder implements MessageDecoder {

private static final char SOH = '\001';
private static final String FIELD_DELIMITER = String.valueOf(SOH);

private final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -78,11 +77,11 @@ private void resetState() {
}

public FIXMessageDecoder() throws UnsupportedEncodingException {
this(CharsetSupport.getCharset(), FIELD_DELIMITER);
this(CharsetSupport.getCharset());
}

public FIXMessageDecoder(String charset) throws UnsupportedEncodingException {
this(charset, FIELD_DELIMITER);
this(charset, String.valueOf(SOH));
}

public FIXMessageDecoder(String charset, String delimiter) throws UnsupportedEncodingException {
Expand All @@ -93,12 +92,14 @@ public FIXMessageDecoder(String charset, String delimiter) throws UnsupportedEnc
resetState();
}

@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
boolean hasHeader = HEADER_PATTERN.find(in, in.position()) != -1L;
return hasHeader ? MessageDecoderResult.OK :
(in.remaining() > MAX_UNDECODED_DATA_LENGTH ? MessageDecoderResult.NOT_OK : MessageDecoderResult.NEED_DATA);
}

@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws ProtocolCodecException {
int messageCount = 0;
Expand Down Expand Up @@ -148,9 +149,9 @@ private boolean parseMessage(IoBuffer in, ProtocolDecoderOutput out)

if (state == PARSING_LENGTH) {
byte ch = 0;
while (hasRemaining(in)) {
ch = get(in);
if (!Character.isDigit((char) ch)) {
while (position < in.limit()) { // while data remains
ch = in.get(position++);
if (ch < '0' || ch > '9') { // if not digit
break;
}
bodyLength = bodyLength * 10 + (ch - '0');
Expand All @@ -161,7 +162,7 @@ private boolean parseMessage(IoBuffer in, ProtocolDecoderOutput out)
log.debug("body length = " + bodyLength + ": " + getBufferDebugInfo(in));
}
} else {
if (hasRemaining(in)) {
if (position < in.limit()) { // if data remains
String messageString = getMessageStringForError(in);
handleError(in, in.position() + 1, "Length format error in message (last character:" + ch + "): " + messageString,
false);
Expand All @@ -173,7 +174,7 @@ private boolean parseMessage(IoBuffer in, ProtocolDecoderOutput out)
}

if (state == READING_BODY) {
if (remaining(in) < bodyLength) {
if (in.limit() - position < bodyLength) { // if remaining data is less than body
break;
}
position += bodyLength;
Expand Down Expand Up @@ -223,9 +224,7 @@ private boolean parseMessage(IoBuffer in, ProtocolDecoderOutput out)
}
return messageFound;
} catch (Throwable t) {
state = SEEKING_HEADER;
position = 0;
bodyLength = 0;
resetState();
if (t instanceof ProtocolCodecException) {
throw (ProtocolCodecException) t;
} else {
Expand All @@ -234,23 +233,11 @@ private boolean parseMessage(IoBuffer in, ProtocolDecoderOutput out)
}
}

private int remaining(IoBuffer in) {
return in.limit() - position;
}

private String getBufferDebugInfo(IoBuffer in) {
return "pos=" + in.position() + ",lim=" + in.limit() + ",rem=" + in.remaining()
+ ",offset=" + position + ",state=" + state;
}

private byte get(IoBuffer in) {
return in.get(position++);
}

private boolean hasRemaining(IoBuffer in) {
return position < in.limit();
}

private String getMessageString(IoBuffer buffer) throws UnsupportedEncodingException {
byte[] data = new byte[position - buffer.position()];
buffer.get(data);
Expand Down Expand Up @@ -282,7 +269,8 @@ private boolean isLogon(IoBuffer buffer) {
return LOGON_PATTERN.find(buffer, buffer.position()) != -1L;
}

public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1) throws Exception {
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
// empty
}

Expand All @@ -309,6 +297,7 @@ public interface MessageListener {
public List<String> extractMessages(File file) throws IOException, ProtocolCodecException {
final List<String> messages = new ArrayList<String>();
extractMessages(file, new MessageListener() {
@Override
public void onMessage(String message) {
messages.add(message);
}
Expand All @@ -331,22 +320,23 @@ public void extractMessages(File file, final MessageListener listener) throws IO
ProtocolCodecException {
// Set up a read-only memory-mapped file
RandomAccessFile fileIn = new RandomAccessFile(file, "r");
FileChannel readOnlyChannel = fileIn.getChannel();
MappedByteBuffer memoryMappedBuffer = readOnlyChannel.map(FileChannel.MapMode.READ_ONLY, 0,
(int) readOnlyChannel.size());

decode(null, IoBuffer.wrap(memoryMappedBuffer), new ProtocolDecoderOutput() {

public void write(Object message) {
listener.onMessage((String) message);
}

public void flush(IoFilter.NextFilter nextFilter, IoSession ioSession) {
// ignored
}
});
readOnlyChannel.close();
fileIn.close();
try {
FileChannel readOnlyChannel = fileIn.getChannel();
MappedByteBuffer memoryMappedBuffer = readOnlyChannel.map(FileChannel.MapMode.READ_ONLY, 0,
(int) readOnlyChannel.size());
decode(null, IoBuffer.wrap(memoryMappedBuffer), new ProtocolDecoderOutput() {
@Override
public void write(Object message) {
listener.onMessage((String) message);
}
@Override
public void flush(IoFilter.NextFilter nextFilter, IoSession ioSession) {
// ignored
}
});
} finally {
fileIn.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package quickfix.mina.message;

import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -39,16 +39,10 @@
*/
public class FIXMessageEncoder implements MessageEncoder<Object> {

private static final Set<Class<?>> TYPES;
private static final Set<Class<?>> TYPES =
new HashSet<>(Arrays.<Class<?>>asList(Message.class, String.class));
private final String charsetEncoding;

static {
Set<Class<?>> types = new HashSet<Class<?>>();
types.add(Message.class);
types.add(String.class);
TYPES = Collections.unmodifiableSet(types);
}

public FIXMessageEncoder() {
charsetEncoding = CharsetSupport.getCharset();
}
Expand All @@ -57,25 +51,28 @@ public static Set<Class<?>> getMessageTypes() {
return TYPES;
}

private byte[] toBytes(String str) throws ProtocolCodecException {
try {
return str.getBytes(charsetEncoding);
} catch (UnsupportedEncodingException e) {
throw new ProtocolCodecException(e);
}
}

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
throws ProtocolCodecException {
String fixMessageString;
// get message bytes
byte[] bytes;
if (message instanceof String) {
fixMessageString = (String) message;
bytes = toBytes((String) message);
} else if (message instanceof Message) {
fixMessageString = message.toString();
bytes = toBytes(message.toString());
} else {
throw new ProtocolCodecException("Invalid FIX message object type: "
+ message.getClass());
}

byte[] bytes;
try {
bytes = fixMessageString.getBytes(charsetEncoding);
} catch (UnsupportedEncodingException e) {
throw new ProtocolCodecException(e);
}

// write bytes to buffer and output it
IoBuffer buffer = IoBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
Expand Down

0 comments on commit 8f87ac8

Please sign in to comment.