From a35a4dcd1390c4fc42404b8c285ce1a0c9728a75 Mon Sep 17 00:00:00 2001
From: Richard Roberson
Date: Sun, 15 Dec 2024 02:19:15 -0700
Subject: [PATCH] Remove scheduler from DB code

---
 firehose/data_filter.py  |   3 +-
 firehose/database.py     | 331 +--------------------------------------
 firehose/db_scheduler.py | 326 ++++++++++++++++++++++++++++++++++++++
 firehose/start_stream.py |  12 ++
 4 files changed, 342 insertions(+), 330 deletions(-)
 create mode 100644 firehose/db_scheduler.py

diff --git a/firehose/data_filter.py b/firehose/data_filter.py
index 6b80141..b87ec3e 100644
--- a/firehose/data_filter.py
+++ b/firehose/data_filter.py
@@ -3,7 +3,8 @@
 from collections import defaultdict
 from atproto import models, Client, IdResolver
 from utils.logger import logger
-from database import db, Post, init_client
+from database import db, Post
+from db_scheduler import init_client
 import json
 from pathlib import Path
 
diff --git a/firehose/database.py b/firehose/database.py
index 6cd2a75..4f889cf 100644
--- a/firehose/database.py
+++ b/firehose/database.py
@@ -1,16 +1,6 @@
 from datetime import datetime, timedelta, timezone
-from typing import Optional
-import time
-
-from utils.logger import logger
-from utils.config import HANDLE, PASSWORD, POSTGRES_DB, POSTGRES_PASSWORD, POSTGRES_USER
+from utils.config import POSTGRES_DB, POSTGRES_PASSWORD, POSTGRES_USER
 import peewee
-from apscheduler.schedulers.background import BackgroundScheduler
-from apscheduler.jobstores.base import JobLookupError
-from apscheduler.triggers.date import DateTrigger
-from apscheduler.triggers.interval import IntervalTrigger
-from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
-from atproto import Client, SessionEvent, Session, exceptions
 
 # Database setup
 db = peewee.PostgresqlDatabase(POSTGRES_DB, user=POSTGRES_USER, password=POSTGRES_PASSWORD, host='postgres', port=5432)
@@ -35,321 +25,4 @@ class SubscriptionState(BaseModel):
 
 class SessionState(BaseModel):
     service = peewee.CharField(unique=True)
