Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Migrate to new context manager asyncio.timeout() #559

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/gallia/commands/discover/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ async def gather_doip_details(
await loop.sock_sendto(sock, hdr.pack(), (tgt_hostname, tgt_port))

try:
data, _ = await asyncio.wait_for(loop.sock_recvfrom(sock, 1024), 2)
async with asyncio.timeout(2):
data, _ = await loop.sock_recvfrom(sock, 1024)
except TimeoutError:
logger.info("[🐣] No response!")
continue
Expand Down Expand Up @@ -367,12 +368,13 @@ async def enumerate_target_addresses( # noqa: PLR0913
logger.info(f"[⏳] Waiting for reply of target {target_addr:#x}")
# Hardcoded loop to detect potential broadcasts
while True:
pot_broadcast, data = await asyncio.wait_for(
self.read_diag_request_custom(conn),
timeout = (
TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000
if timeout is None
else timeout,
else timeout
)
async with asyncio.timeout(timeout):
pot_broadcast, data = await self.read_diag_request_custom(conn)
if pot_broadcast is None:
break

Expand Down Expand Up @@ -534,7 +536,8 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]:
await loop.sock_sendto(sock, hdr.pack(), (ip.broadcast, 13400))
try:
while True:
data, addr = await asyncio.wait_for(loop.sock_recvfrom(sock, 1024), 2)
async with asyncio.timeout(2):
data, addr = await loop.sock_recvfrom(sock, 1024)
info = VehicleAnnouncementMessage.unpack(data[8:])
logger.notice(f"[💝]: {addr} responded: {info}")
found.append(addr)
Expand Down
3 changes: 2 additions & 1 deletion src/gallia/dumpcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ async def start(
return cls(proc, artifacts_dir, outfile)

async def sync(self, timeout: float = 1) -> None:
await asyncio.wait_for(self.ready_event.wait(), timeout)
async with asyncio.timeout(timeout):
await self.ready_event.wait()

async def stop(self) -> None:
logger.info(f"Waiting {self.cleanup}s for dumpcap to receive all packets")
Expand Down
3 changes: 2 additions & 1 deletion src/gallia/services/uds/ecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ async def wait_for_ecu(

t = timeout if timeout is not None else self.timeout
try:
await asyncio.wait_for(self._wait_for_ecu(0.5), timeout=t)
async with asyncio.timeout(t):
await self._wait_for_ecu(0.5)
return True
except TimeoutError:
logger.critical("Timeout while waiting for ECU!")
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,17 @@ async def write(

writer = self.get_writer()
writer.write(binascii.hexlify(data) + b"\n")
await asyncio.wait_for(writer.drain(), timeout)
async with asyncio.timeout(timeout):
await writer.drain()
return len(data)

async def read(
self: TransportProtocol,
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self.get_reader().readline(), timeout)
async with asyncio.timeout(timeout):
data = await self.get_reader().readline()
d = data.decode().strip()

t = tags + ["read"] if tags is not None else ["read"]
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/can.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,16 @@ async def sendto(
logger.trace(f"{dst:03x}#{data.hex()}", extra={"tags": t})

loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_sendall(self._sock, msg.pack()), timeout)
async with asyncio.timeout(timeout):
await loop.sock_sendall(self._sock, msg.pack())
return len(data)

async def recvfrom(
self, timeout: float | None = None, tags: list[str] | None = None
) -> tuple[int, bytes]:
loop = asyncio.get_running_loop()
can_frame = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout)
async with asyncio.timeout(timeout):
can_frame = await loop.sock_recv(self._sock, self.BUFSIZE)
msg = CANMessage.unpack(can_frame)

t = tags + ["read"] if tags is not None else ["read"]
Expand Down
26 changes: 13 additions & 13 deletions src/gallia/transports/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,17 +661,17 @@ async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> N
match payload:
case DiagnosticMessage():
# Now an ACK message is expected.
await asyncio.wait_for(
self._read_ack(payload.UserData),
async with asyncio.timeout(
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout
/ 1000,
)
):
await self._read_ack(payload.UserData)
case RoutingActivationRequest():
await asyncio.wait_for(
self._read_routing_activation_response(),
async with asyncio.timeout(
TimingAndCommunicationParameters.RoutingActivationResponseTimeout
/ 1000,
)
):
await self._read_routing_activation_response()
except TimeoutError as e:
await self.close()
raise BrokenPipeError("Timeout while waiting for DoIP ACK message") from e
Expand Down Expand Up @@ -793,17 +793,15 @@ async def connect(

port = t.port if t.port is not None else 13400
config = DoIPConfig(**t.qs_flat)
conn = await asyncio.wait_for(
cls._connect(
async with asyncio.timeout(timeout):
conn = await cls._connect(
t.hostname,
port,
config.src_addr,
config.target_addr,
config.activation_type,
config.protocol_version,
),
timeout,
)
)
return cls(t, port, config, conn)

async def close(self) -> None:
Expand All @@ -817,7 +815,8 @@ async def read(
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self._conn.read_diag_request(), timeout)
async with asyncio.timeout(timeout):
data = await self._conn.read_diag_request()

t = tags + ["read"] if tags is not None else ["read"]
logger.trace(data.hex(), extra={"tags": t})
Expand All @@ -833,7 +832,8 @@ async def write(
logger.trace(data.hex(), extra={"tags": t})

try:
await asyncio.wait_for(self._conn.write_diag_request(data), timeout)
async with asyncio.timeout(timeout):
await self._conn.write_diag_request(data)
except DoIPNegativeAckError as e:
if e.nack_code != DiagnosticMessageNegativeAckCodes.TargetUnreachable:
raise e
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/isotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,15 @@ async def write(
logger.trace(data.hex(), extra={"tags": t})

loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout)
async with asyncio.timeout(timeout):
await loop.sock_sendall(self._sock, data)
return len(data)

async def read(self, timeout: float | None = None, tags: list[str] | None = None) -> bytes:
loop = asyncio.get_running_loop()
try:
data = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout)
async with asyncio.timeout(timeout):
data = await loop.sock_recv(self._sock, self.BUFSIZE)
except OSError as e:
if e.errno == errno.ECOMM:
raise BrokenPipeError(f"isotp flow control frame missing: {e}") from e
Expand Down
11 changes: 6 additions & 5 deletions src/gallia/transports/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ async def connect(cls, target: str | TargetURI, timeout: float | None = None) ->
t = target if isinstance(target, TargetURI) else TargetURI(target)
cls.check_scheme(t)

reader, writer = await asyncio.wait_for(
asyncio.open_connection(t.hostname, t.port), timeout
)
async with asyncio.timeout(timeout):
reader, writer = await asyncio.open_connection(t.hostname, t.port)
return cls(t, reader, writer)

async def close(self) -> None:
Expand All @@ -51,15 +50,17 @@ async def write(
logger.trace(data.hex(), extra={"tags": t})

self.writer.write(data)
await asyncio.wait_for(self.writer.drain(), timeout)
async with asyncio.timeout(timeout):
await self.writer.drain()
return len(data)

async def read(
self,
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self.reader.read(self.BUFSIZE), timeout)
async with asyncio.timeout(timeout):
data = await self.reader.read(self.BUFSIZE)

t = tags + ["read"] if tags is not None else ["read"]
logger.trace(data.hex(), extra={"tags": t})
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ async def connect(cls, target: str | TargetURI, timeout: float | None = None) ->
t = target if isinstance(target, TargetURI) else TargetURI(target)
cls.check_scheme(t)

reader, writer = await asyncio.wait_for(asyncio.open_unix_connection(t.path), timeout)
async with asyncio.timeout(timeout):
reader, writer = await asyncio.open_unix_connection(t.path)

return cls(t, reader, writer)

Expand All @@ -48,7 +49,8 @@ async def write(
t = tags + ["write"] if tags is not None else ["write"]
logger.trace(data.hex(), extra={"tags": t})
self.writer.write(data)
await asyncio.wait_for(self.writer.drain(), timeout)
async with asyncio.timeout(timeout):
await self.writer.drain()

return len(data)

Expand Down
15 changes: 8 additions & 7 deletions src/opennetzteil/devices/rs/hmc804.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@ class HMC804(BaseNetzteil):
async def _connect(
self,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
return await asyncio.wait_for(
asyncio.open_connection(self.target.hostname, self.target.port),
self.timeout,
)
async with asyncio.timeout(self.timeout):
return await asyncio.open_connection(self.target.hostname, self.target.port)

async def _send_line(self, writer: asyncio.StreamWriter, data: str) -> None:
writer.write(data.encode() + b"\n")
await asyncio.wait_for(writer.drain(), self.timeout)
async with asyncio.timeout(self.timeout):
await writer.drain()

async def _recv_line(self, reader: asyncio.StreamReader) -> str:
return (await asyncio.wait_for(reader.readline(), self.timeout)).decode().strip()
async with asyncio.timeout(self.timeout):
return (await reader.readline()).decode().strip()

async def _close_conn(self, writer: asyncio.StreamWriter) -> None:
writer.close()
await asyncio.wait_for(writer.wait_closed(), self.timeout)
async with asyncio.timeout(self.timeout):
await writer.wait_closed()

async def _request(self, data: str) -> str:
reader, writer = await self._connect()
Expand Down