Skip to content

Commit

Permalink
#136 improve typing (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-oleshkevich authored Aug 23, 2024
1 parent 23c9b40 commit 69cf29a
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
4 changes: 2 additions & 2 deletions broadcaster/_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ async def connect(self) -> None:
async def disconnect(self) -> None:
raise NotImplementedError()

async def subscribe(self, group: str) -> None:
async def subscribe(self, channel: str) -> None:
raise NotImplementedError()

async def unsubscribe(self, group: str) -> None:
async def unsubscribe(self, channel: str) -> None:
raise NotImplementedError()

async def publish(self, channel: str, message: Any) -> None:
Expand Down
12 changes: 9 additions & 3 deletions broadcaster/_backends/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def __init__(self, urls: str | list[str]) -> None:
self._ready = asyncio.Event()

async def connect(self) -> None:
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers)
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) # pyright: ignore
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) # pyright: ignore
await self._producer.start()
await self._consumer.start()

Expand All @@ -41,7 +41,13 @@ async def publish(self, channel: str, message: typing.Any) -> None:
async def next_published(self) -> Event:
await self._ready.wait()
message = await self._consumer.getone()
return Event(channel=message.topic, message=message.value.decode("utf8"))
value = message.value

# for type compatibility:
# we declare Event.message as str, so convert None to empty string
if value is None:
value = b""
return Event(channel=message.topic, message=value.decode("utf8"))

async def _wait_for_assignment(self) -> None:
"""Wait for the consumer to be assigned to the partition."""
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/_backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def _pubsub_listener(self) -> None:
class RedisStreamBackend(BroadcastBackend):
def __init__(self, url: str):
url = url.replace("redis-stream", "redis", 1)
self.streams: dict[str, str] = {}
self.streams: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {}
self._ready = asyncio.Event()
self._producer = redis.Redis.from_url(url)
self._consumer = redis.Redis.from_url(url)
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Subscriber:
def __init__(self, queue: asyncio.Queue[Event | None]) -> None:
self._queue = queue

async def __aiter__(self) -> AsyncGenerator[Event | None, None] | None:
async def __aiter__(self) -> AsyncGenerator[Event | None, None]:
try:
while True:
yield await self.get()
Expand Down

0 comments on commit 69cf29a

Please sign in to comment.