diff --git a/ros2cli/ros2cli/helpers.py b/ros2cli/ros2cli/helpers.py
index 2dd83400a..3cfd1a44e 100644
--- a/ros2cli/ros2cli/helpers.py
+++ b/ros2cli/ros2cli/helpers.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from argparse import ArgumentTypeError
 import functools
 import inspect
 import os
@@ -96,3 +97,13 @@ def wrapper(*args, **kwargs):
             return func(*args, **kwargs)
     wrapper.__signature__ = inspect.signature(func)
     return wrapper
+
+
+def unsigned_int(string):
+    try:
+        value = int(string)
+    except ValueError:
+        value = -1
+    if value < 0:
+        raise ArgumentTypeError('value must be non-negative integer')
+    return value
diff --git a/ros2service/ros2service/api/__init__.py b/ros2service/ros2service/api/__init__.py
index 9143ca163..e5bd70ee4 100644
--- a/ros2service/ros2service/api/__init__.py
+++ b/ros2service/ros2service/api/__init__.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from rclpy.node import Node
 from rclpy.topic_or_service_is_hidden import topic_or_service_is_hidden
 from ros2cli.node.strategy import NodeStrategy
 from rosidl_runtime_py import get_service_interfaces
@@ -34,6 +35,60 @@ def get_service_names(*, node, include_hidden_services=False):
     return [n for (n, t) in service_names_and_types]
 
 
+def get_service_class(node: Node, service_name: str, include_hidden_services: bool):
+    """
+    Load service type module for the given service.
+
+    The service should be running for this function to find the service type.
+    :param node: The node object of rclpy Node class.
+    :param service_name: The fully-qualified name of the service.
+    :param include_hidden_services: Whether to include hidden services while finding the
+    list of currently running services.
+    :return: the service class or None
+    """
+    service_names_and_types = get_service_names_and_types(
+        node=node,
+        include_hidden_services=include_hidden_services)
+
+    # get_service_names_and_types() returns a list of lists, like the following:
+    #  [
+    #    ['/service1', ['service/srv/Type1]],
+    #    ['/service2', ['service/srv/Type2]],
+    #  ]
+    #
+    # If there are more than one server for a service with the same type, that is only represented
+    # once.  If there are more than one server for a service name with different types, those are
+    # represented like:
+    #
+    # [
+    #  ['/service1', ['service/srv/Type1', 'service/srv/Type2']],
+    # ]
+    matched_names_and_types = list(filter(lambda x: x[0] == service_name, service_names_and_types))
+    if len(matched_names_and_types) < 1:
+        raise RuntimeError(f"Cannot find type for '{service_name}'")
+    if len(matched_names_and_types) > 1:
+        raise RuntimeError(f"Unexpectedly saw more than one entry for service '{service_name}'")
+
+    # Now check whether there are multiple types associated with this service, which is unsupported
+    service_name_and_types = matched_names_and_types[0]
+
+    types = service_name_and_types[1]
+    if len(types) < 1:
+        raise RuntimeError(f"No types associated with '{service_name}'")
+    if len(types) > 1:
+        raise RuntimeError(f"More than one type associated with service '{service_name}'")
+
+    service_type = types[0]
+
+    if service_type is None:
+        return None
+
+    try:
+        return get_service(service_type)
+    except (AttributeError, ModuleNotFoundError, ValueError):
+        raise RuntimeError(f"The service type '{service_type}' is invalid")
+
+
 def service_type_completer(**kwargs):
     """Callable returning a list of service types."""
     service_types = []
