Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python Client: avoid thread self-join on stop() #5005

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions py/client-ticking/src/pydeephaven_ticking/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ class TableListenerHandle:

_table: Table
_listener: TableListener
# Set when the user calls stop()
_cancelled: bool
# Tracks Whether we've called cancel on the FlightStreamReader
_reader_cancelled: bool
_bp: dhc.BarrageProcessor
_writer: flight.FlightStreamWriter
_reader: flight.FlightStreamReader
Expand All @@ -195,6 +198,7 @@ def __init__(self, table: Table, listener: TableListener):
self._table = table
self._listener = listener
self._cancelled = False
self._reader_cancelled = False

def start(self) -> None:
"""Subscribes to changes on the table referenced in the constructor. When changes happen, the
Expand All @@ -211,9 +215,16 @@ def start(self) -> None:

def stop(self) -> None:
"""Cancels the subscription to the table and stops the service thread. By the time this method returns, the
thread servicing the subscription will be destroyed, and the callback will no longer be invoked."""
thread servicing the subscription will be destroyed, and the callback will no longer be invoked.
This method joins the subscription servicing thread, unless stop() was called from that very thread.
This can happen if the user's callback calls stop()."""

self._cancelled = True
if threading.get_ident() == self._thread.ident:
# We are inside the callback, so just setting the 'cancelled' flag suffices.
return

self._reader_cancelled = True
self._reader.cancel()
self._thread.join()

Expand All @@ -223,16 +234,25 @@ def _process_data(self):
user-supplied callback with that TableUpdate."""

try:
while True:
while not self._cancelled:
data, metadata = self._reader.read_chunk()
ticking_update = self._bp.process_next_chunk(data.columns, metadata)
if ticking_update is not None:
table_update = TableUpdate(ticking_update)
self._listener.on_update(table_update)
except StopIteration:
pass
except Exception as e:
if not self._cancelled:
self._listener.on_error(e)

try:
if not self._reader_cancelled:
self._reader_cancelled = True
self._reader.cancel()
self._writer.close()
except Exception as e:
pass

def listen(table: Table, listener: Union[Callable, TableListener],
on_error: Union[Callable, None] = None) -> TableListenerHandle:
Expand Down
Loading