Skip to content

Commit

Permalink
Improve test_tcp_connection_with_forwarding to ensure connections are…
Browse files Browse the repository at this point in the history
… closed, and tasks are awaited.
  • Loading branch information
garyvdm committed May 21, 2024
1 parent 2531246 commit 71459c7
Showing 1 changed file with 44 additions and 43 deletions.
87 changes: 44 additions & 43 deletions test/test_tcp_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

@pytest.mark.asyncio
async def test_tcp_connection_with_forwarding():
closables = []
host = "127.0.0.1"
port = "55556"

Expand All @@ -23,53 +22,55 @@ async def test_tcp_connection_with_forwarding():

assert path

async def handle_connection(tcp_reader, tcp_writer):
async def handle_connection(tcp_reader: asyncio.StreamReader, tcp_writer: asyncio.StreamWriter):
unix_reader, unix_writer = await asyncio.open_unix_connection(path)
closables.append(tcp_writer)
closables.append(unix_writer)

async def handle_read():
while True:
data = await tcp_reader.read(1)
if not data:
break
unix_writer.write(data)
try:
while True:
data = await tcp_reader.read(1024)
if not data:
break
unix_writer.write(data)
finally:
unix_writer.close()

async def handle_write():
while True:
data = await unix_reader.read(1)
if not data:
break
tcp_writer.write(data)

asyncio.run_coroutine_threadsafe(handle_read(), asyncio.get_event_loop())
asyncio.run_coroutine_threadsafe(handle_write(), asyncio.get_event_loop())
try:
while True:
data = await unix_reader.read(1024)
if not data:
break
tcp_writer.write(data)
finally:
tcp_writer.close()

handle_read_task = asyncio.create_task(handle_read())
handle_write_task = asyncio.create_task(handle_write())
await asyncio.wait([handle_read_task, handle_write_task])

server = await asyncio.start_server(handle_connection, host, port)
closables.append(server)

bus = await MessageBus(bus_address=f"tcp:host={host},port={port}").connect()

# basic tests to see if it works
result = await bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus.Peer",
member="Ping",
)
)
assert result

intr = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
obj = bus.get_proxy_object("org.freedesktop.DBus", "/org/freedesktop/DBus", intr)
iface = obj.get_interface("org.freedesktop.DBus.Peer")
await iface.call_ping()

assert bus._sock.getpeername()[0] == host
assert bus._sock.getsockname()[0] == host
assert bus._sock.gettimeout() == 0
assert bus._stream.closed is False

for c in closables:
c.close()
async with server:
print("server started")
async with MessageBus(bus_address=f"tcp:host={host},port={port}") as bus:
# basic tests to see if it works
result = await bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus.Peer",
member="Ping",
)
)
assert result

intr = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
obj = bus.get_proxy_object("org.freedesktop.DBus", "/org/freedesktop/DBus", intr)
iface = obj.get_interface("org.freedesktop.DBus.Peer")
await iface.call_ping()

assert bus._sock.getpeername()[0] == host
assert bus._sock.getsockname()[0] == host
assert bus._sock.gettimeout() == 0
assert bus._stream.closed is False

0 comments on commit 71459c7

Please sign in to comment.