diff --git a/py/client-ticking/src/pydeephaven_ticking/table_listener.py b/py/client-ticking/src/pydeephaven_ticking/table_listener.py index 8397b98149d..be9fce12f7b 100644 --- a/py/client-ticking/src/pydeephaven_ticking/table_listener.py +++ b/py/client-ticking/src/pydeephaven_ticking/table_listener.py @@ -180,7 +180,10 @@ class TableListenerHandle: _table: Table _listener: TableListener + # Set when the user calls stop() _cancelled: bool + # Tracks Whether we've called cancel on the FlightStreamReader + _reader_cancelled: bool _bp: dhc.BarrageProcessor _writer: flight.FlightStreamWriter _reader: flight.FlightStreamReader @@ -195,6 +198,7 @@ def __init__(self, table: Table, listener: TableListener): self._table = table self._listener = listener self._cancelled = False + self._reader_cancelled = False def start(self) -> None: """Subscribes to changes on the table referenced in the constructor. When changes happen, the @@ -211,9 +215,16 @@ def start(self) -> None: def stop(self) -> None: """Cancels the subscription to the table and stops the service thread. By the time this method returns, the - thread servicing the subscription will be destroyed, and the callback will no longer be invoked.""" + thread servicing the subscription will be destroyed, and the callback will no longer be invoked. + This method joins the subscription servicing thread, unless stop() was called from that very thread. + This can happen if the user's callback calls stop().""" self._cancelled = True + if threading.get_ident() == self._thread.ident: + # We are inside the callback, so just setting the 'cancelled' flag suffices. + return + + self._reader_cancelled = True self._reader.cancel() self._thread.join() @@ -223,16 +234,25 @@ def _process_data(self): user-supplied callback with that TableUpdate.""" try: - while True: + while not self._cancelled: data, metadata = self._reader.read_chunk() ticking_update = self._bp.process_next_chunk(data.columns, metadata) if ticking_update is not None: table_update = TableUpdate(ticking_update) self._listener.on_update(table_update) + except StopIteration: + pass except Exception as e: if not self._cancelled: self._listener.on_error(e) + try: + if not self._reader_cancelled: + self._reader_cancelled = True + self._reader.cancel() + self._writer.close() + except Exception as e: + pass def listen(table: Table, listener: Union[Callable, TableListener], on_error: Union[Callable, None] = None) -> TableListenerHandle: