Skip to content

Commit

Permalink
Remove lock from data stream
Browse files Browse the repository at this point in the history
  • Loading branch information
richardr1126 committed Dec 6, 2024
1 parent df8f4fd commit 2d11363
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
3 changes: 1 addition & 2 deletions firehose/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ def on_message_handler(message: firehose_models.MessageFrame) -> None:
# Update the client's parameters with the new cursor
client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq))
# Persist the new cursor in the database
with db.atomic():
SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()
SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()

if not commit.blocks:
# Skip if there are no blocks to process
Expand Down
12 changes: 6 additions & 6 deletions start_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ def handle_termination(signum, frame):
for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]:
signal.signal(sig, handle_termination)

try:
data_stream.run(config.SERVICE_DID, operations_callback, stream_stop_event)
except Exception as e:
logger.error(f"An exception occurred in the firehose: {e}")
# try to restart the stream
main()
while not stream_stop_event.is_set():
try:
data_stream.run(config.SERVICE_DID, operations_callback, stream_stop_event)
except Exception as e:
logger.error(f"An exception occurred in the firehose: {e}")
logger.info("Restarting the firehose stream...")

if __name__ == '__main__':
main()
Expand Down

0 comments on commit 2d11363

Please sign in to comment.