From 636774cb96755abfc39c0fc6e5885c9e760efb3e Mon Sep 17 00:00:00 2001 From: Luthfi Arifin Date: Tue, 1 Oct 2024 19:22:40 +0700 Subject: [PATCH] refactor: Update watch.py to use a while loop for continuous event watching and handling --- app/events/watch.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/app/events/watch.py b/app/events/watch.py index 996cbe8..aa440d2 100644 --- a/app/events/watch.py +++ b/app/events/watch.py @@ -1,16 +1,24 @@ from bson.json_util import dumps - from crud import insert_event_vector, update_event_vector, eventCollections + +def watch_events(): + while True: + try: + change_stream = eventCollections.watch() + for change in change_stream: + if change["operationType"] == "insert": + insert_event_vector(change["fullDocument"]) + elif change["operationType"] == "update": + print(change["documentKey"]["_id"]) + update_event_vector( + change["documentKey"]["_id"], + change["updateDescription"]["updatedFields"], + ) + print(dumps(change)) + except Exception as e: + print(f"Error occurred: {e}") + + if __name__ == "__main__": - change_stream = eventCollections.watch() - for change in change_stream: - if change["operationType"] == "insert": - insert_event_vector(change["fullDocument"]) - elif change["operationType"] == "update": - print(change["documentKey"]["_id"]) - update_event_vector( - change["documentKey"]["_id"], - change["updateDescription"]["updatedFields"], - ) - print(dumps(change)) + watch_events()