diff --git a/src/com/serotonin/modbus4j/ip/rtu/Tcp2RtuMaster.java b/src/com/serotonin/modbus4j/ip/rtu/Tcp2RtuMaster.java new file mode 100644 index 0000000..5c5a210 --- /dev/null +++ b/src/com/serotonin/modbus4j/ip/rtu/Tcp2RtuMaster.java @@ -0,0 +1,171 @@ +package com.serotonin.modbus4j.ip.rtu; + +import com.serotonin.modbus4j.ModbusMaster; +import com.serotonin.modbus4j.exception.ModbusInitException; +import com.serotonin.modbus4j.exception.ModbusTransportException; +import com.serotonin.modbus4j.ip.IpParameters; +import com.serotonin.modbus4j.msg.ModbusRequest; +import com.serotonin.modbus4j.msg.ModbusResponse; +import com.serotonin.modbus4j.serial.rtu.RtuMessageParser; +import com.serotonin.modbus4j.serial.rtu.RtuMessageRequest; +import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse; +import com.serotonin.modbus4j.sero.messaging.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +public class Tcp2RtuMaster extends ModbusMaster { + + private final Log LOG = LogFactory.getLog(Tcp2RtuMaster.class); + + private final IpParameters ipParameters; + private final boolean keepAlive; + private Socket socket; + private Transport transport; + private MessageControl conn; + + public Tcp2RtuMaster(IpParameters params, boolean keepAlive) { + this.ipParameters = params; + this.keepAlive = keepAlive; + } + @Override + protected MessageControl getMessageControl() { + MessageControl messageControl = super.getMessageControl(); + messageControl.DEBUG = true; + + return messageControl; + } + @Override + public void init() throws ModbusInitException { + try { + if (this.keepAlive) { + this.openConnection(); + } + } catch (Exception var2) { + throw new ModbusInitException(var2); + } + this.initialized = true; + + } + + public synchronized void destroy() { + this.closeConnection(); + this.initialized = false; + } + + @Override + public ModbusResponse sendImpl(ModbusRequest request) throws ModbusTransportException { + try { + if (!this.keepAlive) { + this.openConnection(); + } + + if (this.conn == null) { + this.LOG.debug("Connection null: " + this.ipParameters.getPort()); + } + } catch (Exception var18) { + this.closeConnection(); + throw new ModbusTransportException(var18, request.getSlaveId()); + } + + RtuMessageRequest rtuRequest = new RtuMessageRequest(request); + + try { + RtuMessageResponse rtuResponse = (RtuMessageResponse)this.conn.send(rtuRequest); + ModbusResponse var4; + if (rtuResponse == null) { + var4 = null; + return var4; + } else { + var4 = rtuResponse.getModbusResponse(); + return var4; + } + } catch (Exception var8) { +// throw new ModbusTransportException(var8, request.getSlaveId()); + if (this.keepAlive) { + + try { + this.openConnection(); + RtuMessageResponse rtuResponse = (RtuMessageResponse)this.conn.send(rtuRequest); + ModbusResponse var4; + if (rtuResponse == null) { + var4 = null; + return var4; + } else { + var4 = rtuResponse.getModbusResponse(); + return var4; + } + } catch (Exception e) { + throw new ModbusTransportException(e, request.getSlaveId()); + } + + } + throw new ModbusTransportException(var8, request.getSlaveId()); + } finally { + if (!this.keepAlive) { + this.closeConnection(); + } + } + } + private void closeConnection() { + this.closeMessageControl(this.conn); + + try { + if (this.socket != null) { + this.socket.close(); + } + } catch (IOException var2) { + this.getExceptionHandler().receivedException(var2); + } + + this.conn = null; + this.socket = null; + } + private void openConnection() throws IOException { + this.closeConnection(); + int retries = this.getRetries(); + int retryPause = 50; + + while(true) { + try { + this.socket = new Socket(); + this.socket.setSoTimeout(this.getTimeout()); + this.socket.connect(new InetSocketAddress(this.ipParameters.getHost(), this.ipParameters.getPort()), this.getTimeout()); + if (this.getePoll() != null) { + this.transport = new EpollStreamTransport(this.socket.getInputStream(), this.socket.getOutputStream(), this.getePoll()); + } else { + this.transport = new StreamTransport(this.socket.getInputStream(), this.socket.getOutputStream()); + } + break; + } catch (IOException var6) { + this.closeConnection(); + if (retries <= 0) { + throw var6; + } + + --retries; + + try { + Thread.sleep(retryPause); + } catch (InterruptedException var5) { + } + + retryPause *= 2; + if (retryPause > 1000) { + retryPause = 1000; + } + } + } + RtuMessageParser rtuMessageParser = new RtuMessageParser(true); + + this.conn = this.getMessageControl(); + this.conn.start(this.transport, rtuMessageParser, (RequestHandler)null, new Tcp2RtuSerialWaitingRoomKeyFactory()); + if (this.getePoll() == null) { + ((StreamTransport)this.transport).start("Modbus4J TcpMaster"); + } + } + +} diff --git a/src/com/serotonin/modbus4j/ip/rtu/Tcp2RtuSerialWaitingRoomKeyFactory.java b/src/com/serotonin/modbus4j/ip/rtu/Tcp2RtuSerialWaitingRoomKeyFactory.java new file mode 100644 index 0000000..c2cc72a --- /dev/null +++ b/src/com/serotonin/modbus4j/ip/rtu/Tcp2RtuSerialWaitingRoomKeyFactory.java @@ -0,0 +1,40 @@ +package com.serotonin.modbus4j.ip.rtu; + +import com.serotonin.modbus4j.sero.messaging.IncomingResponseMessage; +import com.serotonin.modbus4j.sero.messaging.OutgoingRequestMessage; +import com.serotonin.modbus4j.sero.messaging.WaitingRoomKey; +import com.serotonin.modbus4j.sero.messaging.WaitingRoomKeyFactory; + +public class Tcp2RtuSerialWaitingRoomKeyFactory implements WaitingRoomKeyFactory { + private final Sync sync = new Sync(); + + public Tcp2RtuSerialWaitingRoomKeyFactory() { + } + + public WaitingRoomKey createWaitingRoomKey(OutgoingRequestMessage request) { + return sync; + } + + public WaitingRoomKey createWaitingRoomKey(IncomingResponseMessage response) { + return sync; + } + + static class Sync implements WaitingRoomKey { + Sync() { + } + + public int hashCode() { + return 1; + } + + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj == null) { + return false; + } else { + return this.getClass() == obj.getClass(); + } + } + } +} diff --git a/src/com/serotonin/modbus4j/sero/messaging/MessageControl.java b/src/com/serotonin/modbus4j/sero/messaging/MessageControl.java index 4ec5d89..6f939ca 100644 --- a/src/com/serotonin/modbus4j/sero/messaging/MessageControl.java +++ b/src/com/serotonin/modbus4j/sero/messaging/MessageControl.java @@ -245,6 +245,10 @@ public void send(OutgoingResponseMessage response) throws IOException { * Incoming data from the transport. Single-threaded. */ public void data(byte[] b, int len) { + if(!waitingRoom.hasWait()) + { + return; + } if (DEBUG) System.out.println("MessagingConnection.read: " + StreamUtils.dumpHex(b, 0, len)); if (ioLog != null) diff --git a/src/com/serotonin/modbus4j/sero/messaging/StreamTransport.java b/src/com/serotonin/modbus4j/sero/messaging/StreamTransport.java index d496b39..f54c12d 100644 --- a/src/com/serotonin/modbus4j/sero/messaging/StreamTransport.java +++ b/src/com/serotonin/modbus4j/sero/messaging/StreamTransport.java @@ -72,8 +72,11 @@ public void setConsumer(DataConsumer consumer) { *

removeConsumer.

*/ public void removeConsumer() { - listener.stop(); - listener = null; + if(listener != null) + { + listener.stop(); + listener = null; + } } /** diff --git a/src/com/serotonin/modbus4j/sero/messaging/WaitingRoom.java b/src/com/serotonin/modbus4j/sero/messaging/WaitingRoom.java index 85e87fd..f8ba58e 100644 --- a/src/com/serotonin/modbus4j/sero/messaging/WaitingRoom.java +++ b/src/com/serotonin/modbus4j/sero/messaging/WaitingRoom.java @@ -22,6 +22,10 @@ void setKeyFactory(WaitingRoomKeyFactory keyFactory) { this.keyFactory = keyFactory; } + public Boolean hasWait() + { + return !waitHere.isEmpty(); + } /** * The request message should be sent AFTER entering the waiting room so that the (vanishingly small) chance of a * response being returned before the thread is waiting for it is eliminated.