Skip to content

Commit

Permalink
Debugging firehose stopping
Browse files: browse the repository at this point in the history
  • Loading branch information
richardr1126 committed Dec 16, 2024
1 parent a35a4dc commit a8889bd
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 19 deletions.
7 changes: 6 additions & 1 deletion firehose/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def run(name, operations_callback, stream_stop_event=None):
except Exception as e:
logger.error(f"Unexpected error: {e}")
break
finally:
logger.info("You should not see this ...")


def _run(name, operations_callback, stream_stop_event=None):
Expand Down Expand Up @@ -157,7 +159,10 @@ def on_message_handler(message: firehose_models.MessageFrame) -> None:
client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq))
# Persist the new cursor in the database
with db.atomic():
SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()
try:
SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute()
except Exception as e:
logger.error(f"Failed to update cursor: {e}")


if not commit.blocks:
Expand Down
5 changes: 3 additions & 2 deletions firehose/db_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def start():

# Start Scheduler
scheduler = start_scheduler(client, schedule_hydration=True)
for job in scheduler.get_jobs():
job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately

# for job in scheduler.get_jobs():
# job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately

# Postgres database management functions
def clear_old_posts(clear_days: int):
Expand Down
25 changes: 10 additions & 15 deletions firehose/start_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,19 @@ def handle_termination(signum, frame):
stream_stop_event.set()
sys.exit(0)

for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]:
signal.signal(sig, handle_termination)
signal.signal(signal.SIGINT, handle_termination)

# Initialize Database
if db.is_closed():
db.connect()
db.create_tables([Post, SubscriptionState, SessionState])
logger.info("Database connected and tables created.")
db_scheduler.start()


data_stream.run(config.SERVICE_DID, operations_callback, stream_stop_event)
logger.info("firehose has exited")

while not stream_stop_event.is_set():
try:
# Initialize Database
if db.is_closed():
db.connect()
db.create_tables([Post, SubscriptionState, SessionState])
logger.info("Database connected and tables created.")

db_scheduler.start()
data_stream.run(config.SERVICE_DID, operations_callback, stream_stop_event)
except Exception as e:
logger.error(f"An exception occurred in the firehose: {e}")
logger.info("Restarting the firehose stream...")

if __name__ == '__main__':
main()
Expand Down
6 changes: 5 additions & 1 deletion firehose/utils/logger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s |%(levelname)s| %(message)s',
datefmt='%H:%M:%S'
)

0 comments on commit a8889bd

Please sign in to comment.