diff --git a/comet/api/stream.py b/comet/api/stream.py
index 8aa39ba..630efca 100644
--- a/comet/api/stream.py
+++ b/comet/api/stream.py
@@ -30,7 +30,7 @@ async def remove_ongoing_search_from_database(media_id: str):
 async def is_first_search(media_id: str) -> bool:
     try:
         await database.execute(
-            "INSERT INTO first_searches (media_id, timestamp) VALUES (:media_id, :timestamp)",
+            "INSERT INTO first_searches VALUES (:media_id, :timestamp)",
            {"media_id": media_id, "timestamp": time.time()},
        )
 
@@ -46,7 +46,7 @@ async def background_scrape(
         async with aiohttp.ClientSession() as new_session:
             await torrent_manager.scrape_torrents(new_session)
 
-            if debrid_service != "torrent":
+            if debrid_service != "torrent" and len(torrent_manager.torrents) > 0:
                 await torrent_manager.get_and_cache_debrid_availability(new_session)
 
             logger.log(
@@ -59,20 +59,6 @@ async def background_scrape(
         await remove_ongoing_search_from_database(media_id)
 
 
-async def background_availability_check(torrent_manager: TorrentManager, media_id: str):
-    try:
-        async with aiohttp.ClientSession() as new_session:
-            await torrent_manager.get_and_cache_debrid_availability(new_session)
-            logger.log(
-                "SCRAPER",
-                "📥 Background availability check complete!",
-            )
-    except Exception as e:
-        logger.log("SCRAPER", f"❌ Background availability check failed: {e}")
-    finally:
-        await remove_ongoing_search_from_database(media_id)
-
-
 @streams.get("/stream/{media_type}/{media_id}.json")
 @streams.get("/{b64config}/stream/{media_type}/{media_id}.json")
 async def stream(
@@ -173,8 +159,7 @@ async def stream(
                 "SCRAPER",
                 f"📥 Scraped torrents: {len(torrent_manager.torrents)}",
             )
-
-        elif is_first and has_cached_results:
+        elif is_first:
             logger.log(
                 "SCRAPER",
                 f"🔄 Starting background scrape + availability check for {log_title}",
@@ -197,46 +182,30 @@ async def stream(
             )
 
         await torrent_manager.get_cached_availability()
-        if not has_cached_results and debrid_service != "torrent":
+
+        if (
+            (
+                not has_cached_results
+                or sum(
+                    1
+                    for torrent in torrent_manager.torrents.values()
+                    if torrent["cached"]
+                )
+                == 0
+            )
+            and len(torrent_manager.torrents) > 0
+            and debrid_service != "torrent"
+        ):
             logger.log("SCRAPER", "🔄 Checking availability on debrid service...")
             await torrent_manager.get_and_cache_debrid_availability(session)
 
-        cached_count = 0
         if debrid_service != "torrent":
             cached_count = sum(
                 1 for torrent in torrent_manager.torrents.values() if torrent["cached"]
             )
-            logger.log(
-                "SCRAPER",
-                f"💾 Available cached torrents on {debrid_service}: {cached_count}/{len(torrent_manager.torrents)}",
-            )
-
-        if (
-            cached_count == 0
-            and len(torrent_manager.torrents) > 0
-            and not is_first
-            and debrid_service != "torrent"
-        ):
             logger.log(
                 "SCRAPER",
-                f"🔄 Starting background availability check for {log_title}",
-            )
-
-            await database.execute(
-                f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO ongoing_searches VALUES (:media_id, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
-                {"media_id": media_id, "timestamp": time.time()},
-            )
-
-            background_tasks.add_task(
-                background_availability_check, torrent_manager, media_id
-            )
-
-            cached_results.append(
-                {
-                    "name": "[🔄] Comet",
-                    "description": "Checking debrid availability in background - More results will be available in a few seconds...",
-                    "url": "https://comet.fast",
-                }
+                f"💾 Available cached torrents on {debrid_service}: {cached_count}/{len(torrent_manager.torrents)}",
             )
 
         initial_torrent_count = len(torrent_manager.torrents)
@@ -270,7 +239,6 @@ async def stream(
                 }
             )
 
-        is_not_offcloud = debrid_service != "offcloud"
         result_season = season if season is not None else "n"
         result_episode = episode if episode is not None else "n"
 
@@ -314,7 +282,7 @@ async def stream(
                     the_stream["sources"] = torrent["sources"]
                 else:
                     the_stream["url"] = (
-                        f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{info_hash}/{torrent['fileIndex'] if torrent['cached'] and is_not_offcloud else 'n'}/{title}/{result_season}/{result_episode}"
+                        f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{info_hash}/{torrent['fileIndex'] if torrent['cached'] and torrent['fileIndex'] is not None else 'n'}/{title}/{result_season}/{result_episode}"
                     )
 
                 if torrent["cached"]:
@@ -373,8 +341,8 @@ async def playback(
 
     query = f"""
         INSERT {"OR IGNORE " if settings.DATABASE_TYPE == "sqlite" else ""}
-        INTO download_links_cache (debrid_key, info_hash, name, season, episode, download_url, timestamp)
-        VALUES (:debrid_key, :info_hash, :name, :season, :episode, :download_url, :timestamp)
+        INTO download_links_cache
+        VALUES (:debrid_key, :info_hash, :season, :episode, :download_url, :timestamp)
         {" ON CONFLICT DO NOTHING" if settings.DATABASE_TYPE == "postgresql" else ""}
     """
 
@@ -383,7 +351,6 @@ async def playback(
         {
            "debrid_key": config["debridApiKey"],
            "info_hash": hash,
-           "name": name,
            "season": season,
            "episode": episode,
            "download_url": download_url,
diff --git a/comet/debrid/stremthru.py b/comet/debrid/stremthru.py
index bca4b99..7df6889 100644
--- a/comet/debrid/stremthru.py
+++ b/comet/debrid/stremthru.py
@@ -1,12 +1,11 @@
 import aiohttp
 import asyncio
-import time
-import orjson
 
 from RTN import parse, title_match
 
-from comet.utils.models import settings, database
-from comet.utils.general import is_video, default_dump
+from comet.utils.models import settings
+from comet.utils.general import is_video
+from comet.utils.debrid import cache_availability
 from comet.utils.logger import logger
 from comet.utils.torrent import torrent_update_queue
 
@@ -183,60 +182,46 @@ async def generate_download_link(
             name_parsed = parse(name)
 
             target_file = None
+            files = []
 
             for file in magnet["data"]["files"]:
                 filename = file["name"]
-                file_parsed = parse(filename)
-
-                file_season = file_parsed.seasons[0] if file_parsed.seasons else None
-                file_episode = file_parsed.episodes[0] if file_parsed.episodes else None
-
-                if str(file["index"]) == index:
-                    target_file = file
-                    break
+                filename_parsed = parse(filename)
 
                 if not is_video(filename) or not title_match(
-                    name_parsed.parsed_title, file_parsed.parsed_title
+                    name_parsed.parsed_title, filename_parsed.parsed_title
                 ):
                     continue
 
+                file_season = (
+                    filename_parsed.seasons[0] if filename_parsed.seasons else None
+                )
+                file_episode = (
+                    filename_parsed.episodes[0] if filename_parsed.episodes else None
+                )
+
+                file_info = {
+                    "info_hash": hash,
+                    "index": file["index"],
+                    "title": filename,
+                    "size": file["size"],
+                    "season": file_season,
+                    "episode": file_episode,
+                    "parsed": filename_parsed,
+                }
+                files.append(file_info)
+
+                if str(file["index"]) == index:
+                    target_file = file
+
                 if season == file_season and episode == file_episode:
                     target_file = file
-                    break
+
+            if len(files) > 0:
+                asyncio.create_task(cache_availability(self.real_debrid_name, files))
 
             if not target_file:
                 return
 
-            await database.execute(
-                f"""
-                INSERT {"OR IGNORE " if settings.DATABASE_TYPE == "sqlite" else ""}
-                INTO debrid_availability (debrid_service, info_hash, file_index, title, season, episode, size, parsed, timestamp)
-                VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
-                {" ON CONFLICT DO NOTHING" if settings.DATABASE_TYPE == "postgresql" else ""}
-                """,
-                {
-                    "debrid_service": self.real_debrid_name,
-                    "info_hash": hash,
-                    "file_index": str(target_file["index"]),
-                    "title": target_file["name"],
-                    "season": season,
-                    "episode": episode,
-                    "size": target_file["size"],
-                    "parsed": orjson.dumps(file_parsed, default=default_dump).decode(
-                        "utf-8"
-                    ),
-                    "timestamp": time.time(),
-                },
-            )
-
-            # await file_index_update_queue.add_update(
-            #     hash,
-            #     season,
-            #     episode,
-            #     target_file["index"],
-            #     target_file["name"],
-            #     target_file["size"],
-            #     parsed,
-            # )
-
             link = await self.session.post(
                 f"{self.base_url}/link/generate?client_ip={self.client_ip}",
                 json={"link": target_file["link"]},
diff --git a/comet/scrapers/manager.py b/comet/scrapers/manager.py
index 7288095..70679d4 100644
--- a/comet/scrapers/manager.py
+++ b/comet/scrapers/manager.py
@@ -16,6 +16,7 @@
 from comet.utils.models import settings, database, CometSettingsModel
 from comet.utils.general import default_dump
+from comet.utils.debrid import cache_availability, get_cached_availability
 from comet.debrid.manager import retrieve_debrid_availability
 from .zilean import get_zilean
 from .torrentio import get_torrentio
 
@@ -145,27 +146,60 @@ async def get_cached_torrents(self):
     async def cache_torrents(self):
 
         current_time = time.time()
-        values = [
-            {
-                "media_id": self.media_only_id,
-                "info_hash": torrent["infoHash"],
-                "file_index": torrent["fileIndex"],
-                "season": torrent["parsed"].seasons[0]
+        values = []
+
+        for torrent in self.ready_to_cache:
+            seasons = (
+                torrent["parsed"].seasons
                 if torrent["parsed"].seasons
-                else self.season,
-                "episode": torrent["parsed"].episodes[0]
-                if torrent["parsed"].episodes
-                else None,
-                "title": torrent["title"],
-                "seeders": torrent["seeders"],
-                "size": torrent["size"],
-                "tracker": torrent["tracker"],
-                "sources": orjson.dumps(torrent["sources"]).decode("utf-8"),
-                "parsed": orjson.dumps(torrent["parsed"], default_dump).decode("utf-8"),
-                "timestamp": current_time,
-            }
-            for torrent in self.ready_to_cache
-        ]
+                else [self.season]
+            )
+            episodes = torrent["parsed"].episodes
+
+            for season in seasons:
+                if len(episodes) == 0:
+                    # season-only entry
+                    values.append(
+                        {
+                            "media_id": self.media_only_id,
+                            "info_hash": torrent["infoHash"],
+                            "file_index": torrent["fileIndex"],
+                            "season": season,
+                            "episode": None,
+                            "title": torrent["title"],
+                            "seeders": torrent["seeders"],
+                            "size": torrent["size"],
+                            "tracker": torrent["tracker"],
+                            "sources": orjson.dumps(torrent["sources"]).decode("utf-8"),
+                            "parsed": orjson.dumps(
+                                torrent["parsed"], default_dump
+                            ).decode("utf-8"),
+                            "timestamp": current_time,
+                        }
+                    )
+                else:
+                    # season and episode entries
+                    for episode in episodes:
+                        values.append(
+                            {
+                                "media_id": self.media_only_id,
+                                "info_hash": torrent["infoHash"],
+                                "file_index": torrent["fileIndex"],
+                                "season": season,
+                                "episode": episode,
+                                "title": torrent["title"],
+                                "seeders": torrent["seeders"],
+                                "size": torrent["size"],
+                                "tracker": torrent["tracker"],
+                                "sources": orjson.dumps(torrent["sources"]).decode(
+                                    "utf-8"
+                                ),
+                                "parsed": orjson.dumps(
+                                    torrent["parsed"], default_dump
+                                ).decode("utf-8"),
+                                "timestamp": current_time,
+                            }
+                        )
 
         query = f"""
             INSERT {"OR IGNORE " if settings.DATABASE_TYPE == "sqlite" else ""}
@@ -276,9 +310,6 @@ def rank_torrents(
         )
 
     async def get_and_cache_debrid_availability(self, session: aiohttp.ClientSession):
-        if self.debrid_service == "torrent" or len(self.torrents) == 0:
-            return
-
         info_hashes = list(self.torrents.keys())
         seeders_map = {hash: self.torrents[hash]["seeders"] for hash in info_hashes}
 
@@ -300,7 +331,6 @@ async def get_and_cache_debrid_availability(self, session: aiohttp.ClientSession
         if len(availability) == 0:
             return
 
-        is_not_offcloud = self.debrid_service != "offcloud"
         for file in availability:
             season = file["season"]
             episode = file["episode"]
@@ -312,43 +342,16 @@ async def get_and_cache_debrid_availability(self, session: aiohttp.ClientSession
            info_hash = file["info_hash"]
 
            self.torrents[info_hash]["cached"] = True
-            if is_not_offcloud:
+            if file["parsed"] is not None:
                self.torrents[info_hash]["parsed"] = file["parsed"]
+            if file["index"] is not None:
                self.torrents[info_hash]["fileIndex"] = file["index"]
+            if file["title"] is not None:
                self.torrents[info_hash]["title"] = file["title"]
+            if file["size"] is not None:
                self.torrents[info_hash]["size"] = file["size"]
 
-        asyncio.create_task(self._background_cache_availability(availability))
-
-    async def _background_cache_availability(self, availability: list):
-        current_time = time.time()
-
-        is_not_offcloud = self.debrid_service != "offcloud"
-        values = [
-            {
-                "debrid_service": self.debrid_service,
-                "info_hash": file["info_hash"],
-                "file_index": str(file["index"]) if is_not_offcloud else None,
-                "title": file["title"],
-                "season": file["season"],
-                "episode": file["episode"],
-                "size": file["size"],
-                "parsed": orjson.dumps(file["parsed"], default_dump).decode("utf-8")
-                if is_not_offcloud
-                else None,
-                "timestamp": current_time,
-            }
-            for file in availability
-        ]
-
-        query = f"""
-        INSERT {"OR IGNORE " if settings.DATABASE_TYPE == "sqlite" else ""}
-        INTO debrid_availability (debrid_service, info_hash, file_index, title, season, episode, size, parsed, timestamp)
-        VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
-        {" ON CONFLICT DO NOTHING" if settings.DATABASE_TYPE == "postgresql" else ""}
-        """
-
-        await database.execute_many(query, values)
+        asyncio.create_task(cache_availability(self.debrid_service, availability))
 
     async def get_cached_availability(self):
         info_hashes = list(self.torrents.keys())
@@ -358,38 +361,21 @@ async def get_cached_availability(self):
         if self.debrid_service == "torrent" or len(self.torrents) == 0:
             return
 
-        query = f"""
-        SELECT info_hash, file_index, title, size, parsed
-        FROM debrid_availability
-        WHERE info_hash IN (SELECT cast(value as TEXT) FROM {"json_array_elements_text" if settings.DATABASE_TYPE == "postgresql" else "json_each"}(:info_hashes))
-        AND debrid_service = :debrid_service
-        AND timestamp + :cache_ttl >= :current_time
-        """
-
-        params = {
-            "info_hashes": orjson.dumps(info_hashes).decode("utf-8"),
-            "debrid_service": self.debrid_service,
-            "cache_ttl": settings.DEBRID_CACHE_TTL,
-            "current_time": time.time(),
-        }
-        if self.debrid_service != "offcloud":
-            query += """
-                AND ((cast(:season as INTEGER) IS NULL AND season IS NULL) OR season = cast(:season as INTEGER))
-                AND ((cast(:episode as INTEGER) IS NULL AND episode IS NULL) OR episode = cast(:episode as INTEGER))
-            """
-            params["season"] = self.season
-            params["episode"] = self.episode
-
-        is_not_offcloud = self.debrid_service != "offcloud"
+        rows = await get_cached_availability(
+            self.debrid_service, info_hashes, self.season, self.episode
+        )
 
-        rows = await database.fetch_all(query, params)
         for row in rows:
            info_hash = row["info_hash"]
 
            self.torrents[info_hash]["cached"] = True
-            if is_not_offcloud:
+            if row["parsed"] is not None:
                self.torrents[info_hash]["parsed"] = ParsedData(
                    **orjson.loads(row["parsed"])
                )
+            if row["file_index"] is not None:
                self.torrents[info_hash]["fileIndex"] = row["file_index"]
+            if row["title"] is not None:
                self.torrents[info_hash]["title"] = row["title"]
+            if row["size"] is not None:
                self.torrents[info_hash]["size"] = row["size"]
diff --git a/comet/utils/database.py b/comet/utils/database.py
index 2e5cfb2..f55fa0a 100644
--- a/comet/utils/database.py
+++ b/comet/utils/database.py
@@ -151,7 +151,6 @@ async def setup_database():
                 CREATE TABLE IF NOT EXISTS download_links_cache (
                     debrid_key TEXT,
                     info_hash TEXT,
-                    name TEXT,
                     season INTEGER,
                     episode INTEGER,
                     download_url TEXT,
@@ -163,7 +162,7 @@ async def setup_database():
         await database.execute(
             """
             CREATE UNIQUE INDEX IF NOT EXISTS download_links_series_both_idx
-            ON download_links_cache (debrid_key, info_hash, name, season, episode)
+            ON download_links_cache (debrid_key, info_hash, season, episode)
             WHERE season IS NOT NULL AND episode IS NOT NULL
             """
         )
@@ -171,7 +170,7 @@ async def setup_database():
         await database.execute(
             """
             CREATE UNIQUE INDEX IF NOT EXISTS download_links_season_only_idx
-            ON download_links_cache (debrid_key, info_hash, name, season)
+            ON download_links_cache (debrid_key, info_hash, season)
             WHERE season IS NOT NULL AND episode IS NULL
             """
         )
@@ -179,7 +178,7 @@ async def setup_database():
         await database.execute(
             """
             CREATE UNIQUE INDEX IF NOT EXISTS download_links_episode_only_idx
-            ON download_links_cache (debrid_key, info_hash, name, episode)
+            ON download_links_cache (debrid_key, info_hash, episode)
             WHERE season IS NULL AND episode IS NOT NULL
             """
         )
@@ -187,7 +186,7 @@ async def setup_database():
         await database.execute(
             """
             CREATE UNIQUE INDEX IF NOT EXISTS download_links_no_season_episode_idx
-            ON download_links_cache (debrid_key, info_hash, name)
+            ON download_links_cache (debrid_key, info_hash)
             WHERE season IS NULL AND episode IS NULL
             """
         )
diff --git a/comet/utils/debrid.py b/comet/utils/debrid.py
new file mode 100644
index 0000000..f3b3bff
--- /dev/null
+++ b/comet/utils/debrid.py
@@ -0,0 +1,173 @@
+import time
+import orjson
+
+from comet.utils.models import settings, database
+from comet.utils.general import default_dump
+
+
+async def cache_availability(debrid_service: str, availability: list):
+    current_time = time.time()
+
+    values = [
+        {
+            "debrid_service": debrid_service,
+            "info_hash": file["info_hash"],
+            "file_index": str(file["index"]) if file["index"] is not None else None,
+            "title": file["title"],
+            "season": file["season"],
+            "episode": file["episode"],
+            "size": file["size"] if file["size"] != -1 else None,
+            "parsed": orjson.dumps(file["parsed"], default_dump).decode("utf-8")
+            if file["parsed"] is not None
+            else None,
+            "timestamp": current_time,
+        }
+        for file in availability
+    ]
+
+    if settings.DATABASE_TYPE == "sqlite":
+        query = """
+            INSERT OR REPLACE
+            INTO debrid_availability
+            VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
+        """
+        await database.execute_many(query, values)
+    elif settings.DATABASE_TYPE == "postgresql":
+        both_values = []
+        season_only_values = []
+        episode_only_values = []
+        no_season_episode_values = []
+
+        for val in values:
+            if val["season"] is not None and val["episode"] is not None:
+                both_values.append(val)
+            elif val["season"] is not None and val["episode"] is None:
+                season_only_values.append(val)
+            elif val["season"] is None and val["episode"] is not None:
+                episode_only_values.append(val)
+            else:
+                no_season_episode_values.append(val)
+
+        # handle each case separately with appropriate ON CONFLICT clauses
+        if both_values:
+            query = """
+                INSERT INTO debrid_availability
+                VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
+                ON CONFLICT (debrid_service, info_hash, season, episode)
+                WHERE season IS NOT NULL AND episode IS NOT NULL
+                DO UPDATE SET
+                    title = EXCLUDED.title,
+                    file_index = EXCLUDED.file_index,
+                    size = EXCLUDED.size,
+                    parsed = EXCLUDED.parsed,
+                    timestamp = EXCLUDED.timestamp
+            """
+            await database.execute_many(query, both_values)
+
+        if season_only_values:
+            query = """
+                INSERT INTO debrid_availability
+                VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
+                ON CONFLICT (debrid_service, info_hash, season)
+                WHERE season IS NOT NULL AND episode IS NULL
+                DO UPDATE SET
+                    title = EXCLUDED.title,
+                    file_index = EXCLUDED.file_index,
+                    size = EXCLUDED.size,
+                    parsed = EXCLUDED.parsed,
+                    timestamp = EXCLUDED.timestamp
+            """
+            await database.execute_many(query, season_only_values)
+
+        if episode_only_values:
+            query = """
+                INSERT INTO debrid_availability
+                VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
+                ON CONFLICT (debrid_service, info_hash, episode)
+                WHERE season IS NULL AND episode IS NOT NULL
+                DO UPDATE SET
+                    title = EXCLUDED.title,
+                    file_index = EXCLUDED.file_index,
+                    size = EXCLUDED.size,
+                    parsed = EXCLUDED.parsed,
+                    timestamp = EXCLUDED.timestamp
+            """
+            await database.execute_many(query, episode_only_values)
+
+        if no_season_episode_values:
+            query = """
+                INSERT INTO debrid_availability
+                VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
+                ON CONFLICT (debrid_service, info_hash)
+                WHERE season IS NULL AND episode IS NULL
+                DO UPDATE SET
+                    title = EXCLUDED.title,
+                    file_index = EXCLUDED.file_index,
+                    size = EXCLUDED.size,
+                    parsed = EXCLUDED.parsed,
+                    timestamp = EXCLUDED.timestamp
+            """
+            await database.execute_many(query, no_season_episode_values)
+    else:
+        query = """
+            INSERT
+            INTO debrid_availability
+            VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
+        """
+        await database.execute_many(query, values)
+
+
+async def get_cached_availability(
+    debrid_service: str, info_hashes: list, season: int = None, episode: int = None
+):
+    base_query = f"""
+        SELECT info_hash, file_index, title, size, parsed
+        FROM debrid_availability
+        WHERE info_hash IN (SELECT cast(value as TEXT) FROM {"json_array_elements_text" if settings.DATABASE_TYPE == "postgresql" else "json_each"}(:info_hashes))
+        AND debrid_service = :debrid_service
+        AND timestamp + :cache_ttl >= :current_time
+    """
+
+    params = {
+        "info_hashes": orjson.dumps(info_hashes).decode("utf-8"),
+        "debrid_service": debrid_service,
+        "cache_ttl": settings.DEBRID_CACHE_TTL,
+        "current_time": time.time(),
+        "season": season,
+        "episode": episode,
+    }
+
+    if debrid_service == "offcloud":
+        query = (
+            base_query
+            + """
+            AND ((cast(:season as INTEGER) IS NULL AND season IS NULL) OR season = cast(:season as INTEGER))
+            AND ((cast(:episode as INTEGER) IS NULL AND episode IS NULL) OR episode = cast(:episode as INTEGER))
+        """
+        )
+        results = await database.fetch_all(query, params)
+
+        found_hashes = {r["info_hash"] for r in results}
+        remaining_hashes = [h for h in info_hashes if h not in found_hashes]
+
+        if remaining_hashes:
+            null_title_params = {
+                "info_hashes": orjson.dumps(remaining_hashes).decode("utf-8"),
+                "debrid_service": debrid_service,
+                "cache_ttl": settings.DEBRID_CACHE_TTL,
+                "current_time": time.time(),
+            }
+            null_title_query = base_query + " AND title IS NULL"
+            null_results = await database.fetch_all(null_title_query, null_title_params)
+            results.extend(null_results)
+    else:
+        query = (
+            base_query
+            + """
+            AND ((cast(:season as INTEGER) IS NULL AND season IS NULL) OR season = cast(:season as INTEGER))
+            AND ((cast(:episode as INTEGER) IS NULL AND episode IS NULL) OR episode = cast(:episode as INTEGER))
+        """
+        )
+        results = await database.fetch_all(query, params)
+
+    return results
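
Note on the upserts in comet/utils/debrid.py: PostgreSQL only accepts an `ON CONFLICT (...) WHERE ...` clause when the table has a unique partial index whose columns and predicate match the arbiter, and SQLite's `INSERT OR REPLACE` relies on the same indexes to detect duplicates. The DDL for `debrid_availability` is not part of this diff, so the sketch below is an assumption that mirrors the `download_links_cache` index pattern in `setup_database()`; the index names are illustrative.

    # Sketch (assumption): the partial unique indexes that cache_availability's
    # four ON CONFLICT arbiters would target. Index names are illustrative; the
    # real DDL would live in comet/utils/database.py's setup_database().
    from comet.utils.models import database

    DEBRID_AVAILABILITY_INDEXES = [
        """
        CREATE UNIQUE INDEX IF NOT EXISTS debrid_availability_both_idx
        ON debrid_availability (debrid_service, info_hash, season, episode)
        WHERE season IS NOT NULL AND episode IS NOT NULL
        """,
        """
        CREATE UNIQUE INDEX IF NOT EXISTS debrid_availability_season_only_idx
        ON debrid_availability (debrid_service, info_hash, season)
        WHERE season IS NOT NULL AND episode IS NULL
        """,
        """
        CREATE UNIQUE INDEX IF NOT EXISTS debrid_availability_episode_only_idx
        ON debrid_availability (debrid_service, info_hash, episode)
        WHERE season IS NULL AND episode IS NOT NULL
        """,
        """
        CREATE UNIQUE INDEX IF NOT EXISTS debrid_availability_no_season_episode_idx
        ON debrid_availability (debrid_service, info_hash)
        WHERE season IS NULL AND episode IS NULL
        """,
    ]

    async def ensure_debrid_availability_indexes():
        # One index per (season, episode) nullability case, matching the branch
        # split in cache_availability: unique constraints treat NULLs as
        # distinct values, so a single composite index over the nullable
        # columns could not deduplicate the NULL rows.
        for ddl in DEBRID_AVAILABILITY_INDEXES:
            await database.execute(ddl)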