refactor: improve PyPI data ingestion and database operations
stevenlei committed Jan 10, 2025
1 parent 63d8742 commit 0400725
Showing 5 changed files with 78 additions and 36 deletions.
9 changes: 4 additions & 5 deletions core/db.py
@@ -163,9 +163,7 @@ def _process_version(self, item: Dict[str, str]):
         # but the original logic here tries to use the package's import_id as the version's import_id
         # this is a temporary fix (for the implementation of pypi)
         # to fix this completely, we need to update the logic of the crates transformer too
-        if item["package_id"]:
-            package_id = item["package_id"]
-        else:
+        if not (package_id := item.get("package_id")):
             package_id = self.package_cache.get(item["import_id"])
         if not package_id:
             self.logger.warn(f"package {item['import_id']} not found")
@@ -228,8 +226,9 @@ def _process_depends_on(self, item: Dict[str, str]):
         }

         # Add dependency_type_id if provided
-        if "dependency_type_id" in item:
-            depends_on["dependency_type_id"] = item["dependency_type_id"]
+        depends_on.update(
+            {"dependency_type_id": item["dependency_type_id"]} if "dependency_type_id" in item else {}
+        )

         return DependsOn(**depends_on).to_dict()
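Note: `dict.update` with an empty mapping is a no-op, which is what makes the conditional-expression form above work. A quick illustration with throwaway data:

depends_on = {"version_id": 1, "dependency_id": 2}
item = {"dependency_type_id": 7}

# update() with {} is a no-op, so the key is only added when the
# source mapping actually carries it.
depends_on.update(
    {"dependency_type_id": item["dependency_type_id"]} if "dependency_type_id" in item else {}
)
print(depends_on)  # {'version_id': 1, 'dependency_id': 2, 'dependency_type_id': 7}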

2 changes: 1 addition & 1 deletion core/models/__init__.py
@@ -196,7 +196,7 @@ def to_dict(self):
         }

         # if dependency_type_id is provided, include it
-        if hasattr(self, 'dependency_type_id') and self.dependency_type_id is not None:
+        if self.dependency_type_id is not None:
             result["dependency_type_id"] = self.dependency_type_id

         return result
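Note: dropping the `hasattr` guard is only safe when every instance is guaranteed to define `dependency_type_id`, for example via a field default of `None`. A sketch of that contract, with illustrative fields that are not necessarily the model's real ones:

from dataclasses import dataclass
from typing import Optional

@dataclass
class DependsOn:
    version_id: int
    dependency_id: int
    dependency_type_id: Optional[int] = None  # attribute always exists, may be None

    def to_dict(self) -> dict:
        result = {"version_id": self.version_id, "dependency_id": self.dependency_id}
        # No hasattr needed: the field is guaranteed by the default above.
        if self.dependency_type_id is not None:
            result["dependency_type_id"] = self.dependency_type_id
        return result

print(DependsOn(1, 2).to_dict())    # {'version_id': 1, 'dependency_id': 2}
print(DependsOn(1, 2, 7).to_dict())  # also includes 'dependency_type_id': 7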
73 changes: 52 additions & 21 deletions package_managers/pypi/fetcher.py
@@ -1,12 +1,13 @@
-import json
-import os
-import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import Generator, List, Optional, Any
-from html.parser import HTMLParser
 from dataclasses import dataclass
 from urllib.parse import urljoin
 from datetime import datetime
+from html.parser import HTMLParser
+from typing import Generator, List, Optional
+import multiprocessing
+import json
+import time
+import os

 import requests

@@ -43,7 +44,7 @@ def __init__(self, name: str, config: Config):
         self.process_file = os.path.join(self.data_dir, "progress.json") # to store the progress of the fetch
         self.packages_file = os.path.join(self.data_dir, "packages.txt") # a list of all package names

-    def _save_process(self, batch_num: int, downloaded: int, fetched: int, total: int):
+    def _save_process(self, batch_num: int, downloaded: int, fetched: int, total: int) -> None:
         """Save current process to progress.json"""
         process = {
             "batch_num": batch_num,
@@ -52,28 +53,36 @@ def _save_process(self, batch_num: int, downloaded: int, fetched: int, total: int):
             "total": total,
             "timestamp": datetime.now().isoformat()
         }
-        with open(self.process_file, 'w') as f:
-            json.dump(process, f, indent=2)
+        try:
+            with open(self.process_file, 'w') as f:
+                json.dump(process, f, indent=2)
+        except (IOError, OSError) as e:
+            self.logger.error(f"Failed to open process file for writing: {e}")
+        except json.JSONEncodeError as e:
+            self.logger.error(f"Failed to serialize process data to JSON: {e}")

     def _load_process(self) -> tuple[int, int, int, int]:
         """Load process from progress.json if exists"""
         try:
-            with open(self.process_file, 'r') as f:
+            with open(self.process_file) as f:
                 process = json.load(f)
             return (
                 process["batch_num"],
                 process.get("downloaded", 0),
                 process.get("fetched", 0),
                 process["total"]
             )
-        except (FileNotFoundError, json.JSONDecodeError):
+        except (FileNotFoundError, json.JSONDecodeError) as e:
+            self.logger.error(f"Failed to load process file: {e}")
             return 0, 0, 0, 0

     def _save_package_list(self, packages: List[str]):
         """Save package list to packages.txt"""
-        with open(self.packages_file, 'w') as f:
-            for package in packages:
-                f.write(f"{package}\n")
+        try:
+            with open(self.packages_file, 'w') as f:
+                f.writelines(f"{package}\n" for package in packages)
+        except (IOError, OSError) as e:
+            self.logger.error(f"Failed to open package list file for writing: {e}")

     def _load_package_list(self) -> List[str]:
         """Load package list from packages.txt if exists"""
@@ -82,24 +91,37 @@ def _load_package_list(self) -> List[str]:
                 return [line.strip() for line in f if line.strip()]
         except FileNotFoundError:
             return []
+        except (IOError, OSError) as e:
+            self.logger.error(f"Failed to load package list file: {e}")
+            return []

     def _get_package_list(self) -> List[str]:
         """Fetch list of all packages from PyPI simple index."""
-        url = f"{self.base_url}/simple/" # PyPI simple index
-        response = self.session.get(url)
-        response.raise_for_status()
+        url = urljoin(self.base_url, "simple") # PyPI simple index
+        try:
+            response = self.session.get(url)
+            response.raise_for_status()
+        except requests.RequestException as e:
+            self.logger.error(f"Failed to fetch package list from {url}")
+            self.logger.error(f"Status code: {response.status_code if 'response' in locals() else 'N/A'}")
+            self.logger.error(f"Response body: {response.text if 'response' in locals() else 'N/A'}")
+            self.logger.error(f"Error: {str(e)}")
+            return []

         parser = SimpleIndexParser()
         parser.feed(response.text)
         return parser.packages

-    def _get_package_data(self, package_name: str) -> Optional[dict]:
+    def _get_package_data(self, package_name: str) -> dict[str, Any] | None:
         """Fetch JSON data for a specific package."""
         # Remove any /simple/ prefix if present
         package_name = package_name.replace('/simple/', '')
         # Remove any trailing slash
         package_name = package_name.rstrip('/')
-        url = f"{self.base_url}/pypi/{package_name}/json" # PyPI JSON API
+
+        base_api_url = urljoin(self.base_url, "pypi/")
+        package_url = urljoin(base_api_url, f"{package_name}/")
+        url = urljoin(package_url, "json")

         try:
             response = self.session.get(url)
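Note: the switch from f-string concatenation to `urljoin` makes trailing slashes significant: `urljoin` resolves its second argument the way a browser resolves a relative link, so a base path without a trailing slash has its last segment replaced. A short sketch of the behavior the new URL construction depends on:

from urllib.parse import urljoin

base = "https://pypi.org"

# Keeping a trailing slash on each base means the next segment appends:
api = urljoin(base, "pypi/")     # 'https://pypi.org/pypi/'
pkg = urljoin(api, "requests/")  # 'https://pypi.org/pypi/requests/'
url = urljoin(pkg, "json")       # 'https://pypi.org/pypi/requests/json'

# Gotcha: without the trailing slash, the last path segment is replaced.
urljoin("https://pypi.org/pypi", "requests/")  # 'https://pypi.org/requests/'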
@@ -138,7 +160,11 @@ def fetch(self) -> Generator[Data, None, None]:
         6. Track progress in progress.json
         """
         # Create data directory if needed
-        os.makedirs(self.data_dir, exist_ok=True)
+        try:
+            os.makedirs(self.data_dir, exist_ok=True)
+        except (IOError, OSError) as e:
+            self.logger.error(f"Failed to create data directory: {e}")
+            return

         # Check if we need to resume
         current_batch, downloaded, fetched, total = self._load_process()
@@ -193,8 +219,13 @@ def fetch(self) -> Generator[Data, None, None]:
             file_name = f"{batch_num}.json"
             file_path = os.path.join(self.data_dir, file_name)

-            with open(file_path, 'w') as f:
-                json.dump(results, f)
+            try:
+                with open(file_path, 'w') as f:
+                    json.dump(results, f)
+            except (IOError, OSError) as e:
+                self.logger.error(f"Failed to open batch file for writing: {e}")
+            except json.JSONEncodeError as e:
+                self.logger.error(f"Failed to serialize batch data to JSON: {e}")

             # Update progress
             self._save_process(batch_num, downloaded, fetched, total_packages)
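Taken together, the fetcher follows a common resumable-batch pattern: each batch lands in its own numbered file, a cursor is persisted after every batch, and completed batches are skipped on restart. A condensed, self-contained sketch of the idea (the helpers here are illustrative, not the fetcher's actual methods):

import json
import os

def load_progress(data_dir: str) -> int:
    try:
        with open(os.path.join(data_dir, "progress.json")) as f:
            return json.load(f)["batch_num"]
    except (FileNotFoundError, json.JSONDecodeError, KeyError):
        return 0  # no usable progress: start from the beginning

def save_progress(data_dir: str, batch_num: int) -> None:
    with open(os.path.join(data_dir, "progress.json"), "w") as f:
        json.dump({"batch_num": batch_num}, f)

def fetch_batches(data_dir: str, packages: list, batch_size: int = 1000) -> None:
    os.makedirs(data_dir, exist_ok=True)
    done = load_progress(data_dir)
    for batch_num, start in enumerate(range(0, len(packages), batch_size), start=1):
        if batch_num <= done:
            continue  # this batch already landed on disk in a previous run
        batch = packages[start:start + batch_size]
        results = [{"name": name} for name in batch]  # stand-in for real API calls
        with open(os.path.join(data_dir, f"{batch_num}.json"), "w") as f:
            json.dump(results, f)
        save_progress(data_dir, batch_num)  # cursor survives a crash between batches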
4 changes: 2 additions & 2 deletions package_managers/pypi/main.py
@@ -18,10 +18,10 @@ def fetch(config: Config) -> Generator[Data, None, None]:

     if config.exec_config.fetch:
         logger.log("📥 Fetching new data from PyPI...")
-        return fetcher.fetch()
+        yield from fetcher.fetch()
     else:
         logger.log("ℹ️ Skipping fetch (FETCH=false)")
-        return iter([]) # Return empty iterator
+        return # Empty generator


def run_pipeline(db: DB, config: Config) -> None:
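This change fixes a classic generator pitfall: as soon as a function body contains `yield` anywhere, `return value` no longer hands that value to the caller; it just ends iteration (the value rides on `StopIteration`). `yield from` is the idiomatic way to delegate. A self-contained sketch of the distinction:

from typing import Generator

def numbers() -> Generator[int, None, None]:
    yield from (1, 2, 3)

def plain_function():
    # No yield in the body: an ordinary function that happens to
    # return a generator object. Iterating it works.
    return numbers()

def broken_generator():
    if False:
        yield  # any yield makes this a generator function
    # In a generator, `return value` does not deliver the value to
    # the caller; it stops iteration immediately.
    return numbers()

def delegating() -> Generator[int, None, None]:
    yield from numbers()  # re-yields every item: the form the diff adopts

print(list(plain_function()))    # [1, 2, 3]
print(list(broken_generator()))  # []
print(list(delegating()))        # [1, 2, 3]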
26 changes: 19 additions & 7 deletions package_managers/pypi/transformer.py
@@ -51,7 +51,7 @@ def _read_batch_files(self) -> Generator[Dict, None, None]:
         for batch_num in range(1, self.total_batches + 1):
             file_path = os.path.join(self.data_dir, f"{batch_num}.json")
             try:
-                with open(file_path, 'r') as f:
+                with open(file_path) as f:
                     packages = json.load(f)
                 self.logger.log(f"Processing batch {batch_num} with {len(packages)} packages")
                 for package in packages:
@@ -68,7 +68,10 @@ def packages(self) -> Generator[Dict[str, Any], None, None]:
         for package_data in self._read_batch_files():
             try:
                 info = package_data.get("info")
-                if not info:
+                try:
+                    assert info, "Package info is missing"
+                except AssertionError as e:
+                    self.logger.warn(str(e))
                     continue

                 name = info.get("name")
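The same assert-based guard is applied to the `versions` and `dependencies` generators below. One property of this pattern: `assert` statements are stripped when Python runs with `-O`, so the guard only fires while assertions are enabled; a plain `if not info:` check behaves the same regardless of interpreter flags. A minimal reproduction of the idiom:

import logging

logging.basicConfig()
logger = logging.getLogger(__name__)

package_data = {"info": None}  # illustrative payload with missing info
info = package_data.get("info")

# assert raises AssertionError for falsy info, which is caught and logged;
# under `python -O` the assert is compiled out and the guard never fires.
try:
    assert info, "Package info is missing"
except AssertionError as e:
    logger.warning(str(e))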
@@ -221,7 +224,10 @@ def versions(self) -> Generator[Dict[str, Any], None, None]:
         for package_data in self._read_batch_files():
             try:
                 info = package_data.get("info")
-                if not info:
+                try:
+                    assert info, "Package info is missing"
+                except AssertionError as e:
+                    self.logger.warn(str(e))
                     continue

                 name = info.get("name")
@@ -290,7 +296,10 @@ def dependencies(self) -> Generator[Dict[str, Any], None, None]:
         for package_data in self._read_batch_files():
             try:
                 info = package_data.get("info")
-                if not info:
+                try:
+                    assert info, "Package info is missing"
+                except AssertionError as e:
+                    self.logger.warn(str(e))
                     continue

                 name = info.get("name")
@@ -348,15 +357,18 @@ def dependencies(self) -> Generator[Dict[str, Any], None, None]:

     def users(self) -> Generator[Dict[str, Any], None, None]:
         """Skip user data as we can't get GitHub info from PyPI API."""
-        return iter(())
+        if False: # This ensures it's a generator but never yields anything
+            yield {}

     def user_packages(self) -> Generator[Dict[str, Any], None, None]:
         """Skip user-package relationships as we can't get GitHub info from PyPI API."""
-        return iter(())
+        if False:
+            yield {}

     def user_versions(self) -> Generator[Dict[str, Any], None, None]:
         """Skip user-version relationships as we can't get GitHub info from PyPI API."""
-        return iter(())
+        if False:
+            yield {}

     def _parse_dependency(self, req: str) -> tuple[str, str, str]:
         """Parse a dependency string into name, clean version, and version range.
