diff --git a/.github/workflows/jira-link.yaml b/.github/workflows/jira-link.yaml new file mode 100644 index 0000000..18240fb --- /dev/null +++ b/.github/workflows/jira-link.yaml @@ -0,0 +1,22 @@ +name: jira-link + +on: + pull_request: + types: [opened, edited, reopened, synchronize] + +jobs: + jira-link: + runs-on: ubuntu-20.04 + steps: + - name: check pull request title and source branch name + run: | + echo "Checking pull request with title ${{ github.event.pull_request.title }} from source branch ${{ github.event.pull_request.head.ref }}" + if ! [[ "${{ github.event.pull_request.title }}" =~ ^AIRO-[0-9]+[[:space:]].*$ ]] && ! [[ "${{ github.event.pull_request.head.ref }}" =~ ^AIRO-[0-9]+.*$ ]] + then + echo -e "Please make sure one of the following is true:\n \ + 1. the pull request title starts with 'AIRO-xxxx ', e.g. 'AIRO-1024 My Pull Request'\n \ + 2. the source branch starts with 'AIRO-xxx', e.g. 'AIRO-1024-my-branch'" + exit 1 + else + echo "Completed checking" + fi diff --git a/.yamato/sonar.yml b/.yamato/sonar.yml new file mode 100644 index 0000000..6ade886 --- /dev/null +++ b/.yamato/sonar.yml @@ -0,0 +1,17 @@ +name: Sonarqube Standard Scan +agent: + type: Unity::metal::macmini + image: package-ci/mac + flavor: m1.mac +variables: + SONARQUBE_PROJECT_KEY: ai-robotics-endpoint-ros +commands: + - curl https://binaries.sonarsource.com/Distribution/sonar-scanner-cli/sonar-scanner-cli-4.6.2.2472-macosx.zip -o sonar-scanner-macosx.zip -L + - unzip sonar-scanner-macosx.zip -d ~/sonar-scanner + - ~/sonar-scanner/sonar-scanner-4.6.2.2472-macosx/bin/sonar-scanner -Dsonar.projectKey=$SONARQUBE_PROJECT_KEY -Dsonar.sources=. -Dsonar.host.url=$SONARQUBE_ENDPOINT_URL_PRD -Dsonar.login=$SONARQUBE_TOKEN_PRD +triggers: + cancel_old_ci: true + expression: | + ((pull_request.target eq "main" OR pull_request.target eq "dev-ros") + AND NOT pull_request.push.changes.all match "**/*.md") OR + (push.branch eq "main" OR push.branch eq "dev-ros") \ No newline at end of file diff --git a/.yamato/yamato-config.yml b/.yamato/yamato-config.yml index b402644..fd3165d 100644 --- a/.yamato/yamato-config.yml +++ b/.yamato/yamato-config.yml @@ -3,6 +3,9 @@ agent: type: Unity::VM image: robotics/ci-ubuntu20:v0.1.0-795910 flavor: i1.large +variables: + # Coverage measured as a percentage (out of 100) + COVERAGE_EXPECTED: 30 commands: # run unit tests and save test results in /home/bokken/build/output/Unity-Technologies/ROS-TCP-Endpoint - command: | @@ -15,14 +18,14 @@ commands: - command: | linecoverage=$(head -2 test-results/coverage.xml | grep -Eo 'line-rate="[0-9]+([.][0-9]+)?"' | grep -Eo "[0-9]+([.][0-9]+)?") echo "Line coverage: $linecoverage" - if (( $(echo "$linecoverage < 0.3" | bc -l) )); then exit 1; fi + if (( $(echo "$linecoverage * 100.0 < $COVERAGE_EXPECTED" | bc -l) )); + then echo "ERROR: Code Coverage is under threshold of $COVERAGE_EXPECTED%" && exit 1 + fi triggers: cancel_old_ci: true expression: | - (pull_request.target eq "main" AND - NOT pull_request.push.changes.all match "**/*.md") OR - (pull_request.target eq "dev" AND - NOT pull_request.push.changes.all match "**/*.md") + (pull_request.target eq "main" OR push.branch eq "main" OR pull_request.target eq "dev-ros" OR push.branch eq "dev-ros") + AND NOT pull_request.push.changes.all match "**/*.md" artifacts: logs: paths: diff --git a/CHANGELOG.md b/CHANGELOG.md index 6869c9d..1f6aa8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,19 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a ### Fixed +## [0.7.0] - 2022-02-01 + +### Added + +Added Sonarqube Scanner + +Private ros params + +Send information during hand shaking for ros and package version checks + +Send service response as one queue item + + ## [0.6.0] - 2021-09-30 Add the [Close Stale Issues](https://github.com/marketplace/actions/close-stale-issues) action diff --git a/README.md b/README.md index 40f60cb..57104d4 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,19 @@ # ROS TCP Endpoint -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.md) +[![Version](https://img.shields.io/github/v/tag/Unity-Technologies/ROS-TCP-Endpoint)](https://github.com/Unity-Technologies/ROS-TCP-Endpoint/releases) +![ROS](https://img.shields.io/badge/ros-melodic,noetic-brightgreen) +![ROS](https://img.shields.io/badge/ros2-foxy,galactic-brightgreen) + +--- + +We're currently working on lots of things! Please take a short moment fill out our [survey](https://unitysoftware.co1.qualtrics.com/jfe/form/SV_0ojVkDVW0nNrHkW) to help us identify what products and packages to build next. + +--- ## Introduction -[ROS](https://www.ros.org/) package used to create an endpoint to accept ROS messages sent from a Unity scene using the [ROS TCP Connector](https://github.com/Unity-Technologies/ROS-TCP-Connector) scripts. +[ROS](https://www.ros.org/) package used to create an endpoint to accept ROS messages sent from a Unity scene. This ROS package works in tandem with the [ROS TCP Connector](https://github.com/Unity-Technologies/ROS-TCP-Connector) Unity package. Instructions and examples on how to use this ROS package can be found on the [Unity Robotics Hub](https://github.com/Unity-Technologies/Unity-Robotics-Hub/blob/master/tutorials/ros_unity_integration/README.md) repository. diff --git a/config/params.yaml b/config/params.yaml deleted file mode 100644 index 2508c4a..0000000 --- a/config/params.yaml +++ /dev/null @@ -1 +0,0 @@ -ROS_IP: 127.0.0.1 diff --git a/launch/endpoint.launch b/launch/endpoint.launch index f5d7788..fbc29ce 100644 --- a/launch/endpoint.launch +++ b/launch/endpoint.launch @@ -1,4 +1,9 @@ - - - \ No newline at end of file + + + + + + + + diff --git a/package.xml b/package.xml index 5f464eb..7d435a1 100644 --- a/package.xml +++ b/package.xml @@ -1,7 +1,7 @@ ros_tcp_endpoint - 0.6.0 + 0.7.0 Acts as the bridge between Unity messages sent via Websocket and ROS messages. Unity Robotics diff --git a/src/ros_tcp_endpoint/__init__.py b/src/ros_tcp_endpoint/__init__.py index adb5b02..381ca8e 100644 --- a/src/ros_tcp_endpoint/__init__.py +++ b/src/ros_tcp_endpoint/__init__.py @@ -12,8 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .publisher import RosPublisher -from .subscriber import RosSubscriber -from .service import RosService from .server import TcpServer -from .unity_service import UnityService diff --git a/src/ros_tcp_endpoint/client.py b/src/ros_tcp_endpoint/client.py index 69b0e55..1c5585a 100644 --- a/src/ros_tcp_endpoint/client.py +++ b/src/ros_tcp_endpoint/client.py @@ -71,8 +71,7 @@ def read_int32(conn): num = struct.unpack(" 0 and not data: - rospy.logerr("No data for a message size of {}, breaking!".format(full_message_size)) + self.tcp_server.logerr( + "No data for a message size of {}, breaking!".format(full_message_size) + ) return + destination = destination.rstrip("\x00") return destination, data @staticmethod @@ -151,16 +152,16 @@ def serialize_command(command, params): return cmd_info + json_info def send_ros_service_request(self, srv_id, destination, data): - if destination not in self.tcp_server.ros_services.keys(): + if destination not in self.tcp_server.ros_services_table.keys(): error_msg = "Service destination '{}' is not registered! Known services are: {} ".format( - destination, self.tcp_server.ros_services.keys() + destination, self.tcp_server.ros_services_table.keys() ) self.tcp_server.send_unity_error(error_msg) - rospy.logerr(error_msg) + self.tcp_server.logerr(error_msg) # TODO: send a response to Unity anyway? return else: - ros_communicator = self.tcp_server.ros_services[destination] + ros_communicator = self.tcp_server.ros_services_table[destination] service_thread = threading.Thread( target=self.service_call_thread, args=(srv_id, destination, data, ros_communicator) ) @@ -173,7 +174,7 @@ def service_call_thread(self, srv_id, destination, data, ros_communicator): if not response: error_msg = "No response data from service '{}'!".format(destination) self.tcp_server.send_unity_error(error_msg) - rospy.logerr(error_msg) + self.tcp_server.logerr(error_msg) # TODO: send a response to Unity anyway? return @@ -194,7 +195,7 @@ def run(self): msg: the ROS msg type as bytes """ - rospy.loginfo("Connection from {}".format(self.incoming_ip)) + self.tcp_server.loginfo("Connection from {}".format(self.incoming_ip)) halt_event = threading.Event() self.tcp_server.unity_tcp_sender.start_sender(self.conn, halt_event) try: @@ -219,18 +220,18 @@ def run(self): elif destination.startswith("__"): # handle a system command, such as registering new topics self.tcp_server.handle_syscommand(destination, data) - elif destination in self.tcp_server.publishers: - ros_communicator = self.tcp_server.publishers[destination] + elif destination in self.tcp_server.publishers_table: + ros_communicator = self.tcp_server.publishers_table[destination] ros_communicator.send(data) else: error_msg = "Not registered to publish topic '{}'! Valid publish topics are: {} ".format( - destination, self.tcp_server.publishers.keys() + destination, self.tcp_server.publishers_table.keys() ) self.tcp_server.send_unity_error(error_msg) - rospy.logerr(error_msg) + self.tcp_server.logerr(error_msg) except IOError as e: - rospy.logerr("Exception: {}".format(e)) + self.tcp_server.logerr("Exception: {}".format(e)) finally: halt_event.set() self.conn.close() - rospy.loginfo("Disconnected from {}".format(self.incoming_ip)) + self.tcp_server.loginfo("Disconnected from {}".format(self.incoming_ip)) diff --git a/src/ros_tcp_endpoint/communication.py b/src/ros_tcp_endpoint/communication.py index d53a1d2..469c741 100644 --- a/src/ros_tcp_endpoint/communication.py +++ b/src/ros_tcp_endpoint/communication.py @@ -15,10 +15,11 @@ class RosSender: """ - Base class for ROS communication where data is sent to the ROS network. + Base class for ROS communication where data is sent to the ROS network. """ - def __init__(self): + def __init__(self, node_name): + self.node_name = node_name pass def send(self, *args): @@ -27,10 +28,11 @@ def send(self, *args): class RosReceiver: """ - Base class for ROS communication where data is being sent outside of the ROS network. + Base class for ROS communication where data is being sent outside of the ROS network. """ - def __init__(self): + def __init__(self, node_name): + self.node_name = node_name pass def send(self, *args): diff --git a/src/ros_tcp_endpoint/default_server_endpoint.py b/src/ros_tcp_endpoint/default_server_endpoint.py index 2431572..fdc92b9 100755 --- a/src/ros_tcp_endpoint/default_server_endpoint.py +++ b/src/ros_tcp_endpoint/default_server_endpoint.py @@ -5,12 +5,10 @@ from ros_tcp_endpoint import TcpServer -def main(): - ros_node_name = rospy.get_param("/TCP_NODE_NAME", "TCPServer") - tcp_server = TcpServer(ros_node_name) - +def main(args=None): # Start the Server Endpoint - rospy.init_node(ros_node_name, anonymous=True) + rospy.init_node("unity_endpoint", anonymous=True) + tcp_server = TcpServer(rospy.get_name()) tcp_server.start() rospy.spin() diff --git a/src/ros_tcp_endpoint/publisher.py b/src/ros_tcp_endpoint/publisher.py index 612e853..5d10d5f 100644 --- a/src/ros_tcp_endpoint/publisher.py +++ b/src/ros_tcp_endpoint/publisher.py @@ -13,7 +13,7 @@ # limitations under the License. import rospy - +import re from .communication import RosSender @@ -22,6 +22,7 @@ class RosPublisher(RosSender): Class to publish messages to a ROS topic """ + # TODO: surface latch functionality def __init__(self, topic, message_class, queue_size=10, latch=False): """ @@ -30,7 +31,9 @@ def __init__(self, topic, message_class, queue_size=10, latch=False): message_class: The message class in catkin workspace queue_size: Max number of entries to maintain in an outgoing queue """ - RosSender.__init__(self) + strippedTopic = re.sub("[^A-Za-z0-9_]+", "", topic) + node_name = "{}_RosPublisher".format(strippedTopic) + RosSender.__init__(self, node_name) self.msg = message_class() self.pub = rospy.Publisher(topic, message_class, queue_size=queue_size, latch=latch) diff --git a/src/ros_tcp_endpoint/server.py b/src/ros_tcp_endpoint/server.py index 1c54ef3..ed82f79 100644 --- a/src/ros_tcp_endpoint/server.py +++ b/src/ros_tcp_endpoint/server.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys -import json import rospy import socket import logging +import json +import sys import threading import importlib @@ -33,7 +33,7 @@ class TcpServer: Initializes ROS node and TCP server. """ - def __init__(self, node_name, buffer_size=1024, connections=2, tcp_ip="", tcp_port=-1): + def __init__(self, node_name, buffer_size=1024, connections=10, tcp_ip=None, tcp_port=None): """ Initializes ROS node and class variables. @@ -42,23 +42,25 @@ def __init__(self, node_name, buffer_size=1024, connections=2, tcp_ip="", tcp_po buffer_size: The read buffer size used when reading from a socket connections: Max number of queued connections. See Python Socket documentation """ - if tcp_ip != "": + if tcp_ip: + self.loginfo("Using 'tcp_ip' override from constructor: {}".format(tcp_ip)) self.tcp_ip = tcp_ip else: - self.tcp_ip = rospy.get_param("/ROS_IP") + self.tcp_ip = rospy.get_param("~tcp_ip", "0.0.0.0") - if tcp_port != -1: + if tcp_port: + self.loginfo("Using 'tcp_port' override from constructor: {}".format(tcp_port)) self.tcp_port = tcp_port else: - self.tcp_port = rospy.get_param("/ROS_TCP_PORT", 10000) + self.tcp_port = rospy.get_param("~tcp_port", 10000) - self.unity_tcp_sender = UnityTcpSender() + self.unity_tcp_sender = UnityTcpSender(self) self.node_name = node_name - self.publishers = {} - self.subscribers = {} - self.ros_services = {} - self.unity_services = {} + self.publishers_table = {} + self.subscribers_table = {} + self.ros_services_table = {} + self.unity_services_table = {} self.buffer_size = buffer_size self.connections = connections self.syscommands = SysCommands(self) @@ -67,9 +69,9 @@ def __init__(self, node_name, buffer_size=1024, connections=2, tcp_ip="", tcp_po def start(self, publishers=None, subscribers=None): if publishers is not None: - self.publishers = publishers + self.publishers_table = publishers if subscribers is not None: - self.subscribers = subscribers + self.subscribers_table = subscribers server_thread = threading.Thread(target=self.listen_loop) # Exit the server thread when the main thread terminates server_thread.daemon = True @@ -77,10 +79,10 @@ def start(self, publishers=None, subscribers=None): def listen_loop(self): """ - Creates and binds sockets using TCP variables then listens for incoming connections. - For each new connection a client thread will be created to handle communication. + Creates and binds sockets using TCP variables then listens for incoming connections. + For each new connection a client thread will be created to handle communication. """ - rospy.loginfo("Starting server on {}:{}".format(self.tcp_ip, self.tcp_port)) + self.loginfo("Starting server on {}:{}".format(self.tcp_ip, self.tcp_port)) tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind((self.tcp_ip, self.tcp_port)) @@ -92,7 +94,7 @@ def listen_loop(self): (conn, (ip, port)) = tcp_server.accept() ClientThread(conn, self, ip, port).start() except socket.timeout as err: - logging.exception("ros_tcp_endpoint.TcpServer: socket timeout") + self.logerr("ros_tcp_endpoint.TcpServer: socket timeout") def send_unity_error(self, error): self.unity_tcp_sender.send_unity_error(error) @@ -110,12 +112,24 @@ def handle_syscommand(self, topic, data): function = getattr(self.syscommands, topic[2:]) if function is None: self.send_unity_error("Don't understand SysCommand.'{}'".format(topic)) - return else: message_json = data.decode("utf-8") params = json.loads(message_json) function(**params) + def loginfo(self, text): + rospy.loginfo(text) + + def logwarn(self, text): + rospy.logwarn(text) + + def logerr(self, text): + rospy.logerr(text) + + def unregister_node(self, old_node): + if old_node is not None: + old_node.unregister() + class SysCommands: def __init__(self, tcp_server): @@ -130,19 +144,21 @@ def subscribe(self, topic, message_name): ) return - message_class = resolve_message_name(message_name) + message_class = self.resolve_message_name(message_name) if message_class is None: self.tcp_server.send_unity_error( "SysCommand.subscribe - Unknown message class '{}'".format(message_name) ) return - rospy.loginfo("RegisterSubscriber({}, {}) OK".format(topic, message_class)) + old_node = self.tcp_server.subscribers_table.get(topic) + if old_node is not None: + self.tcp_server.unregister_node(old_node) - if topic in self.tcp_server.subscribers: - self.tcp_server.subscribers[topic].unregister() + new_subscriber = RosSubscriber(topic, message_class, self.tcp_server) + self.tcp_server.subscribers_table[topic] = new_subscriber - self.tcp_server.subscribers[topic] = RosSubscriber(topic, message_class, self.tcp_server) + self.tcp_server.loginfo("RegisterSubscriber({}, {}) OK".format(topic, message_class)) def publish(self, topic, message_name, queue_size=10, latch=False): if topic == "": @@ -153,19 +169,22 @@ def publish(self, topic, message_name, queue_size=10, latch=False): ) return - message_class = resolve_message_name(message_name) + message_class = self.resolve_message_name(message_name) if message_class is None: self.tcp_server.send_unity_error( "SysCommand.publish - Unknown message class '{}'".format(message_name) ) return - rospy.loginfo("RegisterPublisher({}, {}) OK".format(topic, message_class)) + old_node = self.tcp_server.publishers_table.get(topic) + if old_node is not None: + self.tcp_server.unregister_node(old_node) - if topic in self.tcp_server.publishers: - self.tcp_server.publishers[topic].unregister() + new_publisher = RosPublisher(topic, message_class, queue_size=queue_size, latch=latch) - self.tcp_server.publishers[topic] = RosPublisher(topic, message_class, queue_size, latch) + self.tcp_server.publishers_table[topic] = new_publisher + + self.tcp_server.loginfo("RegisterPublisher({}, {}) OK".format(topic, message_class)) def ros_service(self, topic, message_name): if topic == "": @@ -175,8 +194,7 @@ def ros_service(self, topic, message_name): ) ) return - - message_class = resolve_message_name(message_name, "srv") + message_class = self.resolve_message_name(message_name, "srv") if message_class is None: self.tcp_server.send_unity_error( "RegisterRosService({}, {}) - Unknown service class '{}'".format( @@ -185,12 +203,15 @@ def ros_service(self, topic, message_name): ) return - rospy.loginfo("RegisterRosService({}, {}) OK".format(topic, message_class)) + old_node = self.tcp_server.ros_services_table.get(topic) + if old_node is not None: + self.tcp_server.unregister_node(old_node) - if topic in self.tcp_server.ros_services: - self.tcp_server.ros_services[topic].unregister() + new_service = RosService(topic, message_class) - self.tcp_server.ros_services[topic] = RosService(topic, message_class) + self.tcp_server.ros_services_table[topic] = new_service + + self.tcp_server.loginfo("RegisterRosService({}, {}) OK".format(topic, message_class)) def unity_service(self, topic, message_name): if topic == "": @@ -201,7 +222,7 @@ def unity_service(self, topic, message_name): ) return - message_class = resolve_message_name(message_name, "srv") + message_class = self.resolve_message_name(message_name, "srv") if message_class is None: self.tcp_server.send_unity_error( "RegisterUnityService({}, {}) - Unknown service class '{}'".format( @@ -210,12 +231,15 @@ def unity_service(self, topic, message_name): ) return - rospy.loginfo("RegisterUnityService({}, {}) OK".format(topic, message_class)) + old_node = self.tcp_server.unity_services_table.get(topic) + if old_node is not None: + self.tcp_server.unregister_node(old_node) + + new_service = UnityService(str(topic), message_class, self.tcp_server) - if topic in self.tcp_server.unity_services: - self.tcp_server.unity_services[topic].unregister() + self.tcp_server.unity_services_table[topic] = new_service - self.tcp_server.unity_services[topic] = UnityService(topic, message_class, self.tcp_server) + self.tcp_server.loginfo("RegisterUnityService({}, {}) OK".format(topic, message_class)) def response(self, srv_id): # the next message is a service response self.tcp_server.pending_srv_id = srv_id @@ -228,25 +252,26 @@ def request(self, srv_id): # the next message is a service request def topic_list(self): self.tcp_server.unity_tcp_sender.send_topic_list() - -def resolve_message_name(name, extension="msg"): - try: - names = name.split("/") - module_name = names[0] - class_name = names[1] - importlib.import_module(module_name + "." + extension) - module = sys.modules[module_name] - if module is None: - rospy.logerr("Failed to resolve module {}".format(module_name)) - module = getattr(module, extension) - if module is None: - rospy.logerr("Failed to resolve module {}.{}".format(module_name, extension)) - module = getattr(module, class_name) - if module is None: - rospy.logerr( - "Failed to resolve module {}.{}.{}".format(module_name, extension, class_name) - ) - return module - except (IndexError, KeyError, AttributeError, ImportError) as e: - rospy.logerr("Failed to resolve message name: {}".format(e)) - return None + def resolve_message_name(self, name, extension="msg"): + try: + names = name.split("/") + module_name = names[0] + class_name = names[1] + importlib.import_module(module_name + "." + extension) + module = sys.modules[module_name] + if module is None: + self.tcp_server.logerr("Failed to resolve module {}".format(module_name)) + module = getattr(module, extension) + if module is None: + self.tcp_server.logerr( + "Failed to resolve module {}.{}".format(module_name, extension) + ) + module = getattr(module, class_name) + if module is None: + self.tcp_server.logerr( + "Failed to resolve module {}.{}.{}".format(module_name, extension, class_name) + ) + return module + except (IndexError, KeyError, AttributeError, ImportError) as e: + self.tcp_server.logerr("Failed to resolve message name: {}".format(e)) + return None diff --git a/src/ros_tcp_endpoint/service.py b/src/ros_tcp_endpoint/service.py index ec8f213..185a6ba 100644 --- a/src/ros_tcp_endpoint/service.py +++ b/src/ros_tcp_endpoint/service.py @@ -14,6 +14,7 @@ import rospy from rospy.service import ServiceException +import re from .communication import RosSender @@ -29,7 +30,10 @@ def __init__(self, service, service_class): service: The service name in ROS service_class: The service class in catkin workspace """ - RosSender.__init__(self) + strippedService = re.sub("[^A-Za-z0-9_]+", "", service) + node_name = "{}_RosService".format(strippedService) + RosSender.__init__(self, node_name) + self.srv_class = service_class._request_class() self.srv = rospy.ServiceProxy(service, service_class) diff --git a/src/ros_tcp_endpoint/subscriber.py b/src/ros_tcp_endpoint/subscriber.py index 803fb61..f4c42b4 100644 --- a/src/ros_tcp_endpoint/subscriber.py +++ b/src/ros_tcp_endpoint/subscriber.py @@ -14,6 +14,7 @@ import rospy import socket +import re from .communication import RosReceiver from .client import ClientThread @@ -32,9 +33,10 @@ def __init__(self, topic, message_class, tcp_server, queue_size=10): message_class: The message class in catkin workspace queue_size: Max number of entries to maintain in an outgoing queue """ - RosReceiver.__init__(self) + strippedTopic = re.sub("[^A-Za-z0-9_]+", "", topic) + self.node_name = "{}_RosSubscriber".format(strippedTopic) + RosReceiver.__init__(self, self.node_name) self.topic = topic - self.node_name = "{}_subscriber".format(topic) self.msg = message_class self.tcp_server = tcp_server self.queue_size = queue_size @@ -52,7 +54,6 @@ def send(self, data): self.msg: The deserialize message """ - self.tcp_server.send_unity_message(self.topic, data) return self.msg diff --git a/src/ros_tcp_endpoint/tcp_sender.py b/src/ros_tcp_endpoint/tcp_sender.py index c1d954b..ea57baf 100644 --- a/src/ros_tcp_endpoint/tcp_sender.py +++ b/src/ros_tcp_endpoint/tcp_sender.py @@ -17,6 +17,8 @@ import time import threading import struct +import json + from .client import ClientThread from .thread_pauser import ThreadPauser from io import BytesIO @@ -35,10 +37,10 @@ class UnityTcpSender: Sends messages to Unity. """ - def __init__(self): - # if we have a valid IP at this point, it was overridden locally so always use that + def __init__(self, tcp_server): self.sender_id = 1 self.time_between_halt_checks = 5 + self.tcp_server = tcp_server # Each sender thread has its own queue: this is always the queue for the currently active thread. self.queue = None @@ -74,9 +76,9 @@ def send_ros_service_response(self, srv_id, destination, response): if self.queue is not None: command = SysCommand_Service() command.srv_id = srv_id - serialized_bytes = ClientThread.serialize_command("__response", command) - self.queue.put(serialized_bytes) - self.send_unity_message(destination, response) + serialized_header = ClientThread.serialize_command("__response", command) + serialized_message = ClientThread.serialize_message(destination, response) + self.queue.put(b"".join([serialized_header, serialized_message])) def send_unity_message(self, topic, message): if self.queue is not None: @@ -95,9 +97,9 @@ def send_unity_service_request(self, topic, service_class, request): command = SysCommand_Service() command.srv_id = srv_id - serialized_bytes = ClientThread.serialize_command("__request", command) - self.queue.put(serialized_bytes) - self.send_unity_message(topic, request) + serialized_header = ClientThread.serialize_command("__request", command) + serialized_message = ClientThread.serialize_message(topic, request) + self.queue.put(b"".join([serialized_header, serialized_message])) # rospy starts a new thread for each service request, # so it won't break anything if we sleep now while waiting for the response @@ -114,6 +116,18 @@ def send_unity_service_response(self, srv_id, data): thread_pauser.resume_with_result(data) + def get_registered_topic(self, topic): + if topic in self.tcp_server.publishers_table: + return self.tcp_server.publishers_table[topic] + elif topic in self.tcp_server.subscribers_table: + return self.tcp_server.subscribers_table[topic] + elif topic in self.tcp_server.ros_services_table: + return self.tcp_server.ros_services_table[topic] + elif topic in self.tcp_server.unity_services_table: + return self.tcp_server.unity_services_table[topic] + else: + return None + def send_topic_list(self): if self.queue is not None: topic_list = SysCommand_TopicsResponse() @@ -136,9 +150,12 @@ def start_sender(self, conn, halt_event): def sender_loop(self, conn, tid, halt_event): s = None local_queue = Queue() - # send an empty message to confirm connection - # minimal message: 4 zero bytes for topic length 0, 4 zero bytes for payload length 0 - local_queue.put(b"\0\0\0\0\0\0\0\0") + + # send a handshake message to confirm the connection and version number + handshake_metadata = SysCommand_Handshake_Metadata() + handshake = SysCommand_Handshake(handshake_metadata) + local_queue.put(ClientThread.serialize_command("__handshake", handshake)) + with self.queue_lock: self.queue = local_queue @@ -156,7 +173,7 @@ def sender_loop(self, conn, tid, halt_event): try: conn.sendall(item) except Exception as e: - rospy.logerr("Exception on Send {}".format(e)) + self.tcp_server.logerr("Exception {}".format(e)) break finally: halt_event.set() @@ -164,15 +181,40 @@ def sender_loop(self, conn, tid, halt_event): if self.queue is local_queue: self.queue = None + def parse_message_name(self, name): + try: + # Example input string: + names = (str(type(name))).split(".") + module_name = names[0][8:] + class_name = names[-1].split("_")[-1][:-2] + return "{}/{}".format(module_name, class_name) + except (IndexError, AttributeError, ImportError) as e: + self.tcp_server.logerr("Failed to resolve message name: {}".format(e)) + return None + class SysCommand_Log: - text = "" + def __init__(self): + self.text = "" class SysCommand_Service: - srv_id = 0 + def __init__(self): + self.srv_id = 0 class SysCommand_TopicsResponse: - topics = [] - types = [] + def __init__(self): + self.topics = [] + self.types = [] + + +class SysCommand_Handshake: + def __init__(self, metadata): + self.version = "v0.7.0" + self.metadata = json.dumps(metadata.__dict__) + + +class SysCommand_Handshake_Metadata: + def __init__(self): + self.protocol = "ROS1" diff --git a/src/ros_tcp_endpoint/unity_service.py b/src/ros_tcp_endpoint/unity_service.py index 2a6cc34..da54c0e 100644 --- a/src/ros_tcp_endpoint/unity_service.py +++ b/src/ros_tcp_endpoint/unity_service.py @@ -14,6 +14,7 @@ import rospy import socket +import re from .communication import RosReceiver from .client import ClientThread @@ -32,16 +33,17 @@ def __init__(self, topic, service_class, tcp_server, queue_size=10): service_class: The message class in catkin workspace queue_size: Max number of entries to maintain in an outgoing queue """ + strippedTopic = re.sub("[^A-Za-z0-9_]+", "", topic) + self.node_name = "{}_service".format(strippedTopic) + self.topic = topic - self.node_name = "{}_service".format(topic) self.service_class = service_class self.tcp_server = tcp_server self.queue_size = queue_size - # Start Subscriber listener function self.service = rospy.Service(self.topic, self.service_class, self.send) - def send(self, data): + def send(self, request): """ Connect to TCP endpoint on client, pass along message and get reply Args: @@ -50,7 +52,7 @@ def send(self, data): Returns: The response message """ - return self.tcp_server.send_unity_service(self.topic, self.service_class, data) + return self.tcp_server.send_unity_service(self.topic, self.service_class, request) def unregister(self): """ diff --git a/test/test_client.py b/test/test_client.py index 30b847d..23a3c6c 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -44,6 +44,8 @@ def test_read_string_should_decode(mock_recvall, mock_read_int): @mock.patch.object(ClientThread, "read_int32", return_value=4) @mock.patch.object(ClientThread, "recvall", return_value=b"Hello world!") def test_read_message_return_destination_data(mock_recvall, mock_read_int, mock_read_str): + tcp_server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) mock_conn = mock.Mock() - result = ClientThread.read_message(mock_conn) + client_thread = ClientThread(mock_conn, tcp_server, "127.0.0.1", 10000) + result = client_thread.read_message(mock_conn) assert result == ("__srv", b"Hello world!") diff --git a/test/test_server.py b/test/test_server.py index e57cbc1..d86ce78 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -1,7 +1,6 @@ from unittest import mock from ros_tcp_endpoint import TcpServer from ros_tcp_endpoint.server import SysCommands -from ros_tcp_endpoint.server import resolve_message_name import importlib import rospy import sys @@ -16,14 +15,14 @@ def test_server_constructor(mock_ros, mock_socket): assert server.node_name == "test-tcp-server" assert server.tcp_ip == "127.0.0.1" assert server.buffer_size == 1024 - assert server.connections == 2 + assert server.connections == 10 def test_start_server(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) assert server.tcp_ip == "127.0.0.1" assert server.tcp_port == 10000 - assert server.connections == 2 + assert server.connections == 10 server.start() @@ -43,13 +42,11 @@ def test_unity_service_resolve_message_name_failure(): @mock.patch.object(rospy, "Service") @mock.patch.object( - ros_tcp_endpoint.server, - "resolve_message_name", - return_value="unity_interfaces.msg/RosUnitySrvMessage", + SysCommands, "resolve_message_name", return_value="unity_interfaces.msg/RosUnitySrvMessage" ) def test_unity_service_resolve_news_service(mock_resolve_message, mock_ros_service): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) - assert server.ros_services == {} + assert server.ros_services_table == {} system_cmds = SysCommands(server) result = system_cmds.unity_service("get_pos", "unity_interfaces.msg/RosUnitySrvMessage") mock_ros_service.assert_called_once @@ -58,9 +55,7 @@ def test_unity_service_resolve_news_service(mock_resolve_message, mock_ros_servi @mock.patch.object(rospy, "Service") @mock.patch.object( - ros_tcp_endpoint.server, - "resolve_message_name", - return_value="unity_interfaces.msg/RosUnitySrvMessage", + SysCommands, "resolve_message_name", return_value="unity_interfaces.msg/RosUnitySrvMessage" ) def test_unity_service_resolve_existing_service(mock_resolve_message, mock_ros_service): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) @@ -74,8 +69,9 @@ def test_unity_service_resolve_existing_service(mock_resolve_message, mock_ros_s @mock.patch.object(sys, "modules", return_value="unity_interfaces.msg") @mock.patch.object(importlib, "import_module") def test_resolve_message_name(mock_import_module, mock_sys_modules): + server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) msg_name = "unity_interfaces.msg/UnityColor.msg" - result = resolve_message_name(msg_name) + result = SysCommands(server).resolve_message_name(msg_name) mock_import_module.assert_called_once mock_sys_modules.assert_called_once assert result is not None @@ -85,16 +81,16 @@ def test_resolve_message_name(mock_import_module, mock_sys_modules): def test_publish_add_new_topic(mock_ros_publisher): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).publish("object_pos_topic", "std_msgs/Bool") - assert server.publishers != {} + assert server.publishers_table != {} mock_ros_publisher.assert_called_once @mock.patch.object(rospy, "Publisher") def test_publish_existing_topic(mock_ros_publisher): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) - server.publishers = {"object_pos_topic": mock.Mock()} + server.publishers_table = {"object_pos_topic": mock.Mock()} result = SysCommands(server).publish("object_pos_topic", "std_msgs/Bool") - assert server.publishers["object_pos_topic"] is not None + assert server.publishers_table["object_pos_topic"] is not None mock_ros_publisher.assert_called_once @@ -102,36 +98,32 @@ def test_publish_empty_topic_should_return_none(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).publish("", "pos") assert result is None - assert server.publishers == {} + assert server.publishers_table == {} def test_publish_empty_message_should_return_none(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).publish("test-topic", "") assert result is None - assert server.publishers == {} + assert server.publishers_table == {} @mock.patch.object(rospy, "Subscriber") -@mock.patch.object( - ros_tcp_endpoint.server, "resolve_message_name", return_value="unity_interfaces.msg/Pos" -) +@mock.patch.object(SysCommands, "resolve_message_name", return_value="unity_interfaces.msg/Pos") def test_subscribe_to_new_topic(mock_resolve_msg, mock_ros_subscriber): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).subscribe("object_pos_topic", "pos") - assert server.subscribers != {} + assert server.subscribers_table != {} mock_ros_subscriber.assert_called_once @mock.patch.object(rospy, "Subscriber") -@mock.patch.object( - ros_tcp_endpoint.server, "resolve_message_name", return_value="unity_interfaces.msg/Pos" -) +@mock.patch.object(SysCommands, "resolve_message_name", return_value="unity_interfaces.msg/Pos") def test_subscribe_to_existing_topic(mock_resolve_msg, mock_ros_subscriber): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) - server.subscribers = {"object_pos_topic": mock.Mock()} + server.subscribers_table = {"object_pos_topic": mock.Mock()} result = SysCommands(server).subscribe("object_pos_topic", "pos") - assert server.subscribers["object_pos_topic"] is not None + assert server.subscribers_table["object_pos_topic"] is not None mock_ros_subscriber.assert_called_once @@ -139,32 +131,32 @@ def test_subscribe_to_empty_topic_should_return_none(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).subscribe("", "pos") assert result is None - assert server.subscribers == {} + assert server.subscribers_table == {} def test_subscribe_to_empty_message_should_return_none(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).subscribe("test-topic", "") assert result is None - assert server.subscribers == {} + assert server.subscribers_table == {} @mock.patch.object(rospy, "ServiceProxy") -@mock.patch.object(ros_tcp_endpoint.server, "resolve_message_name") +@mock.patch.object(SysCommands, "resolve_message_name") def test_ros_service_new_topic(mock_resolve_msg, mock_ros_service): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).ros_service("object_pos_topic", "pos") - assert server.ros_services != {} + assert server.ros_services_table != {} mock_ros_service.assert_called_once @mock.patch.object(rospy, "ServiceProxy") -@mock.patch.object(ros_tcp_endpoint.server, "resolve_message_name") +@mock.patch.object(SysCommands, "resolve_message_name") def test_ros_service_existing_topic(mock_resolve_msg, mock_ros_service): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) - server.ros_services = {"object_pos_topic": mock.Mock()} + server.ros_services_table = {"object_pos_topic": mock.Mock()} result = SysCommands(server).ros_service("object_pos_topic", "pos") - assert server.ros_services["object_pos_topic"] is not None + assert server.ros_services_table["object_pos_topic"] is not None mock_ros_service.assert_called_once @@ -172,11 +164,11 @@ def test_ros_service_empty_topic_should_return_none(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).ros_service("", "pos") assert result is None - assert server.ros_services == {} + assert server.ros_services_table == {} def test_ros_service_empty_message_should_return_none(): server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) result = SysCommands(server).ros_service("test-topic", "") assert result is None - assert server.ros_services == {} + assert server.ros_services_table == {} diff --git a/test/test_subscriber.py b/test/test_subscriber.py index 18822b9..2a7f13e 100644 --- a/test/test_subscriber.py +++ b/test/test_subscriber.py @@ -7,7 +7,7 @@ def test_subscriber_send(mock_ros): mock_tcp_server = mock.Mock() subscriber = RosSubscriber("color", mock.Mock(), mock_tcp_server) - assert subscriber.node_name == "color_subscriber" + assert subscriber.node_name == "color_RosSubscriber" subscriber.send("test data") mock_tcp_server.send_unity_message.assert_called_once() @@ -16,6 +16,6 @@ def test_subscriber_send(mock_ros): def test_subscriber_unregister(mock_ros): mock_tcp_server = mock.Mock() subscriber = RosSubscriber("color", mock.Mock(), mock_tcp_server) - assert subscriber.node_name == "color_subscriber" + assert subscriber.node_name == "color_RosSubscriber" subscriber.unregister() subscriber.sub.unregister.assert_called_once() diff --git a/test/test_tcp_sender.py b/test/test_tcp_sender.py index 6e2f7ef..93efd13 100644 --- a/test/test_tcp_sender.py +++ b/test/test_tcp_sender.py @@ -1,12 +1,14 @@ import queue import socket from unittest import mock +from ros_tcp_endpoint import TcpServer import ros_tcp_endpoint @mock.patch("ros_tcp_endpoint.tcp_sender.rospy") def test_tcp_sender_constructor(mock_ros): - tcp_sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender() + server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) + tcp_sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender(server) assert tcp_sender.sender_id == 1 assert tcp_sender.time_between_halt_checks == 5 assert tcp_sender.queue == None @@ -26,7 +28,8 @@ def test_tcp_sender_constructor(mock_ros): @mock.patch.object(ros_tcp_endpoint.client.ClientThread, "serialize_message") def test_send_message_should_serialize_message(mock_serialize_msg): - sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender() + server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) + sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender(server) sender.queue = queue.Queue() sender.send_unity_message("test topic", "test msg") mock_serialize_msg.assert_called_once() @@ -44,7 +47,8 @@ def test_send_message_should_serialize_message(mock_serialize_msg): @mock.patch("ros_tcp_endpoint.thread_pauser.ThreadPauser") def test_send_unity_service_response_should_resume(mock_thread_pauser_class): - sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender() + server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) + sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender(server) thread_pauser = mock_thread_pauser_class() sender.services_waiting = {"moveit": thread_pauser} sender.send_unity_service_response("moveit", "test data") @@ -52,7 +56,8 @@ def test_send_unity_service_response_should_resume(mock_thread_pauser_class): def test_start_sender_should_increase_sender_id(): - sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender() + server = TcpServer(node_name="test-tcp-server", tcp_ip="127.0.0.1", tcp_port=10000) + sender = ros_tcp_endpoint.tcp_sender.UnityTcpSender(server) init_sender_id = 1 assert sender.sender_id == init_sender_id sender.start_sender(mock.Mock(), mock.Mock())