Skip to content

Commit

Permalink
Fix AsyncEmitSource reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper committed Dec 26, 2023
1 parent 1cb2ad1 commit ebf1bed
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,11 @@ def __init__(
):
super().__init__(**kwargs)
if buffer_size is None:
buffer_size = 8
self._buffer_size = 8
elif buffer_size <= 0:
raise ValueError("Buffer size must be positive")
else:
kwargs["buffer_size"] = buffer_size
self._q = asyncio.Queue(buffer_size)
kwargs["buffer_size"] = self._buffer_size
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 20000
self._max_time_before_commit = max_time_before_commit or 45
Expand All @@ -601,6 +600,7 @@ def _init(self):
super()._init()
self._is_terminated = False
self._outstanding_offsets = defaultdict(list)
self._q = asyncio.Queue(self._buffer_size)

async def _run_loop(self):
committer = None
Expand Down

0 comments on commit ebf1bed

Please sign in to comment.