Commit 7f02c5e

feat: better db - everything

g0ldyy committed Feb 26, 2025
1 parent cf69538 commit 7f02c5e
Showing 5 changed files with 293 additions and 183 deletions.
comet/api/stream.py (73 changes: 20 additions & 53 deletions)
@@ -30,7 +30,7 @@ async def remove_ongoing_search_from_database(media_id: str):
 async def is_first_search(media_id: str) -> bool:
     try:
         await database.execute(
-            "INSERT INTO first_searches (media_id, timestamp) VALUES (:media_id, :timestamp)",
+            "INSERT INTO first_searches VALUES (:media_id, :timestamp)",
            {"media_id": media_id, "timestamp": time.time()},
        )
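The rewritten statement relies on the positional column order of first_searches instead of naming columns explicitly. A minimal sketch of the assumption (the schema below is inferred from the bound parameters; the real table is created elsewhere in Comet's setup code):

    # Hypothetical illustration, not part of this commit: the implicit-column
    # INSERT stays correct only while first_searches keeps exactly these two
    # columns, declared in this order.
    await database.execute(
        "CREATE TABLE IF NOT EXISTS first_searches (media_id TEXT, timestamp REAL)"
    )
    await database.execute(
        "INSERT INTO first_searches VALUES (:media_id, :timestamp)",
        {"media_id": "tt0111161", "timestamp": time.time()},
    )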

@@ -46,7 +46,7 @@ async def background_scrape(
        async with aiohttp.ClientSession() as new_session:
            await torrent_manager.scrape_torrents(new_session)

-            if debrid_service != "torrent":
+            if debrid_service != "torrent" and len(torrent_manager.torrents) > 0:
                await torrent_manager.get_and_cache_debrid_availability(new_session)

        logger.log(
@@ -59,20 +59,6 @@
        await remove_ongoing_search_from_database(media_id)


-async def background_availability_check(torrent_manager: TorrentManager, media_id: str):
-    try:
-        async with aiohttp.ClientSession() as new_session:
-            await torrent_manager.get_and_cache_debrid_availability(new_session)
-            logger.log(
-                "SCRAPER",
-                "📥 Background availability check complete!",
-            )
-    except Exception as e:
-        logger.log("SCRAPER", f"❌ Background availability check failed: {e}")
-    finally:
-        await remove_ongoing_search_from_database(media_id)
-
-
 @streams.get("/stream/{media_type}/{media_id}.json")
 @streams.get("/{b64config}/stream/{media_type}/{media_id}.json")
 async def stream(
@@ -173,8 +159,7 @@ async def stream(
            "SCRAPER",
            f"📥 Scraped torrents: {len(torrent_manager.torrents)}",
        )
-
-    elif is_first and has_cached_results:
+    elif is_first:
        logger.log(
            "SCRAPER",
            f"🔄 Starting background scrape + availability check for {log_title}",
@@ -197,46 +182,30 @@ async def stream(
        )

    await torrent_manager.get_cached_availability()
-    if not has_cached_results and debrid_service != "torrent":
+    if (
+        (
+            not has_cached_results
+            or sum(
+                1
+                for torrent in torrent_manager.torrents.values()
+                if torrent["cached"]
+            )
+            == 0
+        )
+        and len(torrent_manager.torrents) > 0
+        and debrid_service != "torrent"
+    ):
        logger.log("SCRAPER", "🔄 Checking availability on debrid service...")
        await torrent_manager.get_and_cache_debrid_availability(session)

    cached_count = 0
    if debrid_service != "torrent":
        cached_count = sum(
            1 for torrent in torrent_manager.torrents.values() if torrent["cached"]
        )
        logger.log(
            "SCRAPER",
-            f"💾 Available cached torrents on {debrid_service}: {cached_count}/{len(torrent_manager.torrents)}",
-        )
-
-    if (
-        cached_count == 0
-        and len(torrent_manager.torrents) > 0
-        and not is_first
-        and debrid_service != "torrent"
-    ):
-        logger.log(
-            "SCRAPER",
-            f"🔄 Starting background availability check for {log_title}",
-        )
-
-        await database.execute(
-            f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO ongoing_searches VALUES (:media_id, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
-            {"media_id": media_id, "timestamp": time.time()},
-        )
-
-        background_tasks.add_task(
-            background_availability_check, torrent_manager, media_id
-        )
-
-        cached_results.append(
-            {
-                "name": "[🔄] Comet",
-                "description": "Checking debrid availability in background - More results will be available in a few seconds...",
-                "url": "https://comet.fast",
-            }
+            f"💾 Available cached torrents on {debrid_service}: {cached_count}/{len(torrent_manager.torrents)}",
        )

    initial_torrent_count = len(torrent_manager.torrents)
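The deleted tail of this hunk removed the separate background availability path (the ongoing_searches insert, the background_availability_check task, and the "[🔄] Comet" placeholder result); its trigger case is folded into the synchronous check at the top. Since the new compound predicate is dense, an equivalent standalone form may help (a hypothetical restatement for readability, not code from this commit):

    # Hypothetical restatement of the new condition; not part of the commit.
    def should_check_availability(
        torrents: dict, has_cached_results: bool, debrid_service: str
    ) -> bool:
        nothing_cached = sum(1 for t in torrents.values() if t["cached"]) == 0
        return (
            (not has_cached_results or nothing_cached)
            and len(torrents) > 0
            and debrid_service != "torrent"
        )

    # Usage equivalent to the diff above:
    # if should_check_availability(torrent_manager.torrents, has_cached_results, debrid_service):
    #     await torrent_manager.get_and_cache_debrid_availability(session)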
@@ -270,7 +239,6 @@ async def stream(
        }
    )

-    is_not_offcloud = debrid_service != "offcloud"
    result_season = season if season is not None else "n"
    result_episode = episode if episode is not None else "n"

@@ -314,7 +282,7 @@ async def stream(
                the_stream["sources"] = torrent["sources"]
            else:
                the_stream["url"] = (
-                    f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{info_hash}/{torrent['fileIndex'] if torrent['cached'] and is_not_offcloud else 'n'}/{title}/{result_season}/{result_episode}"
+                    f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{info_hash}/{torrent['fileIndex'] if torrent['cached'] and torrent['fileIndex'] is not None else 'n'}/{title}/{result_season}/{result_episode}"
                )

            if torrent["cached"]:
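The file-index segment of the playback URL now depends on whether the cached torrent actually carries a file index, rather than on the per-service is_not_offcloud flag. The inline ternary, restated as a hypothetical helper:

    # Hypothetical restatement of the inline conditional; not part of the commit.
    def playback_file_index(torrent: dict) -> str:
        if torrent["cached"] and torrent["fileIndex"] is not None:
            return str(torrent["fileIndex"])
        return "n"  # sentinel meaning "no specific file selected"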
@@ -373,8 +341,8 @@ async def playback(

    query = f"""
    INSERT {"OR IGNORE " if settings.DATABASE_TYPE == "sqlite" else ""}
-    INTO download_links_cache (debrid_key, info_hash, name, season, episode, download_url, timestamp)
-    VALUES (:debrid_key, :info_hash, :name, :season, :episode, :download_url, :timestamp)
+    INTO download_links_cache
+    VALUES (:debrid_key, :info_hash, :season, :episode, :download_url, :timestamp)
    {" ON CONFLICT DO NOTHING" if settings.DATABASE_TYPE == "postgresql" else ""}
    """
@@ -383,7 +351,6 @@ async def playback(
        {
            "debrid_key": config["debridApiKey"],
            "info_hash": hash,
-            "name": name,
            "season": season,
            "episode": episode,
            "download_url": download_url,
comet/debrid/stremthru.py (75 changes: 30 additions & 45 deletions)
@@ -1,12 +1,11 @@
 import aiohttp
 import asyncio
-import time
-import orjson

 from RTN import parse, title_match

-from comet.utils.models import settings, database
-from comet.utils.general import is_video, default_dump
+from comet.utils.models import settings
+from comet.utils.general import is_video
+from comet.utils.debrid import cache_availability
 from comet.utils.logger import logger
 from comet.utils.torrent import torrent_update_queue
@@ -183,60 +182,46 @@ async def generate_download_link(
        name_parsed = parse(name)
        target_file = None

+        files = []
        for file in magnet["data"]["files"]:
            filename = file["name"]
-            file_parsed = parse(filename)
-
-            file_season = file_parsed.seasons[0] if file_parsed.seasons else None
-            file_episode = file_parsed.episodes[0] if file_parsed.episodes else None
-
-            if str(file["index"]) == index:
-                target_file = file
-                break
+            filename_parsed = parse(filename)

            if not is_video(filename) or not title_match(
-                name_parsed.parsed_title, file_parsed.parsed_title
+                name_parsed.parsed_title, filename_parsed.parsed_title
            ):
                continue

+            file_season = (
+                filename_parsed.seasons[0] if filename_parsed.seasons else None
+            )
+            file_episode = (
+                filename_parsed.episodes[0] if filename_parsed.episodes else None
+            )
+
+            file_info = {
+                "info_hash": hash,
+                "index": file["index"],
+                "title": filename,
+                "size": file["size"],
+                "season": file_season,
+                "episode": file_episode,
+                "parsed": filename_parsed,
+            }
+            files.append(file_info)
+
+            if str(file["index"]) == index:
+                target_file = file
+
            if season == file_season and episode == file_episode:
                target_file = file
-                break
+
+        if len(files) > 0:
+            asyncio.create_task(cache_availability(self.real_debrid_name, files))

        if not target_file:
            return

-        await database.execute(
-            f"""
-            INSERT {"OR IGNORE " if settings.DATABASE_TYPE == "sqlite" else ""}
-            INTO debrid_availability (debrid_service, info_hash, file_index, title, season, episode, size, parsed, timestamp)
-            VALUES (:debrid_service, :info_hash, :file_index, :title, :season, :episode, :size, :parsed, :timestamp)
-            {" ON CONFLICT DO NOTHING" if settings.DATABASE_TYPE == "postgresql" else ""}
-            """,
-            {
-                "debrid_service": self.real_debrid_name,
-                "info_hash": hash,
-                "file_index": str(target_file["index"]),
-                "title": target_file["name"],
-                "season": season,
-                "episode": episode,
-                "size": target_file["size"],
-                "parsed": orjson.dumps(file_parsed, default=default_dump).decode(
-                    "utf-8"
-                ),
-                "timestamp": time.time(),
-            },
-        )
+        # await file_index_update_queue.add_update(
+        #     hash,
+        #     season,
+        #     episode,
+        #     target_file["index"],
+        #     target_file["name"],
+        #     target_file["size"],
+        #     parsed,
+        # )

        link = await self.session.post(
            f"{self.base_url}/link/generate?client_ip={self.client_ip}",
            json={"link": target_file["link"]},
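asyncio.create_task makes the cache_availability write fire-and-forget, so generate_download_link no longer blocks on the database. One caveat worth noting: the event loop holds only a weak reference to tasks, so an unretained task can in principle be garbage-collected before it completes. A defensive variant (an assumption about hardening, not code from this commit) would keep strong references:

    # Hypothetical hardening; the commit calls asyncio.create_task() directly.
    _background_tasks: set[asyncio.Task] = set()

    def spawn(coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        _background_tasks.add(task)  # strong reference until the task finishes
        task.add_done_callback(_background_tasks.discard)
        return task

    # spawn(cache_availability(self.real_debrid_name, files))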