diff --git a/setup.cfg b/setup.cfg index 1311af8..46fe247 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ classifiers = [options] zip_safe = true install_requires = + bitstruct package_dir = = src packages = find: diff --git a/src/someip/header.py b/src/someip/header.py index 04e094b..3da2235 100644 --- a/src/someip/header.py +++ b/src/someip/header.py @@ -14,6 +14,8 @@ except ImportError: # pragma: nocover cached_property = property # type: ignore[misc,assignment] +import bitstruct + import someip.utils @@ -24,6 +26,8 @@ SD_SERVICE = 0xFFFF SD_METHOD = 0x8100 SD_INTERFACE_VERSION = 1 +TP_FLAG = 0x20 +MAX_PAYLOAD_SIZE = 1400 class ParseError(RuntimeError): @@ -38,11 +42,16 @@ class SOMEIPMessageType(enum.IntEnum): REQUEST = 0 REQUEST_NO_RETURN = 1 NOTIFICATION = 2 + TP_REQUEST = 0x20 + TP_REQUEST_NO_RETURN = 0x21 + TP_NOTIFICATION = 0x22 REQUEST_ACK = 0x40 REQUEST_NO_RETURN_ACK = 0x41 NOTIFICATION_ACK = 0x42 RESPONSE = 0x80 ERROR = 0x81 + TP_RESPONSE = 0xA0 + TP_ERROR = 0xA1 RESPONSE_ACK = 0xC0 ERROR_ACK = 0xC1 @@ -61,6 +70,20 @@ class SOMEIPReturnCode(enum.IntEnum): E_WRONG_MESSAGE_TYPE = 10 +class StructLikeBitstruct: + """ + A wrapper around `bitstruct.compile` that provides a similar interface to struct. + """ + + def __init__(self, fmt: str): + self._compiled = bitstruct.compile(fmt) + self.format = fmt + self.size = self._compiled.calcsize() // 8 + + def __getattr__(self, item): + return getattr(self._compiled, item) + + def _unpack(fmt, buf): if len(buf) < fmt.size: raise IncompleteReadError( @@ -210,6 +233,176 @@ def build(self) -> bytes: return hdr + self.payload +@dataclasses.dataclass(frozen=True) +class SOMEIPTPHeader(SOMEIPHeader): + """Represents a top-level SOME/IP TP packet (header and payload).""" + + __format: typing.ClassVar[struct.Struct] = struct.Struct("!HHIHHBBBB") + TP_STRUCT: typing.ClassVar[StructLikeBitstruct] = StructLikeBitstruct("u28p3b1") + + offset: int = 0 + more_segments: bool = False + + @property + def description(self): # pragma: nocover + return ( + f"service: 0x{self.service_id:04x}" + f"method: 0x{self.method_id:04x}" + f"client: 0x{self.client_id:04x}" + f"session: 0x{self.session_id:04x}" + f"protocol: {self.protocol_version}" + f"interface: 0x{self.interface_version:02x}" + f"message: {self.message_type.name}" + f"return code: {self.return_code.name}" + f"offset: {self.offset}" + f"more segments: {self.more_segments}" + f"payload: {len(self.payload)} bytes" + ) + + def __str__(self): # pragma: nocover + return ( + f"service=0x{self.service_id:04x}, " + f"method=0x{self.method_id:04x}, " + f"client=0x{self.client_id:04x}, " + f"session=0x{self.session_id:04x}, " + f"protocol={self.protocol_version}, " + f"interface=0x{self.interface_version:02x}, " + f"message={self.message_type.name}, " + f"return_code={self.return_code.name}, " + f"offset={self.offset}, " + f"more_segments={self.more_segments}, " + f"payload: {len(self.payload)}" + ) + + @classmethod + def _build_from_buffer( + cls, + buf: bytes, + size: int, + builder: typing.Callable[[bytes, int, bool], SOMEIPTPHeader], + ) -> typing.Tuple[SOMEIPTPHeader, bytes]: + """Helper function to build a SOMEIP packet from a buffer. + + :param buf: buffer in which the payload is located. + :param size: size of the SOME/IP packet. + :param builder: callable to build the header from the payload + :return: tuple of :class:`SOMEIPTPHeader` instance and unparsed rest of `buf`. + """ + tp_args, buf = _unpack(cls.TP_STRUCT, buf) + payload_len = size - (8 + cls.TP_STRUCT.size) + payload_b, buf = buf[: payload_len], buf[payload_len:] + header = builder(payload_b, *tp_args) + return header, buf + + @classmethod + def _parse_header( + cls, parsed + ) -> typing.Tuple[int, typing.Callable[[bytes, int, bool], SOMEIPTPHeader]]: + """Validate the header fields from `parsed` tuple. + + :param parsed: tuple of parsed header fields. + :return: Return the size of the whole SOMEIP packet and a builder function to + create a :class:`SOMEIPTPHeader` instance from the payload. + """ + sid, mid, size, cid, sessid, pv, iv, mt_b, rc_b = parsed + if pv != 1: + raise ParseError(f"bad someip protocol version 0x{pv:02x}, expected 0x01") + + try: + if not (mt_b & TP_FLAG): + raise ValueError("TP flag not set in SOMEIP message type") + + mt = SOMEIPMessageType(mt_b) + except ValueError as exc: + raise ParseError(f"bad someip message type {mt_b:#x}") from exc + + try: + rc = SOMEIPReturnCode(rc_b) + except ValueError as exc: + raise ParseError(f"bad someip return code {rc_b:#x}") from exc + + if size < 8: + raise ParseError("SOMEIP length must be at least 8") + + return ( + size, + lambda payload_b, offset=0, more_segments=False: cls( + service_id=sid, + method_id=mid, + client_id=cid, + session_id=sessid, + protocol_version=pv, + interface_version=iv, + message_type=mt, + return_code=rc, + offset=offset, + more_segments=more_segments, + payload=payload_b, + ), + ) + + @classmethod + def parse(cls, buf: bytes) -> typing.Tuple[SOMEIPTPHeader, bytes]: + """Parse SOME/IP packet in `buf`. + + :param buf: buffer containing SOME/IP packet. + :raises IncompleteReadError: if the buffer did not contain enough data to unpack + the SOMEIP packet. Either there was less data than one SOMEIP header length, + or the size in the header was too big + :raises ParseError: if the packet contained invalid data, such as an unknown + message type or return code + :return: tuple (S, B) where S is the parsed :class:`SOMEIPTPHeader` instance + and B is the unparsed rest of `buf`. + """ + parsed, buf_rest = _unpack(cls.__format, buf) + size, builder = cls._parse_header(parsed) + if len(buf_rest) < size - 8: + raise IncompleteReadError( + f"packet too short, expected {size+4}, got {len(buf)}" + ) + + header, buf_rest = cls._build_from_buffer(buf_rest, size, builder) + return header, buf_rest + + def build(self) -> bytes: + """Build the byte representation of this SOMEIP packet. + + :raises struct.error: if any attribute was out of range for serialization + :return: the byte representation + """ + size = len(self.payload) + 8 + self.TP_STRUCT.size + tp_hdr = self.TP_STRUCT.pack(self.offset, self.more_segments) + + hdr = self.__format.pack( + self.service_id, + self.method_id, + size, + self.client_id, + self.session_id, + self.protocol_version, + self.interface_version, + self.message_type.value, + self.return_code.value, + ) + return hdr + tp_hdr + self.payload + + +def header_parser( + buf: bytes, +) -> typing.Tuple[typing.Union[SOMEIPHeader, SOMEIPTPHeader], bytes]: + """ + Parse data and return either `SOMEIPHeader` or `SOMEIPTPHeader` and the rest of + the buffer. + + :param buf: Buffer containing SOME/IP or SOME/IP TP packet. + :return: tuple of parsed header and unparsed buffer. + """ + try: + return SOMEIPTPHeader.parse(buf) + except ParseError: + return SOMEIPHeader.parse(buf) + + class SOMEIPReader: """ Wrapper class around :class:`asyncio.StreamReader` that returns parsed diff --git a/src/someip/sd.py b/src/someip/sd.py index c1f18f7..03e0090 100644 --- a/src/someip/sd.py +++ b/src/someip/sd.py @@ -17,6 +17,7 @@ import someip.header import someip.config from someip.config import _T_SOCKNAME as _T_SOCKADDR +from someip.header import MAX_PAYLOAD_SIZE, SOMEIPTPHeader, TP_FLAG, header_parser from someip.utils import log_exceptions, wait_cancelled LOG = logging.getLogger("someip.sd") @@ -49,6 +50,39 @@ def format_address(addr: _T_SOCKADDR) -> str: raise NotImplementedError(f"unknown ip address format: {addr!r} -> {ip!r}") +def segment_msg( + msg: someip.header.SOMEIPHeader, + max_payload_size: int = MAX_PAYLOAD_SIZE, +) -> typing.Iterator[SOMEIPTPHeader]: + """Split a SOME/IP message into multiple SOME/IP-TP messages.""" + bytes_sent = 0 + offset = 0 + max_len = (max_payload_size - SOMEIPTPHeader.TP_STRUCT.size) // 16 * 16 + message_type = someip.header.SOMEIPMessageType(msg.message_type.value | TP_FLAG) + data = msg.payload + original_payload_len = len(data) + while data: + payload, data = data[:max_len], data[max_len:] + bytes_sent = bytes_sent + len(payload) + more_segments = bytes_sent < original_payload_len + + yield SOMEIPTPHeader( + service_id=msg.service_id, + method_id=msg.method_id, + client_id=msg.client_id, + session_id=msg.session_id, + interface_version=msg.interface_version, + message_type=message_type, + protocol_version=msg.protocol_version, + return_code=msg.return_code, + offset=offset, + more_segments=more_segments, + payload=payload, + ) + + offset = bytes_sent // 16 + + class SOMEIPDatagramProtocol: """ is actually not a subclass of asyncio.BaseProtocol or asyncio.DatagramProtocol, @@ -85,13 +119,70 @@ def __init__(self, logger: str = "someip"): # default_addr=None means use connected address from socket self.default_addr: _T_OPT_SOCKADDR = None + # Dict that keeps track of dict of kwargs of SOME/IP messages currently being + # assembled keyed by PDU ID (i.e. a tuple of service_id and method_id). + self._tp_buffer: typing.Dict[ + typing.Tuple[int, int], typing.Dict[str, typing.Any] + ] = {} + + def _assemble_msg( + self, segment: SOMEIPTPHeader, + ) -> typing.Union[None, someip.header.SOMEIPHeader]: + """Assemble SOME/IP message from SOME/IP TP messages. + + :param segment: The SOME/IP TP message to process. + :return: The assembled SOME/IP message if all segments were received, None + otherwise. + """ + header = None + pdu_id = segment.service_id, segment.method_id + if segment.offset == 0: + message_type = someip.header.SOMEIPMessageType( + segment.message_type.value & ~TP_FLAG + ) + self._tp_buffer[pdu_id] = { + "service_id": segment.service_id, + "method_id": segment.method_id, + "client_id": segment.client_id, + "session_id": segment.session_id, + "interface_version": segment.interface_version, + "message_type": message_type, + "protocol_version": segment.protocol_version, + "return_code": segment.return_code, + "payload": segment.payload, + } + elif segment.offset > 0: + wip = self._tp_buffer.get(pdu_id) + if wip is not None: + current_offset = len(wip["payload"]) / 16 + if segment.offset == current_offset: + wip["payload"] = wip["payload"] + segment.payload + if not segment.more_segments: + del self._tp_buffer[pdu_id] + header = someip.header.SOMEIPHeader(**wip) + else: + self.log.error( + "Received TP segment with wrong offset: %s", segment + ) + else: + self.log.error( + "Received TP segment without first segment: %s", segment + ) + + return header + def datagram_received(self, data, addr: _T_SOCKADDR, multicast: bool) -> None: try: while data: # 4.2.1, TR_SOMEIP_00140 more than one SOMEIP message per UDP frame # allowed - parsed, data = someip.header.SOMEIPHeader.parse(data) - self.message_received(parsed, addr, multicast) + parsed, data = header_parser(data) + if isinstance(parsed, SOMEIPTPHeader): + parsed = self._assemble_msg(parsed) + if parsed: + self.message_received(parsed, addr, multicast) + else: + self.message_received(parsed, addr, multicast) except someip.header.ParseError as exc: self.log.error( "failed to parse SOME/IP datagram from %s: %r", @@ -135,6 +226,14 @@ def send(self, buf: bytes, remote: _T_OPT_SOCKADDR = None): remote = self.default_addr self.transport.sendto(buf, remote) + def send_msg(self, msg: someip.header.SOMEIPHeader, remote: _T_OPT_SOCKADDR = None): + """Send a SOME/IP message or split it into multiple SOME/IP-TP messages.""" + if len(msg.payload) <= MAX_PAYLOAD_SIZE: + self.send(msg.build(), remote) + else: + for msg in segment_msg(msg): + self.send(msg.build(), remote) + class DatagramProtocolAdapter(asyncio.DatagramProtocol): def __init__(self, protocol: SOMEIPDatagramProtocol, is_multicast: bool): diff --git a/tests/test_header.py b/tests/test_header.py index 3194c3d..5a27ca0 100644 --- a/tests/test_header.py +++ b/tests/test_header.py @@ -110,6 +110,163 @@ def test_someip_bad_length(self): with self.assertRaises(hdr.ParseError): hdr.SOMEIPHeader.parse(payload) + def test_someiptp_no_payload(self): + """Test parsing of a SOME/IP-TP message without payload.""" + payload = ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0c" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x05\x71" + ) + message = hdr.SOMEIPTPHeader( + service_id=0xDEAD, + method_id=0xBEEF, + client_id=0xCCCC, + session_id=0xDDDD, + protocol_version=1, + interface_version=2, + message_type=hdr.SOMEIPMessageType.TP_REQUEST, + return_code=hdr.SOMEIPReturnCode.E_NOT_READY, + offset=87, + more_segments=True, + ) + self._check(payload, message, hdr.SOMEIPTPHeader.parse) + + def test_someiptp_no_payload_rest(self): + """Test parsing of a SOME/IP-TP message without payload and extra data.""" + payload = ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0c" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x05\x71" + ) + message = hdr.SOMEIPTPHeader( + service_id=0xDEAD, + method_id=0xBEEF, + client_id=0xCCCC, + session_id=0xDDDD, + protocol_version=1, + interface_version=2, + message_type=hdr.SOMEIPMessageType.TP_REQUEST, + return_code=hdr.SOMEIPReturnCode.E_NOT_READY, + offset=87, + more_segments=True, + ) + self._check(payload, message, hdr.SOMEIPTPHeader.parse, extra=b"\1\2\3\4") + self._check(payload, message, hdr.SOMEIPTPHeader.parse, extra=payload) + + def test_someiptp_with_payload(self): + """Test parsing of a SOME/IP-TP message with payload.""" + payload = ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0e" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x05\x71" + b"\xaa\x55" + ) + message = hdr.SOMEIPTPHeader( + service_id=0xDEAD, + method_id=0xBEEF, + client_id=0xCCCC, + session_id=0xDDDD, + protocol_version=1, + interface_version=2, + message_type=hdr.SOMEIPMessageType.TP_REQUEST, + return_code=hdr.SOMEIPReturnCode.E_NOT_READY, + offset=87, + more_segments=True, + payload=b"\xaa\x55", + ) + self._check(payload, message, hdr.SOMEIPTPHeader.parse) + + def test_someiptp_with_payload_rest(self): + """Test parsing of a SOME/IP-TP message with payload and extra data.""" + payload = ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0e" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x05\x71" + b"\xaa\x55" + ) + message = hdr.SOMEIPTPHeader( + service_id=0xDEAD, + method_id=0xBEEF, + client_id=0xCCCC, + session_id=0xDDDD, + protocol_version=1, + interface_version=2, + message_type=hdr.SOMEIPMessageType.TP_REQUEST, + return_code=hdr.SOMEIPReturnCode.E_NOT_READY, + offset=87, + more_segments=True, + payload=b"\xaa\x55", + ) + self._check(payload, message, hdr.SOMEIPTPHeader.parse, extra=b"\1\2\3\4") + + def test_someiptp_raises_parser_error(self): + """Test parsing SOME/IP-TP message raises a ParseError.""" + params = [ + ( + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0c" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x05" + ), + "short", + ), + ( + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0c" + b"\xcc\xcc\xdd\xdd" + b"\x00\x02\x20\x04" + b"\x00\x00\x05\x71" + ), + "bad_version", + ), + ( + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0c" + b"\xcc\xcc\xdd\xdd" + b"\x00\x02\x40\x04" + b"\x00\x00\x05\x71" + ), + "bad_message_type", + ), + ( + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0c" + b"\xcc\xcc\xdd\xdd" + b"\x00\x02\x20\xaa" + b"\x00\x00\x05\x71" + ), + "bad_return_code", + ), + ( + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x00\x0d" + b"\xcc\xcc\xdd\xdd" + b"\x00\x02\x20\xaa" + b"\x00\x00\x05\x71" + ), + "bad_length", + ), + ] + + for payload, msg in params: + with self.subTest(msg=msg): + with self.assertRaises(hdr.ParseError): + hdr.SOMEIPTPHeader.parse(payload) + async def test_someip_stream_async(self): bytes_reader = asyncio.StreamReader() someip_reader = hdr.SOMEIPReader(bytes_reader) diff --git a/tests/test_sd.py b/tests/test_sd.py index 7f218ba..d36554d 100644 --- a/tests/test_sd.py +++ b/tests/test_sd.py @@ -11,6 +11,7 @@ import typing import unittest import unittest.mock +from collections import namedtuple from dataclasses import replace import someip.header as hdr @@ -120,31 +121,69 @@ async def settle(): # }}} +EndPointParam = namedtuple("EndPointParam", "datas, expected, id") + +params = ( + EndPointParam( + datas=(b"\xde\xad\xbe\xef\x00\x00\x00\x08\xcc\xcc\xdd\xdd\x01\x02\x40\x04",), + expected=hdr.SOMEIPHeader( + service_id=0xDEAD, + method_id=0xBEEF, + client_id=0xCCCC, + session_id=0xDDDD, + protocol_version=1, + interface_version=2, + message_type=hdr.SOMEIPMessageType.REQUEST_ACK, + return_code=hdr.SOMEIPReturnCode.E_NOT_READY, + ), + id="someip", + ), + EndPointParam( + datas=( + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x05\x7c" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x00\x01" + ) + b"\x00" * 1392, + ( + b"\xde\xad\xbe\xef" + b"\x00\x00\x01\x44" + b"\xcc\xcc\xdd\xdd" + b"\x01\x02\x20\x04" + b"\x00\x00\x05\x70" + ) + b"\x00" * 312, + ), + expected=hdr.SOMEIPHeader( + service_id=0xDEAD, + method_id=0xBEEF, + client_id=0xCCCC, + session_id=0xDDDD, + protocol_version=1, + interface_version=2, + message_type=hdr.SOMEIPMessageType.REQUEST, + return_code=hdr.SOMEIPReturnCode.E_NOT_READY, + payload=bytes(1704), + ), + id="someip_tp", + ), +) + + class TestSD(unittest.IsolatedAsyncioTestCase): multi_addr = ("2001:db8::1", 30490, 0, 0) fake_addr = ("2001:db8::2", 30490, 0, 0) - async def _test_endpoint(self, family, host): + async def _test_endpoint(self, family, host, datas, expected): sock = None trsp, prot = await sd.SOMEIPDatagramProtocol.create_unicast_endpoint( local_addr=(host, 0), ) try: prot.message_received = unittest.mock.Mock() + prot.send = unittest.mock.Mock() local_sockname = trsp.get_extra_info("sockname") - - message = hdr.SOMEIPHeader( - service_id=0xDEAD, - method_id=0xBEEF, - client_id=0xCCCC, - session_id=0xDDDD, - protocol_version=1, - interface_version=2, - message_type=hdr.SOMEIPMessageType.REQUEST_ACK, - return_code=hdr.SOMEIPReturnCode.E_NOT_READY, - ) - data = b"\xde\xad\xbe\xef\x00\x00\x00\x08\xcc\xcc\xdd\xdd\x01\x02\x40\x04" - sock = socket.socket(family, socket.SOCK_DGRAM, socket.IPPROTO_UDP) addrs = await asyncio.get_event_loop().getaddrinfo( host, @@ -157,15 +196,21 @@ async def _test_endpoint(self, family, host): bind_addr = addrs[0][4] sock.bind(bind_addr) - sock.sendto(data, local_sockname) + for data in datas: + sock.sendto(data, local_sockname) + sender_sockname = sock.getsockname() - await asyncio.sleep(0.01) + await asyncio.sleep(1) prot.message_received.assert_called_once_with( - message, sender_sockname, False + expected, sender_sockname, False ) prot.message_received.reset_mock() + prot.send_msg(expected) + for call_args, data in zip(prot.send.call_args_list, datas): + assert call_args[0][0] == data + data = b"\xde\xad\xbe\xef\x00\x00\x00\x08\xcc\xcc\xdd\xdd\x00\x02\x40\x04" sock.sendto(data, local_sockname) @@ -178,10 +223,18 @@ async def _test_endpoint(self, family, host): trsp.close() async def test_endpoint_v6(self): - await self._test_endpoint(socket.AF_INET6, "::1") + for data, expected, msg in params: + with self.subTest(msg=msg): + await self._test_endpoint( + socket.AF_INET6, "::1", data, expected + ) async def test_endpoint_v4(self): - await self._test_endpoint(socket.AF_INET, "127.0.0.1") + for data, expected, msg in params: + with self.subTest(msg=msg): + await self._test_endpoint( + socket.AF_INET, "127.0.0.1", data, expected + ) async def test_send_session_id(self): entry = cfg.Service(service_id=0x1234).create_find_entry()