Skip to content

Commit

Permalink
handle stale caches
Browse files Browse the repository at this point in the history
  • Loading branch information
KTachibanaM committed Oct 22, 2024
1 parent a1518e9 commit 7d3902f
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion rss_lambda/abstract_expensive_rss_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os.path
import logging
import hashlib
import datetime
from typing import Any, List
from multiprocessing import Process
from lxml import etree
from .process_rss_text import process_rss_text, ParsedRssText

# Age in seconds, measured from a cache file's modification time, beyond which
# _cache_is_stale() reports that file as stale.
stale_cache_threshold_seconds = 5 * 60 # 5 minutes

# Relative path 'cache' (os.path.join with a single component returns it
# unchanged), so it resolves against the process's current working directory.
# Presumably the root under which _get_cache_path() places cache files - confirm.
_cache_root_path = os.path.join('cache')
# Runs at import time; exist_ok=True makes this a no-op when the directory
# already exists.
os.makedirs(_cache_root_path, exist_ok=True)
Expand All @@ -28,6 +30,10 @@ def _read_cache(hash_key: str, suffix: str) -> str:
def _remove_cache(hash_key: str, suffix: str):
    """Delete the cache file that ``_get_cache_path(hash_key, suffix)`` points to.

    Any error raised by ``os.remove`` (for example, when the file is already
    gone) propagates to the caller unchanged.
    """
    cache_path = _get_cache_path(hash_key, suffix)
    os.remove(cache_path)

def _cache_is_stale(hash_key: str, suffix: str) -> bool:
    """Return True when the cache file was last modified too long ago.

    The file's age is the number of seconds between its modification time
    (``os.path.getmtime``) and now. The file is stale once that age is
    strictly greater than ``stale_cache_threshold_seconds``.

    Args:
        hash_key: Hash identifying the cache entry.
        suffix: Suffix selecting which cache file of that entry to inspect.

    Raises:
        OSError: If the cache file does not exist or is inaccessible
            (propagated from ``os.path.getmtime``).
    """
    # getmtime() already returns seconds since the epoch, so compare it with an
    # epoch timestamp directly. Subtracting two naive local-time datetimes is
    # off by the UTC-offset change whenever a DST transition falls between the
    # file's modification and now.
    modified_at = os.path.getmtime(_get_cache_path(hash_key, suffix))
    now = datetime.datetime.now(datetime.timezone.utc).timestamp()
    return (now - modified_at) > stale_cache_threshold_seconds

def _empty_list(rss_text: str) -> str:
def processor(parsed_rss_text: ParsedRssText):
parent = parsed_rss_text.parent
Expand Down Expand Up @@ -64,7 +70,7 @@ def abstract_expensive_rss_lambda(rss_text: str, expensive_operation, hash: str,
hash_key = h.hexdigest()

if not _cache_exists(hash_key, ORIGINAL_CACHE_SUFFIX):
# original cache does not exist, start processing (use processed cache as lock)
# original cache does not exist, start processing (use absence of processed cache as lock)
logging.info(f"(first processing) original cache does not exist for {hash}, start processing")
_write_cache(hash_key, ORIGINAL_CACHE_SUFFIX, rss_text)

Expand All @@ -77,6 +83,12 @@ def _process():
return _empty_list(rss_text)

if not _cache_exists(hash_key, PROCESSED_CACHE_SUFFIX):
if _cache_is_stale(hash_key, ORIGINAL_CACHE_SUFFIX):
# original cache is stale, remove and reprocess
logging.info(f"(first processing) original cache is stale for {hash}, removing")
_remove_cache(hash_key, ORIGINAL_CACHE_SUFFIX)
return _empty_list(rss_text)

# original cache exists but processed cache does not exist. it is being processed, return empty list.
logging.info(f"(first processing) processed cache does not exist for {hash} so it's still processing")
return _empty_list(rss_text)
Expand All @@ -88,6 +100,12 @@ def _process():
return processed_cache

if _cache_exists(hash_key, PROCESSING_LOCK_CACHE_SUFFIX):
if _cache_is_stale(hash_key, PROCESSING_LOCK_CACHE_SUFFIX):
# original cache exists but was updated and processing lock is stale, remove and reprocess
logging.info(f"original cache exists for {hash} but was updated and processing lock is stale, removing")
_remove_cache(hash_key, PROCESSING_LOCK_CACHE_SUFFIX)
return _empty_list(rss_text)

# original cache exists but was updated and is still processing, return processed cache
logging.info(f"original cache exists for {hash} but was updated and is still processing")
return processed_cache
Expand Down

0 comments on commit 7d3902f

Please sign in to comment.