From e76510de245b97d52e05535bb4829dbb12da747b Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Thu, 18 Jul 2024 15:25:31 +0200 Subject: [PATCH] chore: Migrate to new context manager asyncio.timeout() --- src/gallia/commands/discover/doip.py | 13 ++++++++----- src/gallia/dumpcap.py | 3 ++- src/gallia/services/uds/ecu.py | 3 ++- src/gallia/transports/base.py | 6 ++++-- src/gallia/transports/can.py | 6 ++++-- src/gallia/transports/doip.py | 28 +++++++++++++-------------- src/gallia/transports/isotp.py | 6 ++++-- src/gallia/transports/tcp.py | 11 ++++++----- src/gallia/transports/unix.py | 6 ++++-- src/opennetzteil/devices/rs/hmc804.py | 15 +++++++------- 10 files changed, 56 insertions(+), 41 deletions(-) diff --git a/src/gallia/commands/discover/doip.py b/src/gallia/commands/discover/doip.py index 31e31c0e7..495200264 100644 --- a/src/gallia/commands/discover/doip.py +++ b/src/gallia/commands/discover/doip.py @@ -242,7 +242,8 @@ async def gather_doip_details( await loop.sock_sendto(sock, hdr.pack(), (tgt_hostname, tgt_port)) try: - data, _ = await asyncio.wait_for(loop.sock_recvfrom(sock, 1024), 2) + async with asyncio.timeout(2): + data, _ = await loop.sock_recvfrom(sock, 1024) except TimeoutError: logger.info("[🐣] No response!") continue @@ -367,12 +368,13 @@ async def enumerate_target_addresses( # noqa: PLR0913 logger.info(f"[⏳] Waiting for reply of target {target_addr:#x}") # Hardcoded loop to detect potential broadcasts while True: - pot_broadcast, data = await asyncio.wait_for( - self.read_diag_request_custom(conn), + timeout = ( TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000 if timeout is None - else timeout, + else timeout ) + async with asyncio.timeout(timeout): + pot_broadcast, data = await self.read_diag_request_custom(conn) if pot_broadcast is None: break @@ -534,7 +536,8 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]: await loop.sock_sendto(sock, hdr.pack(), (ip.broadcast, 13400)) try: while True: - data, addr = await asyncio.wait_for(loop.sock_recvfrom(sock, 1024), 2) + async with asyncio.timeout(2): + data, addr = await loop.sock_recvfrom(sock, 1024) info = VehicleAnnouncementMessage.unpack(data[8:]) logger.notice(f"[💝]: {addr} responded: {info}") found.append(addr) diff --git a/src/gallia/dumpcap.py b/src/gallia/dumpcap.py index b27c5438a..320c0ddd2 100644 --- a/src/gallia/dumpcap.py +++ b/src/gallia/dumpcap.py @@ -100,7 +100,8 @@ async def start( return cls(proc, artifacts_dir, outfile) async def sync(self, timeout: float = 1) -> None: - await asyncio.wait_for(self.ready_event.wait(), timeout) + async with asyncio.timeout(timeout): + await self.ready_event.wait() async def stop(self) -> None: logger.info(f"Waiting {self.cleanup}s for dumpcap to receive all packets") diff --git a/src/gallia/services/uds/ecu.py b/src/gallia/services/uds/ecu.py index d3bae0699..ec6883c5b 100644 --- a/src/gallia/services/uds/ecu.py +++ b/src/gallia/services/uds/ecu.py @@ -340,7 +340,8 @@ async def wait_for_ecu( t = timeout if timeout is not None else self.timeout try: - await asyncio.wait_for(self._wait_for_ecu(0.5), timeout=t) + async with asyncio.timeout(t): + await self._wait_for_ecu(0.5) return True except TimeoutError: logger.critical("Timeout while waiting for ECU!") diff --git a/src/gallia/transports/base.py b/src/gallia/transports/base.py index 515f72398..0f89eb216 100644 --- a/src/gallia/transports/base.py +++ b/src/gallia/transports/base.py @@ -242,7 +242,8 @@ async def write( writer = self.get_writer() writer.write(binascii.hexlify(data) + b"\n") - await asyncio.wait_for(writer.drain(), timeout) + async with asyncio.timeout(timeout): + await writer.drain() return len(data) async def read( @@ -250,7 +251,8 @@ async def read( timeout: float | None = None, tags: list[str] | None = None, ) -> bytes: - data = await asyncio.wait_for(self.get_reader().readline(), timeout) + async with asyncio.timeout(timeout): + data = await self.get_reader().readline() d = data.decode().strip() t = tags + ["read"] if tags is not None else ["read"] diff --git a/src/gallia/transports/can.py b/src/gallia/transports/can.py index a0be68dcb..36748585d 100644 --- a/src/gallia/transports/can.py +++ b/src/gallia/transports/can.py @@ -189,14 +189,16 @@ async def sendto( logger.trace(f"{dst:03x}#{data.hex()}", extra={"tags": t}) loop = asyncio.get_running_loop() - await asyncio.wait_for(loop.sock_sendall(self._sock, msg.pack()), timeout) + async with asyncio.timeout(timeout): + await loop.sock_sendall(self._sock, msg.pack()) return len(data) async def recvfrom( self, timeout: float | None = None, tags: list[str] | None = None ) -> tuple[int, bytes]: loop = asyncio.get_running_loop() - can_frame = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout) + async with asyncio.timeout(timeout): + can_frame = await loop.sock_recv(self._sock, self.BUFSIZE) msg = CANMessage.unpack(can_frame) t = tags + ["read"] if tags is not None else ["read"] diff --git a/src/gallia/transports/doip.py b/src/gallia/transports/doip.py index f16638068..1d1aac086 100644 --- a/src/gallia/transports/doip.py +++ b/src/gallia/transports/doip.py @@ -661,17 +661,17 @@ async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> N match payload: case DiagnosticMessage(): # Now an ACK message is expected. - await asyncio.wait_for( - self._read_ack(payload.UserData), + async with asyncio.timeout( TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout / 1000, - ) + ): + await self._read_ack(payload.UserData) case RoutingActivationRequest(): - await asyncio.wait_for( - self._read_routing_activation_response(), - TimingAndCommunicationParameters.RoutingActivationResponseTimeout + async with asyncio.timeout( + TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout / 1000, - ) + ): + await self._read_routing_activation_response() except TimeoutError as e: await self.close() raise BrokenPipeError("Timeout while waiting for DoIP ACK message") from e @@ -793,17 +793,15 @@ async def connect( port = t.port if t.port is not None else 13400 config = DoIPConfig(**t.qs_flat) - conn = await asyncio.wait_for( - cls._connect( + async with asyncio.timeout(timeout): + conn = await cls._connect( t.hostname, port, config.src_addr, config.target_addr, config.activation_type, config.protocol_version, - ), - timeout, - ) + ) return cls(t, port, config, conn) async def close(self) -> None: @@ -817,7 +815,8 @@ async def read( timeout: float | None = None, tags: list[str] | None = None, ) -> bytes: - data = await asyncio.wait_for(self._conn.read_diag_request(), timeout) + async with asyncio.timeout(timeout): + data = await self._conn.read_diag_request() t = tags + ["read"] if tags is not None else ["read"] logger.trace(data.hex(), extra={"tags": t}) @@ -833,7 +832,8 @@ async def write( logger.trace(data.hex(), extra={"tags": t}) try: - await asyncio.wait_for(self._conn.write_diag_request(data), timeout) + async with asyncio.timeout(timeout): + await self._conn.write_diag_request(data) except DoIPNegativeAckError as e: if e.nack_code != DiagnosticMessageNegativeAckCodes.TargetUnreachable: raise e diff --git a/src/gallia/transports/isotp.py b/src/gallia/transports/isotp.py index 6335a39c0..4304d3fee 100644 --- a/src/gallia/transports/isotp.py +++ b/src/gallia/transports/isotp.py @@ -185,13 +185,15 @@ async def write( logger.trace(data.hex(), extra={"tags": t}) loop = asyncio.get_running_loop() - await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout) + async with asyncio.timeout(timeout): + await loop.sock_sendall(self._sock, data) return len(data) async def read(self, timeout: float | None = None, tags: list[str] | None = None) -> bytes: loop = asyncio.get_running_loop() try: - data = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout) + async with asyncio.timeout(timeout): + data = await loop.sock_recv(self._sock, self.BUFSIZE) except OSError as e: if e.errno == errno.ECOMM: raise BrokenPipeError(f"isotp flow control frame missing: {e}") from e diff --git a/src/gallia/transports/tcp.py b/src/gallia/transports/tcp.py index 01fa39f58..731d0dfee 100644 --- a/src/gallia/transports/tcp.py +++ b/src/gallia/transports/tcp.py @@ -29,9 +29,8 @@ async def connect(cls, target: str | TargetURI, timeout: float | None = None) -> t = target if isinstance(target, TargetURI) else TargetURI(target) cls.check_scheme(t) - reader, writer = await asyncio.wait_for( - asyncio.open_connection(t.hostname, t.port), timeout - ) + async with asyncio.timeout(timeout): + reader, writer = await asyncio.open_connection(t.hostname, t.port) return cls(t, reader, writer) async def close(self) -> None: @@ -51,7 +50,8 @@ async def write( logger.trace(data.hex(), extra={"tags": t}) self.writer.write(data) - await asyncio.wait_for(self.writer.drain(), timeout) + async with asyncio.timeout(timeout): + await self.writer.drain() return len(data) async def read( @@ -59,7 +59,8 @@ async def read( timeout: float | None = None, tags: list[str] | None = None, ) -> bytes: - data = await asyncio.wait_for(self.reader.read(self.BUFSIZE), timeout) + async with asyncio.timeout(timeout): + data = await self.reader.read(self.BUFSIZE) t = tags + ["read"] if tags is not None else ["read"] logger.trace(data.hex(), extra={"tags": t}) diff --git a/src/gallia/transports/unix.py b/src/gallia/transports/unix.py index 1b889a5af..f79877315 100644 --- a/src/gallia/transports/unix.py +++ b/src/gallia/transports/unix.py @@ -29,7 +29,8 @@ async def connect(cls, target: str | TargetURI, timeout: float | None = None) -> t = target if isinstance(target, TargetURI) else TargetURI(target) cls.check_scheme(t) - reader, writer = await asyncio.wait_for(asyncio.open_unix_connection(t.path), timeout) + async with asyncio.timeout(timeout): + reader, writer = await asyncio.open_unix_connection(t.path) return cls(t, reader, writer) @@ -48,7 +49,8 @@ async def write( t = tags + ["write"] if tags is not None else ["write"] logger.trace(data.hex(), extra={"tags": t}) self.writer.write(data) - await asyncio.wait_for(self.writer.drain(), timeout) + async with asyncio.timeout(timeout): + await self.writer.drain() return len(data) diff --git a/src/opennetzteil/devices/rs/hmc804.py b/src/opennetzteil/devices/rs/hmc804.py index f7458c3fa..2e79e2499 100644 --- a/src/opennetzteil/devices/rs/hmc804.py +++ b/src/opennetzteil/devices/rs/hmc804.py @@ -23,21 +23,22 @@ class HMC804(BaseNetzteil): async def _connect( self, ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: - return await asyncio.wait_for( - asyncio.open_connection(self.target.hostname, self.target.port), - self.timeout, - ) + async with asyncio.timeout(self.timeout): + return await asyncio.open_connection(self.target.hostname, self.target.port) async def _send_line(self, writer: asyncio.StreamWriter, data: str) -> None: writer.write(data.encode() + b"\n") - await asyncio.wait_for(writer.drain(), self.timeout) + async with asyncio.timeout(self.timeout): + await writer.drain() async def _recv_line(self, reader: asyncio.StreamReader) -> str: - return (await asyncio.wait_for(reader.readline(), self.timeout)).decode().strip() + async with asyncio.timeout(self.timeout): + return (await reader.readline()).decode().strip() async def _close_conn(self, writer: asyncio.StreamWriter) -> None: writer.close() - await asyncio.wait_for(writer.wait_closed(), self.timeout) + async with asyncio.timeout(self.timeout): + await writer.wait_closed() async def _request(self, data: str) -> str: reader, writer = await self._connect()