diff --git a/ros2service/ros2service/verb/echo.py b/ros2service/ros2service/verb/echo.py
new file mode 100644
index 000000000..e484bd7a8
--- /dev/null
+++ b/ros2service/ros2service/verb/echo.py
@@ -0,0 +1,162 @@
+# Copyright 2022 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+import sys
+from typing import TypeVar
+
+import rclpy
+
+from rclpy.qos import QoSPresetProfiles
+from ros2cli.helpers import unsigned_int
+from ros2cli.node.strategy import NodeStrategy
+from ros2service.api import get_service_class
+from ros2service.api import ServiceNameCompleter
+from ros2service.api import ServiceTypeCompleter
+from ros2service.verb import VerbExtension
+from rosidl_runtime_py import message_to_csv
+from rosidl_runtime_py import message_to_ordereddict
+from rosidl_runtime_py.utilities import get_service
+from service_msgs.msg import ServiceEventInfo
+
+import yaml
+
+
+DEFAULT_TRUNCATE_LENGTH = 128
+MsgType = TypeVar('MsgType')
+
+
+class EchoVerb(VerbExtension):
+    """Echo a service."""
+
+    # Custom representer for getting clean YAML output that preserves the order in an OrderedDict.
+    # Inspired by: http://stackoverflow.com/a/16782282/7169408
+    def __represent_ordereddict(self, dumper, data):
+        items = []
+        for k, v in data.items():
+            items.append((dumper.represent_data(k), dumper.represent_data(v)))
+        return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', items)
+
+    def __init__(self):
+        self._event_number_to_name = {}
+        for k, v in ServiceEventInfo._Metaclass_ServiceEventInfo__constants.items():
+            self._event_number_to_name[v] = k
+
+        yaml.add_representer(OrderedDict, self.__represent_ordereddict)
+
+    def add_arguments(self, parser, cli_name):
+        arg = parser.add_argument(
+            'service_name',
+            help="Name of the ROS service to echo (e.g. '/add_two_ints')")
+        arg.completer = ServiceNameCompleter(
+            include_hidden_services_key='include_hidden_services')
+        arg = parser.add_argument(
+            'service_type', nargs='?',
+            help="Type of the ROS service (e.g. 'example_interfaces/srv/AddTwoInts')")
+        arg.completer = ServiceTypeCompleter(service_name_key='service_name')
+        parser.add_argument(
+            '--csv', action='store_true', default=False,
+            help=(
+                'Output all recursive fields separated by commas (e.g. for plotting).'
+            ))
+        parser.add_argument(
+            '--full-length', '-f', action='store_true',
+            help='Output all elements for arrays, bytes, and string with a '
+                 "length > '--truncate-length', by default they are truncated "
+                 "after '--truncate-length' elements with '...''")
+        parser.add_argument(
+            '--truncate-length', '-l', type=unsigned_int, default=DEFAULT_TRUNCATE_LENGTH,
+            help='The length to truncate arrays, bytes, and string to '
+                 '(default: %d)' % DEFAULT_TRUNCATE_LENGTH)
+        parser.add_argument(
+            '--no-arr', action='store_true', help="Don't print array fields of messages")
+        parser.add_argument(
+            '--no-str', action='store_true', help="Don't print string fields of messages")
+        parser.add_argument(
+            '--flow-style', action='store_true',
+            help='Print collections in the block style (not available with csv format)')
+
+    def main(self, *, args):
+        if args.service_type is None:
+            with NodeStrategy(args) as node:
+                try:
+                    srv_module = get_service_class(
+                        node, args.service_name, include_hidden_services=True)
+                except (AttributeError, ModuleNotFoundError, ValueError):
+                    raise RuntimeError(f"The service name '{args.service_name}' is invalid")
+        else:
+            try:
+                srv_module = get_service(args.service_type)
+            except (AttributeError, ModuleNotFoundError, ValueError):
+                raise RuntimeError(f"The service type '{args.service_type}' is invalid")
+
+        if srv_module is None:
+            raise RuntimeError('Could not load the type for the passed service')
+
+        event_msg_type = srv_module.Event
+
+        # TODO(clalancette): We should probably expose this postfix from rclpy
+        event_topic_name = args.service_name + '/_service_event'
+
+        self.csv = args.csv
+        self.truncate_length = args.truncate_length if not args.full_length else None
+        self.flow_style = args.flow_style
+        self.no_arr = args.no_arr
+        self.no_str = args.no_str
+
+        with NodeStrategy(args) as node:
+            sub = node.create_subscription(
+                event_msg_type,
+                event_topic_name,
+                self._subscriber_callback,
+                QoSPresetProfiles.get_from_short_key('services_default'))
+
+            have_printed_warning = False
+            executor = rclpy.get_global_executor()
+            try:
+                executor.add_node(node)
+                while executor.context.ok():
+                    if not have_printed_warning and sub.get_publisher_count() < 1:
+                        print(f"No publishers on topic '{event_topic_name}'; "
+                              'is service introspection on the client or server enabled?')
+                        have_printed_warning = True
+                    executor.spin_once()
+            finally:
+                executor.remove_node(node)
+
+            sub.destroy()
+
+    def _subscriber_callback(self, msg):
+        if self.csv:
+            to_print = message_to_csv(msg, truncate_length=self.truncate_length,
+                                      no_arr=self.no_arr, no_str=self.no_str)
+        else:
+            # The "easy" way to print out a representation here is to call message_to_yaml().
+            # However, the message contains numbers for the event type, but we want to show
+            # meaningful names to the user.  So we call message_to_ordereddict() instead,
+            # and replace the numbers with meaningful names before dumping to YAML.
+            msgdict = message_to_ordereddict(msg, truncate_length=self.truncate_length,
+                                             no_arr=self.no_arr, no_str=self.no_str)
+
+            if 'info' in msgdict:
+                info = msgdict['info']
+                if 'event_type' in info:
+                    info['event_type'] = self._event_number_to_name[info['event_type']]
+
+            to_print = yaml.dump(msgdict, allow_unicode=True, width=sys.maxsize,
+                                 default_flow_style=self.flow_style)
+
+            to_print += '---'
+
+        print(to_print)
diff --git a/ros2service/setup.py b/ros2service/setup.py
index 1a432c537..0e8431a7c 100644
--- a/ros2service/setup.py
+++ b/ros2service/setup.py
@@ -41,6 +41,7 @@
         ],
         'ros2service.verb': [
             'call = ros2service.verb.call:CallVerb',
+            'echo = ros2service.verb.echo:EchoVerb',
             'find = ros2service.verb.find:FindVerb',
             'list = ros2service.verb.list:ListVerb',
             'type = ros2service.verb.type:TypeVerb',
diff --git a/ros2service/test/fixtures/introspectable.py b/ros2service/test/fixtures/introspectable.py
new file mode 100644
index 000000000..70c00c627
--- /dev/null
+++ b/ros2service/test/fixtures/introspectable.py
@@ -0,0 +1,93 @@
+# Copyright 2023 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import rclpy
+from rclpy.executors import ExternalShutdownException
+from rclpy.executors import SingleThreadedExecutor
+from rclpy.node import Node
+from rclpy.qos import qos_profile_system_default
+from rclpy.service_introspection import ServiceIntrospectionState
+
+from test_msgs.srv import BasicTypes
+
+
+class IntrospectableService(Node):
+
+    def __init__(self):
+        super().__init__('introspectable_service')
+        self.service = self.create_service(BasicTypes, 'test_introspectable', self.callback)
+        self.service.configure_introspection(
+            self.get_clock(), qos_profile_system_default, ServiceIntrospectionState.CONTENTS)
+
+    def callback(self, request, response):
+        for field_name in request.get_fields_and_field_types():
+            setattr(response, field_name, getattr(request, field_name))
+        return response
+
+
+class IntrospectableClient(Node):
+
+    def __init__(self):
+        super().__init__('introspectable_client')
+        self.client = self.create_client(BasicTypes, 'test_introspectable')
+        self.client.configure_introspection(
+            self.get_clock(), qos_profile_system_default, ServiceIntrospectionState.CONTENTS)
+
+        self.timer = self.create_timer(0.1, self.timer_callback)
+        self.future = None
+
+    def timer_callback(self):
+        if not self.client.service_is_ready():
+            return
+
+        if self.future is None:
+            request = BasicTypes.Request()
+            request.bool_value = True
+            request.int32_value = 42
+            request.string_value = 'test_string_value'
+            self.future = self.client.call_async(request)
+            return
+
+        if not self.future.done():
+            return
+
+        if self.future.result() is None:
+            self.get_logger().error(f'Exception calling service: {self.future.exception()!r}')
+
+        self.future = None
+
+
+def main(args=None):
+    rclpy.init(args=args)
+
+    service_node = IntrospectableService()
+    client_node = IntrospectableClient()
+
+    executor = SingleThreadedExecutor()
+    executor.add_node(service_node)
+    executor.add_node(client_node)
+
+    try:
+        executor.spin()
+    except (KeyboardInterrupt, ExternalShutdownException):
+        executor.remove_node(client_node)
+        executor.remove_node(service_node)
+        executor.shutdown()
+        service_node.destroy_node()
+        client_node.destroy_node()
+        rclpy.try_shutdown()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/ros2service/test/test_echo.py b/ros2service/test/test_echo.py
new file mode 100644
index 000000000..4fca43740
--- /dev/null
+++ b/ros2service/test/test_echo.py
@@ -0,0 +1,167 @@
+# Copyright 2023 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import functools
+import os
+import re
+import sys
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch_ros.actions import Node
+
+import launch_testing
+import launch_testing.actions
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+import pytest
+
+from rclpy.utilities import get_available_rmw_implementations
+
+
+# Skip cli tests on Windows while they exhibit pathological behavior
+# https://github.com/ros2/build_farmer/issues/248
+if sys.platform.startswith('win'):
+    pytest.skip(
+            'CLI tests can block for a pathological amount of time on Windows.',
+            allow_module_level=True)
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
+def generate_test_description(rmw_implementation):
+    path_to_introspectable_script = os.path.join(
+        os.path.dirname(__file__), 'fixtures', 'introspectable.py'
+    )
+    additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
+    return LaunchDescription([
+        # Always restart daemon to isolate tests.
+        ExecuteProcess(
+            cmd=['ros2', 'daemon', 'stop'],
+            name='daemon-stop',
+            on_exit=[
+                ExecuteProcess(
+                    cmd=['ros2', 'daemon', 'start'],
+                    name='daemon-start',
+                    on_exit=[
+                        # Add test fixture actions.
+                        Node(
+                            executable=sys.executable,
+                            arguments=[path_to_introspectable_script],
+                            name='introspectable_service',
+                            additional_env=additional_env,
+                        ),
+                        launch_testing.actions.ReadyToTest()
+                    ],
+                    additional_env=additional_env
+                )
+            ]
+        ),
+    ])
+
+
+class TestROS2ServiceEcho(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(
+        cls,
+        launch_service,
+        proc_info,
+        proc_output,
+        rmw_implementation
+    ):
+        @contextlib.contextmanager
+        def launch_service_command(self, arguments):
+            service_command_action = ExecuteProcess(
+                cmd=['ros2', 'service', *arguments],
+                additional_env={
+                    'RMW_IMPLEMENTATION': rmw_implementation,
+                    'PYTHONUNBUFFERED': '1'
+                },
+                name='ros2service-echo',
+                output='screen'
+            )
+            with launch_testing.tools.launch_process(
+                launch_service, service_command_action, proc_info, proc_output,
+                output_filter=launch_testing_ros.tools.basic_output_filter(
+                    filtered_prefixes=[
+                        'waiting for service to become available...',
+                        '/launch_ros'  # cope with launch_ros internal node.
+                    ],
+                    filtered_rmw_implementation=rmw_implementation
+                )
+            ) as service_command:
+                yield service_command
+        cls.launch_service_command = launch_service_command
+
+    @launch_testing.markers.retry_on_failure(times=5, delay=1)
+    def test_echo_no_arr(self):
+        echo_arguments = ['echo', '/test_introspectable', '--no-arr']
+        expected_output = [
+            'info:',
+            '  event_type: REQUEST_SENT',
+            '  stamp:',
+            re.compile(r'    sec: \d+'),
+            re.compile(r'    nanosec: \d+'),
+            "  client_gid: '<array type: uint8[16]>'",
+            re.compile(r'  sequence_number: \d+'),
+            "request: '<sequence type: test_msgs/srv/BasicTypes_Request[1], length: 1>'",
+            "response: '<sequence type: test_msgs/srv/BasicTypes_Response[1], length: 0>'",
+            '---',
+            'info:',
+            '  event_type: REQUEST_RECEIVED',
+            '  stamp:',
+            re.compile(r'    sec: \d+'),
+            re.compile(r'    nanosec: \d+'),
+            "  client_gid: '<array type: uint8[16]>'",
+            re.compile(r'  sequence_number: \d+'),
+            "request: '<sequence type: test_msgs/srv/BasicTypes_Request[1], length: 1>'",
+            "response: '<sequence type: test_msgs/srv/BasicTypes_Response[1], length: 0>'",
+            '---',
+            'info:',
+            '  event_type: RESPONSE_SENT',
+            '  stamp:',
+            re.compile(r'    sec: \d+'),
+            re.compile(r'    nanosec: \d+'),
+            "  client_gid: '<array type: uint8[16]>'",
+            re.compile(r'  sequence_number: \d+'),
+            "request: '<sequence type: test_msgs/srv/BasicTypes_Request[1], length: 0>'",
+            "response: '<sequence type: test_msgs/srv/BasicTypes_Response[1], length: 1>'",
+            '---',
+            'info:',
+            '  event_type: RESPONSE_RECEIVED',
+            '  stamp:',
+            re.compile(r'    sec: \d+'),
+            re.compile(r'    nanosec: \d+'),
+            "  client_gid: '<array type: uint8[16]>'",
+            re.compile(r'  sequence_number: \d+'),
+            "request: '<sequence type: test_msgs/srv/BasicTypes_Request[1], length: 0>'",
+            "response: '<sequence type: test_msgs/srv/BasicTypes_Response[1], length: 1>'",
+        ],
+
+        with self.launch_service_command(arguments=echo_arguments) as service_command:
+            assert service_command.wait_for_output(
+                functools.partial(
+                    launch_testing.tools.expect_output,
+                    expected_lines=expected_output,
+                    strict=True
+                ),
+                timeout=10,
+            )
+        assert service_command.wait_for_shutdown(timeout=10)
diff --git a/ros2topic/ros2topic/api/__init__.py b/ros2topic/ros2topic/api/__init__.py
index 265c85bc0..7b5b4f94a 100644
--- a/ros2topic/ros2topic/api/__init__.py
+++ b/ros2topic/ros2topic/api/__init__.py
@@ -29,16 +29,6 @@
 from rosidl_runtime_py.utilities import get_message
 
 
-def unsigned_int(string):
-    try:
-        value = int(string)
-    except ValueError:
-        value = -1
-    if value < 0:
-        raise ArgumentTypeError('value must be non-negative integer')
-    return value
-
-
 def positive_int(string):
     try:
         value = int(string)
diff --git a/ros2topic/ros2topic/verb/echo.py b/ros2topic/ros2topic/verb/echo.py
index 339fb0b91..7f669c17c 100644
--- a/ros2topic/ros2topic/verb/echo.py
+++ b/ros2topic/ros2topic/verb/echo.py
@@ -24,6 +24,7 @@
 from rclpy.qos import QoSProfile
 from rclpy.qos import QoSReliabilityPolicy
 from rclpy.task import Future
+from ros2cli.helpers import unsigned_int
 from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
 from ros2cli.node.strategy import NodeStrategy
 from ros2topic.api import add_qos_arguments
@@ -31,7 +32,6 @@
 from ros2topic.api import positive_float
 from ros2topic.api import qos_profile_from_short_keys
 from ros2topic.api import TopicNameCompleter
-from ros2topic.api import unsigned_int
 from ros2topic.verb import VerbExtension
 from rosidl_runtime_py import message_to_csv
 from rosidl_runtime_py import message_to_yaml