-    session_string = peewee.TextField(null=True)
-
-# Postgres database management functions
-def clear_old_posts(clear_days: int):
-    try:
-        with db.connection_context():
-            logger.info("Database connection opened for cleanup.")
-            cutoff_date = datetime.now() - timedelta(days=clear_days)
-            query = Post.delete().where(Post.indexed_at < cutoff_date)
-
-            with db.atomic():
-                num_deleted = query.execute()
-
-            logger.info(f"Deleted {num_deleted} posts older than {cutoff_date}.")
-    except peewee.PeeweeException as e:
-        logger.error(f"An error occurred while deleting old posts: {e}")
-    finally:
-        if not db.is_closed():
-            db.close()
-            logger.info("Database connection closed after cleanup by force.")
-        else:
-            logger.info("Database connection closed after cleanup.")
-
-def vacuum_database():
-    try:
-        with db.connection_context():
-            logger.info("Database connection opened for vacuum.")
-            db.execute_sql('VACUUM FULL;')
-            logger.info("Vacuum operation completed.")
-    except peewee.PeeweeException as e:
-        logger.error(f"An error occurred while vacuuming the database: {e}")
-    finally:
-        if not db.is_closed():
-            db.close()
-            logger.info("Database connection closed after vacuum by force.")
-        else:
-            logger.info("Database connection closed after vacuum.")
-
-def cleanup_db(clear_days: int = 3):
-    clear_old_posts(clear_days)
-    vacuum_database()
-
-# Hydration Function with Rate Limit and Expired Token Handling
-def hydrate_posts_with_interactions(client: Client, batch_size: int = 25):
-    try:
-        with db.connection_context():
-            logger.info("Hydration Database connection opened.")
-            # get posts with uri and interactions
-            posts = Post.select(Post.uri, Post.interactions)
-            uris = [post.uri for post in posts]
-
-            if not uris:
-                logger.info("No posts found in the database to hydrate.")
-                return
-
-            # list to collect
-            posts_to_update = []
-
-            # Process URIs in batches
-            for i in range(0, len(uris), batch_size):
-                batch_uris = uris[i:i + batch_size]
-                try:
-                    # Fetch posts from the API
-                    fetched_posts = client.get_posts(uris=batch_uris)
-                    fetched_posts = fetched_posts['posts']
-
-                    for fetched_post in fetched_posts:
-                        uri = fetched_post.uri
-                        if not uri:
-                            continue
-
-                        # Extract interaction counts
-                        like_count = fetched_post.like_count
-                        reply_count = fetched_post.reply_count
-                        repost_count = fetched_post.repost_count
-                        indexed_at_str = fetched_post.indexed_at
-
-                        # Convert indexed_at to datetime object
-                        try:
-                            indexed_at = datetime.fromisoformat(indexed_at_str)
-                            if indexed_at.tzinfo is None:
-                                # Assume UTC if timezone is not provided
-                                indexed_at = indexed_at.replace(tzinfo=timezone.utc)
-                            else:
-                                indexed_at = indexed_at.astimezone(timezone.utc)
-                        except Exception as e:
-                            logger.error(f"Error parsing indexed_at for post {uri}: {e}")
-                            continue
-
-                        # Calculate time difference in hours
-                        time_diff = datetime.now(timezone.utc) - indexed_at
-                        time_diff_hours = time_diff.total_seconds() / 3600
-
-                        # Calculate "What's Hot" score
-                        # Formula: hot_score = interactions / ( (age_in_hours + 2) ** 1.5 )
-                        # Adding 2 to avoid division by zero and to give a slight boost to newer posts
-                        interactions_score = like_count + (reply_count * 2) + (repost_count * 3)
-                        hot_score = interactions_score / ((time_diff_hours + 2) ** 1.5)
-
-                        # Round the hot_score to an integer
-                        hot_score *= 100 # Scaling the score
-                        hot_score = int(hot_score)
-
-                        # Fetch the current interaction score from the database
-                        current_post = Post.get_or_none(Post.uri == uri)
-                        if current_post and current_post.interactions != hot_score:
-                            # Update the interactions in list for bulk update
-                            current_post.interactions = hot_score
-                            #logger.info(f"{current_post}")
-                            posts_to_update.append(current_post)
-
-                    # pause the loop for 3 seconds
-                    time.sleep(3)
-
-                except exceptions.AtProtocolError as api_err:
-                    if api_err.response:
-                        status_code = api_err.response.status_code
-                        if status_code == 429:
-                            # Rate limited during hydration
-                            reset_timestamp = api_err.response.headers.get('RateLimit-Reset')
-                            if reset_timestamp:
-                                reset_time = datetime.fromtimestamp(int(reset_timestamp), timezone.utc)
-                            else:
-                                reset_time = datetime.now(timezone.utc) + timedelta(seconds=60) # Default to 60 seconds
-                            logger.warning(f"Rate limit exceeded during hydration. Next attempt at {reset_time} UTC.")
-                            reschedule_hydration(reset_time, scheduler)
-                            return # Exit to prevent further API calls
-                        elif status_code == 400:
-                            # Handle other specific status codes if necessary
-                            logger.error(f"Hydration failed with status 400. Content: {api_err.response.content}")
-                            # Optionally, implement additional error handling here
-                    else:
-                        logger.error(f"API error while fetching posts without response: {api_err}")
-                except Exception as e:
-                    logger.error(f"Unexpected error while hydrating posts: {e}")
-
-            if posts_to_update:
-                try:
-                    with db.atomic():
-                        updated = Post.bulk_update(posts_to_update, fields=['interactions'])
-
-                    logger.info(f"Hydrated {updated} posts with updated hot_scores.")
-                except Exception as e:
-                    logger.error(f"Failed to bulk update posts: {e}")
-            else:
-                logger.info("No posts needed updating based on the latest interactions.")
-
-    except Exception as e:
-        logger.error(f"Error in hydration process: {e}")
-
-    finally:
-        if not db.is_closed():
-            db.close()
-            logger.info("Hydration Database connection closed by force.")
-        else :
-            logger.info("Hydration Database connection closed.")
-
-# Rescheduling Functions
-def reschedule_hydration(reset_time: datetime, scheduler: BackgroundScheduler):
-    # Pause the interval hydrate_posts job to prevent further attempts
-    try:
-        scheduler.pause_job('hydrate_posts_interval')
-        logger.info("Paused the interval hydrate_posts job due to rate limiting.")
-    except JobLookupError:
-        logger.info("No interval hydrate_posts job to pause.")
-
-    # Schedule a one-time hydrate_posts job at reset_time
-    try:
-        scheduler.remove_job('hydrate_posts_once')
-    except JobLookupError:
-        pass # No existing one-time job to remove
-
-    # Initialize a new client instance for the scheduled job
-    client = init_client()
-
-    scheduler.add_job(
-        hydrate_posts_with_interactions,
-        trigger=DateTrigger(run_date=reset_time),
-        args=[client],
-        id='hydrate_posts_once',
-        max_instances=1,
-        replace_existing=True
-    )
-    logger.info(f"One-time hydrate_posts job scheduled to run at {reset_time} UTC.")
-
-# Scheduler Initialization
-def start_scheduler(client: Client, schedule_hydration: bool = False) -> BackgroundScheduler:
-    scheduler = BackgroundScheduler()
-    scheduler.start()
-    logger.info("BackgroundScheduler instance created and started.")
-
-    # Schedule cleanup_db to run daily at 8 AM UTC
-    scheduler.add_job(
-        cleanup_db,
-        trigger='cron',
-        hour=8,
-        args=[30],
-        id='cleanup_db',
-        max_instances=1,
-        replace_existing=True
-    )
-    logger.info("Scheduled daily cleanup_db job at 8 AM UTC.")
-
-    if schedule_hydration:
-        # Schedule hydrate_posts_with_interactions to run every 30 minutes
-        scheduler.add_job(
-            hydrate_posts_with_interactions,
-            trigger=IntervalTrigger(minutes=30),
-            args=[client],
-            id='hydrate_posts_interval',
-            max_instances=1,
-            coalesce=True, # If job is missed, run it immediately
-            replace_existing=True
-        )
-        logger.info("Scheduled interval hydrate_posts_with_interactions job every 30 minutes.")
-
-    # Add listener for job events
-    scheduler.add_listener(lambda event: hydration_job_listener(event, scheduler), EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
-    logger.info("Added hydration_job_listener to the scheduler.")
-
-    return scheduler
-
-# Job Listener
-def hydration_job_listener(event, scheduler: BackgroundScheduler):
-    """
-    Listener to detect when the one-time hydrate_posts job completes
-    and take appropriate actions.
-    """
-    if event.job_id == 'hydrate_posts_once':
-        if event.exception:
-            logger.error("One-time hydrate_posts job failed.")
-        else:
-            logger.info("One-time hydrate_posts job completed successfully.")
-            try:
-                scheduler.resume_job('hydrate_posts_interval')
-                logger.info("Resumed the interval hydrate_posts job after one-time hydration.")
-            except JobLookupError:
-                logger.error("Interval hydrate_posts job not found to resume.")
-
-# Bsky Client Session Management Functions
-def get_session() -> Optional[str]:
-    try:
-        session_entry = SessionState.get(SessionState.service == 'atproto')
-        return session_entry.session_string
-    except peewee.DoesNotExist:
-        return None
-    except peewee.PeeweeException as e:
-        logger.error(f"Error retrieving session from database: {e}")
-        return None
-
-def save_session(session_string: str) -> None:
-    try:
-        session_entry, created = SessionState.get_or_create(service='atproto')
-        session_entry.session_string = session_string
-        session_entry.save()
-        if created:
-            logger.info("New session entry created in the database.")
-        else:
-            logger.info("Session entry updated in the database.")
-    except peewee.PeeweeException as e:
-        logger.error(f"Error saving session to database: {e}")
-
-def on_session_change(event: SessionEvent, session: Session) -> None:
-    if event in (SessionEvent.CREATE, SessionEvent.REFRESH, SessionEvent.IMPORT):
-        logger.info(f"Session changed and saved: {event}")
-        save_session(session.export())
-
-def init_client() -> Client:
-    client = Client()
-
-    # Register the session change handler
-    client.on_session_change(on_session_change)
-
-    # Attempt to load existing session from the database
-    session_string = get_session()
-    if session_string:
-        try:
-            client.login(session_string=session_string)
-            logger.info("Reused existing session from the database.")
-        except exceptions.AtProtocolError as e:
-            logger.error(f"Failed to login with existing session: {e}")
-            logger.info("Attempting to create a new session.")
-            client.login(HANDLE, PASSWORD)
-    else:
-        logger.info("No existing session found in the database. Creating a new session.")
-        client.login(HANDLE, PASSWORD)
-
-    return client
-
-# Scheduler Shutdown Function
-def shutdown_scheduler(scheduler: BackgroundScheduler):
-    try:
-        scheduler.shutdown(wait=False)
-        logger.info("Scheduler shutdown successfully.")
-    except Exception as e:
-        logger.error(f"Error during scheduler shutdown: {e}")
-
-    try:
-        if not db.is_closed():
-            db.close()
-            logger.info("Database connection closed.")
-    except peewee.PeeweeException as e:
-        logger.error(f"Error closing database: {e}")
-
-# Main section
-# Initialize Database
-if db.is_closed():
-    db.connect()
-    db.create_tables([Post, SubscriptionState, SessionState])
-    logger.info("Database connected and tables created.")
-
-# Initialize Client
-client = init_client()
-
-# Start Scheduler
-scheduler = start_scheduler(client, schedule_hydration=True)
-for job in scheduler.get_jobs():
-    job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately
\ No newline at end of file
+    session_string = peewee.TextField(null=True)
\ No newline at end of file
diff --git a/firehose/db_scheduler.py b/firehose/db_scheduler.py
new file mode 100644
index 0000000..a538b2f
--- /dev/null
+++ b/firehose/db_scheduler.py
@@ -0,0 +1,326 @@
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.jobstores.base import JobLookupError
+from apscheduler.triggers.date import DateTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
+from atproto import Client, SessionEvent, Session, exceptions
+from datetime import datetime, timedelta, timezone
+from typing import Optional
+import peewee
+import time
+
+from utils.logger import logger
+from utils.config import HANDLE, PASSWORD
+from database import db, Post, SessionState
+
+# Main Function
+def start():
+    # Initialize Client
+    client = init_client()
+
+    # Start Scheduler
+    scheduler = start_scheduler(client, schedule_hydration=True)
+    for job in scheduler.get_jobs():
+        job.modify(next_run_time=datetime.now()) # Trigger all jobs immediately
+
+# Postgres database management functions
+def clear_old_posts(clear_days: int):
+    try:
+        with db.connection_context():
+            logger.info("Database connection opened for cleanup.")
+            cutoff_date = datetime.now() - timedelta(days=clear_days)
+            query = Post.delete().where(Post.indexed_at < cutoff_date)
+
+            with db.atomic():
+                num_deleted = query.execute()
+
+            logger.info(f"Deleted {num_deleted} posts older than {cutoff_date}.")
+    except peewee.PeeweeException as e:
+        logger.error(f"An error occurred while deleting old posts: {e}")
+    finally:
+        if not db.is_closed():
+            db.close()
+            logger.info("Database connection closed after cleanup by force.")
+        else:
+            logger.info("Database connection closed after cleanup.")
+
+def vacuum_database():
+    try:
+        with db.connection_context():
+            logger.info("Database connection opened for vacuum.")
+            db.execute_sql('VACUUM FULL;')
+            logger.info("Vacuum operation completed.")
+    except peewee.PeeweeException as e:
+        logger.error(f"An error occurred while vacuuming the database: {e}")
+    finally:
+        if not db.is_closed():
+            db.close()
+            logger.info("Database connection closed after vacuum by force.")
+        else:
+            logger.info("Database connection closed after vacuum.")
+
+def cleanup_db(clear_days: int = 3):
+    clear_old_posts(clear_days)
+    vacuum_database()
+
+# Hydration Function with Rate Limit and Expired Token Handling
+def hydrate_posts_with_interactions(client: Client, batch_size: int = 25, scheduler: Optional[BackgroundScheduler] = None):
+    try:
+        with db.connection_context():
+            logger.info("Hydration Database connection opened.")
+            # Get posts with uri and interactions
+            posts = Post.select(Post.uri, Post.interactions)
+            uris = [post.uri for post in posts]
+
+            if not uris:
+                logger.info("No posts found in the database to hydrate.")
+                return
+
+            # List of posts to update in bulk
+            posts_to_update = []
+
+            # Process URIs in batches
+            for i in range(0, len(uris), batch_size):
+                batch_uris = uris[i:i + batch_size]
+                try:
+                    # Fetch posts from the API
+                    fetched_posts = client.get_posts(uris=batch_uris)
+                    fetched_posts = fetched_posts.posts
+
+                    for fetched_post in fetched_posts:
+                        uri = fetched_post.uri
+                        if not uri:
+                            continue
+
+                        # Extract interaction counts
+                        like_count = fetched_post.like_count
+                        reply_count = fetched_post.reply_count
+                        repost_count = fetched_post.repost_count
+                        indexed_at_str = fetched_post.indexed_at
+
+                        # Convert indexed_at to datetime object
+                        try:
+                            indexed_at = datetime.fromisoformat(indexed_at_str)
+                            if indexed_at.tzinfo is None:
+                                # Assume UTC if timezone is not provided
+                                indexed_at = indexed_at.replace(tzinfo=timezone.utc)
+                            else:
+                                indexed_at = indexed_at.astimezone(timezone.utc)
+                        except Exception as e:
+                            logger.error(f"Error parsing indexed_at for post {uri}: {e}")
+                            continue
+
+                        # Calculate time difference in hours
+                        time_diff = datetime.now(timezone.utc) - indexed_at
+                        time_diff_hours = time_diff.total_seconds() / 3600
+
+                        # Calculate "What's Hot" score
+                        # Formula: hot_score = interactions / ( (age_in_hours + 2) ** 1.5 )
+                        # Adding 2 to avoid division by zero and to give a slight boost to newer posts
+                        interactions_score = like_count + (reply_count * 2) + (repost_count * 3)
+                        hot_score = interactions_score / ((time_diff_hours + 2) ** 1.5)
+
+                        # Scale the score and truncate to an integer
+                        hot_score *= 100 # Scaling the score
+                        hot_score = int(hot_score)
+
+                        # Fetch the current interaction score from the database
+                        current_post = Post.get_or_none(Post.uri == uri)
+                        if current_post and current_post.interactions != hot_score:
+                            # Update the interactions in list for bulk update
+                            current_post.interactions = hot_score
+                            #logger.info(f"{current_post}")
+                            posts_to_update.append(current_post)
+
+                    # Pause for 3 seconds between batches to throttle API calls
+                    time.sleep(3)
+
+                except exceptions.AtProtocolError as api_err:
+                    if api_err.response:
+                        status_code = api_err.response.status_code
+                        if status_code == 429:
+                            # Rate limited during hydration
+                            reset_timestamp = api_err.response.headers.get('RateLimit-Reset')
+                            if reset_timestamp:
+                                reset_time = datetime.fromtimestamp(int(reset_timestamp), timezone.utc)
+                            else:
+                                reset_time = datetime.now(timezone.utc) + timedelta(seconds=60) # Default to 60 seconds
+                            logger.warning(f"Rate limit exceeded during hydration. Next attempt at {reset_time} UTC.")
+                            reschedule_hydration(reset_time, scheduler)
+                            return # Exit to prevent further API calls
+                        elif status_code == 400:
+                            # Handle other specific status codes if necessary
+                            logger.error(f"Hydration failed with status 400. Content: {api_err.response.content}")
+                            # Optionally, implement additional error handling here
+                    else:
+                        logger.error(f"API error without a response while fetching posts: {api_err}")
+                except Exception as e:
+                    logger.error(f"Unexpected error while hydrating posts: {e}")
+
+            if posts_to_update:
+                try:
+                    with db.atomic():
+                        updated = Post.bulk_update(posts_to_update, fields=['interactions'])
+
+                    logger.info(f"Hydrated {updated} posts with updated hot_scores.")
+                except Exception as e:
+                    logger.error(f"Failed to bulk update posts: {e}")
+            else:
+                logger.info("No posts needed updating based on the latest interactions.")
+
+    except Exception as e:
+        logger.error(f"Error in hydration process: {e}")
+
+    finally:
+        if not db.is_closed():
+            db.close()
+            logger.info("Hydration Database connection closed by force.")
+        else:
+            logger.info("Hydration Database connection closed.")
+
+# Rescheduling Functions
+def reschedule_hydration(reset_time: datetime, scheduler: BackgroundScheduler):
+    # Pause the interval hydrate_posts job to prevent further attempts
+    try:
+        scheduler.pause_job('hydrate_posts_interval')
+        logger.info("Paused the interval hydrate_posts job due to rate limiting.")
+    except JobLookupError:
+        logger.info("No interval hydrate_posts job to pause.")
+
+    # Schedule a one-time hydrate_posts job at reset_time
+    try:
+        scheduler.remove_job('hydrate_posts_once')
+    except JobLookupError:
+        pass # No existing one-time job to remove
+
+    # Initialize a new client instance for the scheduled job
+    client = init_client()
+
+    scheduler.add_job(
+        hydrate_posts_with_interactions,
+        trigger=DateTrigger(run_date=reset_time),
+        args=[client, 25, scheduler],
+        id='hydrate_posts_once',
+        max_instances=1,
+        replace_existing=True
+    )
+    logger.info(f"One-time hydrate_posts job scheduled to run at {reset_time} UTC.")
+
+# Scheduler Initialization
+def start_scheduler(client: Client, schedule_hydration: bool = False) -> BackgroundScheduler:
+    scheduler = BackgroundScheduler()
+    scheduler.start()
+    logger.info("BackgroundScheduler instance created and started.")
+
+    # Schedule cleanup_db to run daily at 8 AM UTC
+    scheduler.add_job(
+        cleanup_db,
+        trigger='cron',
+        hour=8,
+        args=[30],
+        id='cleanup_db',
+        max_instances=1,
+        replace_existing=True
+    )
+    logger.info("Scheduled daily cleanup_db job at 8 AM UTC.")
+
+    if schedule_hydration:
+        # Schedule hydrate_posts_with_interactions to run every 30 minutes
+        scheduler.add_job(
+            hydrate_posts_with_interactions,
+            trigger=IntervalTrigger(minutes=30),
+            args=[client, 25, scheduler],
+            id='hydrate_posts_interval',
+            max_instances=1,
+            coalesce=True, # Collapse missed runs into a single run
+            replace_existing=True
+        )
+        logger.info("Scheduled interval hydrate_posts_with_interactions job every 30 minutes.")
+
+    # Add listener for job events
+    scheduler.add_listener(lambda event: hydration_job_listener(event, scheduler), EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
+    logger.info("Added hydration_job_listener to the scheduler.")
+
+    return scheduler
+
+# Job Listener
+def hydration_job_listener(event, scheduler: BackgroundScheduler):
+    """
+    Listener to detect when the one-time hydrate_posts job completes
+    and take appropriate actions.
+    """
+    if event.job_id == 'hydrate_posts_once':
+        if event.exception:
+            logger.error("One-time hydrate_posts job failed.")
+        else:
+            logger.info("One-time hydrate_posts job completed successfully.")
+            try:
+                scheduler.resume_job('hydrate_posts_interval')
+                logger.info("Resumed the interval hydrate_posts job after one-time hydration.")
+            except JobLookupError:
+                logger.error("Interval hydrate_posts job not found to resume.")
+
+# Bsky Client Session Management Functions
+def get_session() -> Optional[str]:
+    try:
+        session_entry = SessionState.get(SessionState.service == 'atproto')
+        return session_entry.session_string
+    except peewee.DoesNotExist:
+        return None
+    except peewee.PeeweeException as e:
+        logger.error(f"Error retrieving session from database: {e}")
+        return None
+
+def save_session(session_string: str) -> None:
+    try:
+        session_entry, created = SessionState.get_or_create(service='atproto')
+        session_entry.session_string = session_string
+        session_entry.save()
+        if created:
+            logger.info("New session entry created in the database.")
+        else:
+            logger.info("Session entry updated in the database.")
+    except peewee.PeeweeException as e:
+        logger.error(f"Error saving session to database: {e}")
+
+def on_session_change(event: SessionEvent, session: Session) -> None:
+    if event in (SessionEvent.CREATE, SessionEvent.REFRESH, SessionEvent.IMPORT):
+        logger.info(f"Session changed and saved: {event}")
+        save_session(session.export())
+
+def init_client() -> Client:
+    client = Client()
+
+    # Register the session change handler
+    client.on_session_change(on_session_change)
+
+    # Attempt to load existing session from the database
+    session_string = get_session()
+    if session_string:
+        try:
+            client.login(session_string=session_string)
+            logger.info("Reused existing session from the database.")
+        except exceptions.AtProtocolError as e:
+            logger.error(f"Failed to login with existing session: {e}")
+            logger.info("Attempting to create a new session.")
+            client.login(HANDLE, PASSWORD)
+    else:
+        logger.info("No existing session found in the database. Creating a new session.")
+        client.login(HANDLE, PASSWORD)
+
+    return client
+
+# Scheduler Shutdown Function
+def shutdown_scheduler(scheduler: BackgroundScheduler):
+    try:
+        scheduler.shutdown(wait=False)
+        logger.info("Scheduler shut down successfully.")
+    except Exception as e:
+        logger.error(f"Error during scheduler shutdown: {e}")
+
+    try:
+        if not db.is_closed():
+            db.close()
+            logger.info("Database connection closed by force.")
+    except peewee.PeeweeException as e:
+        logger.error(f"Error closing database: {e}")
\ No newline at end of file
diff --git a/firehose/start_stream.py b/firehose/start_stream.py
index f0628c8..965b2ab 100644
--- a/firehose/start_stream.py
+++ b/firehose/start_stream.py
@@ -6,20 +6,32 @@
 from utils.logger import logger
 import data_stream as data_stream
 from data_filter import operations_callback
+from database import db, Post, SubscriptionState, SessionState
+import db_scheduler
 
 def main():
     stream_stop_event = threading.Event()
 
     def handle_termination(signum, frame):
         logger.info(f'Received termination signal {signum}. Stopping firehose stream...')
+        db.close()
         stream_stop_event.set()
         sys.exit(0)
 
     for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]:
         signal.signal(sig, handle_termination)
+
+    # Initialize Database
+    if db.is_closed():
+        db.connect()
+        db.create_tables([Post, SubscriptionState, SessionState])
+        logger.info("Database connected and tables created.")
+
+    # Start the scheduler once; restarting it on every retry would duplicate jobs
+    db_scheduler.start()
     while not stream_stop_event.is_set():
         try:
             data_stream.run(config.SERVICE_DID, operations_callback, stream_stop_event)
         except Exception as e:
             logger.error(f"An exception occurred in the firehose: {e}")