diff --git a/grizzly/common/bugzilla.py b/grizzly/common/bugzilla.py index 3221e893..15b20d3c 100644 --- a/grizzly/common/bugzilla.py +++ b/grizzly/common/bugzilla.py @@ -8,9 +8,10 @@ from pathlib import Path from shutil import rmtree from tempfile import mkdtemp +from typing import Any, Generator, List, Optional, Tuple from zipfile import ZipFile -from bugsy import Bugsy +from bugsy import Bug, Bugsy from bugsy.errors import BugsyException from requests.exceptions import ConnectionError as RequestsConnectionError @@ -26,15 +27,15 @@ class BugzillaBug: __slots__ = ("_bug", "_data") - def __init__(self, bug): + def __init__(self, bug: Bug) -> None: self._bug = bug self._data = Path(mkdtemp(prefix=f"bug{bug.id}-", dir=grz_tmp("bugzilla"))) self._fetch_attachments() - def __enter__(self): + def __enter__(self) -> "BugzillaBug": return self - def __exit__(self, *exc): + def __exit__(self, *exc: Any) -> None: self.cleanup() def _fetch_attachments(self) -> None: @@ -62,7 +63,7 @@ def _fetch_attachments(self) -> None: continue (self._data / attachment.file_name).write_bytes(data) - def _unpack_archives(self): + def _unpack_archives(self) -> None: """Unpack and remove archives. Arguments: @@ -80,14 +81,16 @@ def _unpack_archives(self): entry.unlink() # TODO: add support for other archive types - def assets(self, ignore=None): + def assets( + self, ignore: Optional[Tuple[str]] = None + ) -> Generator[Tuple[str, Path], None, None]: """Scan files for assets. Arguments: - ignore (list(str)): Assets not to include in output. + ignore: Assets not to include in output. Yields: - tuple(str, Path): Name and path to asset. + Asset name and path. """ for asset, file in KNOWN_ASSETS.items(): if not ignore or asset not in ignore: @@ -95,7 +98,7 @@ def assets(self, ignore=None): if asset_path.is_file(): yield asset, asset_path - def cleanup(self): + def cleanup(self) -> None: """Remove attachment data. Arguments: @@ -107,11 +110,11 @@ def cleanup(self): rmtree(self._data) @classmethod - def load(cls, bug_id): + def load(cls, bug_id: int) -> Optional["BugzillaBug"]: """Load bug information from a Bugzilla instance. Arguments: - bug_id (int): Bug to load. + bug_id: Bug to load. Returns: BugzillaBug @@ -128,14 +131,14 @@ def load(cls, bug_id): LOG.error("Unable to connect to %r (%s)", bugzilla.bugzilla_url, exc) return None - def testcases(self): + def testcases(self) -> List[Path]: """Create a list of potential test cases. Arguments: None Returns: - list(Path): Files and directories that could potentially be test cases. + Files and directories that could potentially be test cases. """ # unpack archives self._unpack_archives() diff --git a/grizzly/common/fuzzmanager.py b/grizzly/common/fuzzmanager.py index 5df915cc..878e9083 100644 --- a/grizzly/common/fuzzmanager.py +++ b/grizzly/common/fuzzmanager.py @@ -9,6 +9,7 @@ from re import search from shutil import copyfileobj, rmtree from tempfile import NamedTemporaryFile, mkdtemp +from typing import Any, Dict, Generator, List, Optional, Tuple, cast from zipfile import BadZipFile, ZipFile from Collector.Collector import Collector @@ -23,177 +24,11 @@ LOG = getLogger(__name__) -class Bucket: - """Get Bucket data for a specified CrashManager bucket.""" - - def __init__(self, bucket_id): - """Initialize a Bucket instance. - - Arguments: - bucket_id (int): ID of the requested bucket on the server side - """ - assert isinstance(bucket_id, int) - self._bucket_id = bucket_id - self._sig_filename = None - self._coll = Collector() - self._url = ( - f"{self._coll.serverProtocol}://{self._coll.serverHost}:" - f"{self._coll.serverPort}/crashmanager/rest/buckets/{bucket_id}/" - ) - self._data = None - - @property - def bucket_id(self): - return self._bucket_id - - def __enter__(self): - return self - - def __exit__(self, *exc): - self.cleanup() - - def __getattr__(self, name): - if self._data is None: - self._data = self._coll.get(self._url).json() - if name not in self._data: - raise AttributeError( - f"'{type(self).__name__,}' object has no attribute '{name}'" - f" (has: {list(self._data)})" - ) - return self._data[name] - - def __setattr__(self, name, value): - if name.startswith("_"): - super().__setattr__(name, value) - return - raise AttributeError("can't set attribute") - - def cleanup(self): - """Cleanup any resources held by this instance. - - Arguments: - None - - Returns: - None - """ - if self._sig_filename is not None: - rmtree(self._sig_filename.parent) - - def iter_crashes(self, quality_filter=None): - """Fetch all crash IDs for this FuzzManager bucket. - Only crashes with testcases are returned. - - Arguments: - quality_filter (int): Filter crashes by quality value (None for all) - - Returns: - generator: generator of CrashEntry - """ - - def _get_results(endpoint, params=None): - """ - Function to get paginated results from FuzzManager - - Args: - endpoint (str): FuzzManager REST API to query (eg. "crashes"). - params (dict): Params to pass through to requests.get - - Returns: - generator: objects returned by FuzzManager (as dicts) - """ - LOG.debug("first request to /%s/", endpoint) - - url = ( - f"{self._coll.serverProtocol}://{self._coll.serverHost}:" - f"{self._coll.serverPort}/crashmanager/rest/{endpoint}/" - ) - - response = self._coll.get(url, params=params).json() - - while True: - LOG.debug( - "got %d/%d %s", - len(response["results"]), - response["count"], - endpoint, - ) - while response["results"]: - yield response["results"].pop() - - if response["next"] is None: - break - - LOG.debug("next request to /%s/", endpoint) - response = self._coll.get(response["next"]).json() - - # Get all crashes for bucket - query_args = [ - ("op", "AND"), - ("bucket", self.bucket_id), - ] - if quality_filter is not None: - query_args.append(("testcase__quality", quality_filter)) - query = json.dumps(dict(query_args)) - - n_yielded = 0 - for crash in _get_results( - "crashes", params={"query": query, "include_raw": "0"} - ): - if not crash["testcase"]: - LOG.warning("crash %d has no testcase, skipping", crash["id"]) - continue - - n_yielded += 1 - LOG.debug("yielding crash #%d", n_yielded) - result = CrashEntry(crash["id"]) - result._data = crash # pylint: disable=protected-access - yield result - - def signature_path(self): - """Download the bucket data from CrashManager. - - Arguments: - None - - Returns: - Path: Path on disk where signature exists. - """ - if self._sig_filename is not None: - return self._sig_filename - - tmpd = Path( - mkdtemp(prefix=f"bucket-{self._bucket_id}-", dir=grz_tmp("fuzzmanager")) - ) - try: - sig_basename = f"{self._bucket_id}.signature" - sig_filename = tmpd / sig_basename - sig_filename.write_text(self.signature) - sigmeta_filename = sig_filename.with_suffix(".metadata") - sigmeta_filename.write_text( - json.dumps( - { - "size": self.size, - "frequent": self.frequent, - "shortDescription": self.shortDescription, - "testcase__quality": self.best_quality, - } - ) - ) - tmpd = None - finally: # pragma: no cover - if tmpd: - rmtree(tmpd) - - self._sig_filename = sig_filename - return self._sig_filename - - class CrashEntry: """Get the CrashEntry data for the specified CrashManager crash. Attributes: - crash_id (int): the server ID for the crash + crash_id: the server ID for the crash see crashmanager.serializers.CrashEntrySerializer """ @@ -209,41 +44,42 @@ class CrashEntry: "_url", ) - def __init__(self, crash_id): + def __init__(self, crash_id: int) -> None: """Initialize CrashEntry. Arguments: - crash_id (int): ID of the requested crash on the server side + crash_id: ID of the requested crash on the server side """ assert isinstance(crash_id, int) self._crash_id = crash_id self._coll = Collector() - self._contents = None - self._data = None - self._storage = None - self._sig_filename = None + self._contents: Optional[List[Path]] = None + self._data: Optional[Dict[str, Any]] = None + self._storage: Optional[Path] = None + self._sig_filename: Optional[Path] = None self._url = ( f"{self._coll.serverProtocol}://{self._coll.serverHost}:" f"{self._coll.serverPort}/crashmanager/rest/crashes/{crash_id}/" ) @property - def crash_id(self): + def crash_id(self) -> int: return self._crash_id - def __enter__(self): + def __enter__(self) -> "CrashEntry": return self - def __exit__(self, *exc): + def __exit__(self, *exc: Any) -> None: self.cleanup() - def __getattr__(self, name): + def __getattr__(self, name: str) -> Any: if self._data is None or (name in self.RAW_FIELDS and name not in self._data): need_raw = "1" if name in self.RAW_FIELDS else "0" # TODO: handle 403 and 404? - self._data = self._coll.get( - self._url, params={"include_raw": need_raw} - ).json() + self._data = cast( + Dict[str, Any], + self._coll.get(self._url, params={"include_raw": need_raw}).json(), + ) if name not in self._data: raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}' " @@ -251,7 +87,7 @@ def __getattr__(self, name): ) return self._data[name] - def __setattr__(self, name, value): + def __setattr__(self, name: str, value: Any) -> None: if name.startswith("_"): super().__setattr__(name, value) return @@ -261,7 +97,7 @@ def __setattr__(self, name, value): if self._data: self._data[name] = value - def cleanup(self): + def cleanup(self) -> None: """Cleanup any resources held by this instance. Arguments: @@ -276,16 +112,16 @@ def cleanup(self): rmtree(self._sig_filename.parent) @staticmethod - def _subset(tests, subset): + def _subset(tests: List[Path], subset: List[int]) -> List[Path]: """Select a subset of tests directories. Subset values are sanitized to avoid raising. Arguments: - tests list(Path): Directories on disk where testcases exists. - subset list(int): Indices of corresponding directories to select. + tests: Directories on disk where testcases exists. + subset: Indices of corresponding directories to select. Returns: - list(Path): Directories that have been selected. + Directories that have been selected. """ assert isinstance(subset, list) assert tests @@ -296,15 +132,17 @@ def _subset(tests, subset): # build list of items to preserve return [tests[i] for i in sorted(keep)] - def testcases(self, subset=None, ext=None): + def testcases( + self, subset: Optional[List[int]] = None, ext: Optional[str] = None + ) -> List[Path]: """Download the testcase data from CrashManager. Arguments: - subset list(int): Indices of corresponding directories to select. - ext (str): Overwrite file extension used with downloaded testcase. + subset: Indices of corresponding directories to select. + ext: Overwrite file extension used with downloaded testcase. Returns: - list(Path): Directories on disk where testcases exists. + Directories on disk where testcases exists. """ if self._contents is None: assert self._storage is None @@ -355,6 +193,7 @@ def testcases(self, subset=None, ext=None): ext = file_name.split(".")[-1] or "html" else: ext = "html" + assert ext is not None if ext.lower().endswith("zip"): LOG.error("Error loading test case: %s", exc) self._contents = [] @@ -370,19 +209,20 @@ def testcases(self, subset=None, ext=None): return self._subset(self._contents, subset) return self._contents - def create_signature(self, binary): + def create_signature(self, binary: Path) -> Path: """Create a CrashManager signature from this crash. If self.bucket is set, self.bucket.signature_path() should be used instead. Arguments: - binary (Path): binary location, needed to create a program configuration + binary: binary location, needed to create a program configuration Returns: - Path: Path on disk where signature exists, or None if could not create. + Signature file. """ if self._sig_filename is not None: return self._sig_filename + success = False tmpd = Path( mkdtemp(prefix=f"crash-sig-{self._crash_id}-", dir=grz_tmp("fuzzmanager")) ) @@ -410,9 +250,180 @@ def create_signature(self, binary): } ) ) - tmpd = None + success = True + finally: # pragma: no cover + if not success: + rmtree(tmpd) + + self._sig_filename = sig_filename + return self._sig_filename + + +class Bucket: + """Get Bucket data for a specified CrashManager bucket.""" + + def __init__(self, bucket_id: int) -> None: + """Initialize a Bucket instance. + + Arguments: + bucket_id: ID of the requested bucket on the server. + """ + assert isinstance(bucket_id, int) + self._bucket_id = bucket_id + self._sig_filename: Optional[Path] = None + self._coll = Collector() + self._url = ( + f"{self._coll.serverProtocol}://{self._coll.serverHost}:" + f"{self._coll.serverPort}/crashmanager/rest/buckets/{bucket_id}/" + ) + self._data: Optional[Dict[str, Any]] = None + + @property + def bucket_id(self) -> int: + return self._bucket_id + + def __enter__(self) -> "Bucket": + return self + + def __exit__(self, *exc: Any) -> None: + self.cleanup() + + def __getattr__(self, name: str) -> Any: + if self._data is None: + self._data = self._coll.get(self._url).json() + if name not in self._data: + raise AttributeError( + f"'{type(self).__name__,}' object has no attribute '{name}'" + f" (has: {list(self._data)})" + ) + return self._data[name] + + def __setattr__(self, name: str, value: Any) -> None: + if name.startswith("_"): + super().__setattr__(name, value) + return + raise AttributeError("can't set attribute") + + def cleanup(self) -> None: + """Cleanup any resources held by this instance. + + Arguments: + None + + Returns: + None + """ + if self._sig_filename is not None: + rmtree(self._sig_filename.parent) + + def iter_crashes( + self, quality_filter: Optional[int] = None + ) -> Generator[CrashEntry, None, None]: + """Fetch all crash IDs for this FuzzManager bucket. + Only crashes with testcases are returned. + + Arguments: + quality_filter: Filter crashes by quality value (None for all) + + Yields: + CrashEntry objects. + """ + + def _get_results( + endpoint: str, params: Optional[Dict[str, str]] = None + ) -> Generator[Dict[str, Any], None, None]: + """ + Function to get paginated results from FuzzManager + + Args: + endpoint: FuzzManager REST API to query (eg. "crashes"). + params: Params to pass through to requests.get. + + Returns: + Objects (dict) returned by FuzzManager. + """ + LOG.debug("first request to /%s/", endpoint) + + url = ( + f"{self._coll.serverProtocol}://{self._coll.serverHost}:" + f"{self._coll.serverPort}/crashmanager/rest/{endpoint}/" + ) + + response: Dict[str, Any] = self._coll.get(url, params=params).json() + + while True: + LOG.debug( + "got %d/%d %s", + len(response["results"]), + response["count"], + endpoint, + ) + while response["results"]: + yield response["results"].pop() + + if response["next"] is None: + break + + LOG.debug("next request to /%s/", endpoint) + response = self._coll.get(response["next"]).json() + + # Get all crashes for bucket + query_args = [ + ("op", "AND"), + ("bucket", self.bucket_id), + ] + if quality_filter is not None: + query_args.append(("testcase__quality", quality_filter)) + query = json.dumps(dict(query_args)) + + n_yielded = 0 + for crash in _get_results( + "crashes", params={"query": query, "include_raw": "0"} + ): + if not crash["testcase"]: + LOG.warning("crash %d has no testcase, skipping", crash["id"]) + continue + + n_yielded += 1 + LOG.debug("yielding crash #%d", n_yielded) + result = CrashEntry(cast(int, crash["id"])) + result._data = crash # pylint: disable=protected-access + yield result + + def signature_path(self) -> Path: + """Download the bucket data from CrashManager. + + Arguments: + None + + Returns: + Signature file. + """ + if self._sig_filename is not None: + return self._sig_filename + + success = False + tmpd = Path( + mkdtemp(prefix=f"bucket-{self._bucket_id}-", dir=grz_tmp("fuzzmanager")) + ) + try: + sig_basename = f"{self._bucket_id}.signature" + sig_filename = tmpd / sig_basename + sig_filename.write_text(self.signature) + sigmeta_filename = sig_filename.with_suffix(".metadata") + sigmeta_filename.write_text( + json.dumps( + { + "size": self.size, + "frequent": self.frequent, + "shortDescription": self.shortDescription, + "testcase__quality": self.best_quality, + } + ) + ) + success = True finally: # pragma: no cover - if tmpd: + if not success: rmtree(tmpd) self._sig_filename = sig_filename @@ -420,15 +431,17 @@ def create_signature(self, binary): @contextmanager -def load_fm_data(crash_id, load_bucket=False): +def load_fm_data( + crash_id: int, load_bucket: bool = False +) -> Generator[Tuple[CrashEntry, Optional[Bucket]], None, None]: """Load CrashEntry including Bucket from FuzzManager. Arguments: - crash_id (int): Crash ID to load. - load_bucket (bool): Attempt to load bucket. + crash_id: Crash ID to load. + load_bucket: Attempt to load bucket. Yields: - 2-tuple(CrashEntry, Bucket): Data loaded from FuzzManager. + Data loaded from FuzzManager. """ with CrashEntry(crash_id) as crash: # load signature if needed diff --git a/grizzly/reduce/crash.py b/grizzly/reduce/crash.py index 3a718be0..039515f2 100644 --- a/grizzly/reduce/crash.py +++ b/grizzly/reduce/crash.py @@ -1,6 +1,7 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from argparse import Namespace from logging import getLogger from ..common.fuzzmanager import load_fm_data @@ -14,14 +15,14 @@ LOG = getLogger(__name__) -def main(args): +def main(args: Namespace) -> int: """CLI for `grizzly.reduce.crash`. Arguments: - args (argparse.Namespace): Result from `ReduceArgs.parse_args`. + args: Result from `ReduceArgs.parse_args`. Returns: - int: 0 for success. non-0 indicates a problem. + 0 for success. non-0 indicates a problem. """ configure_logging(args.log_level) with load_fm_data(args.input, load_bucket=not args.sig) as (crash, bucket): diff --git a/grizzly/replay/bucket.py b/grizzly/replay/bucket.py index fbfc523e..272a6bff 100644 --- a/grizzly/replay/bucket.py +++ b/grizzly/replay/bucket.py @@ -1,7 +1,9 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from argparse import Namespace from logging import getLogger +from typing import Callable from ..common.fuzzmanager import Bucket from ..common.utils import Exit @@ -12,15 +14,15 @@ LOG = getLogger(__name__) -def bucket_main(args, tool_main): +def bucket_main(args: Namespace, tool_main: Callable[[Namespace], int]) -> int: """CLI for `grizzly.reduce.bucket` and `grizzly.replay.bucket`. Arguments: - args (argparse.Namespace): Result from `parse_args()`. - tool_main (callable): Main function from a supported Grizzly tool. + args: Result from `parse_args()`. + tool_main: Main function from a supported Grizzly tool. Returns: - int: 0 for success. non-0 indicates a problem. + 0 for success. non-0 indicates a problem. """ assert callable(tool_main) configure_logging(args.log_level) diff --git a/grizzly/replay/bugzilla.py b/grizzly/replay/bugzilla.py index af47e7df..d875472f 100644 --- a/grizzly/replay/bugzilla.py +++ b/grizzly/replay/bugzilla.py @@ -1,6 +1,7 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from argparse import Namespace from logging import getLogger from ..common.bugzilla import BugzillaBug @@ -11,14 +12,14 @@ LOG = getLogger(__name__) -def main(args): +def main(args: Namespace) -> int: """CLI for `grizzly.replay.bugzilla`. Arguments: - args (argparse.Namespace): Result from `ReplayArgs.parse_args`. + args: Result from `ReplayArgs.parse_args`. Returns: - int: 0 for success. non-0 indicates a problem. + 0 for success. non-0 indicates a problem. """ configure_logging(args.log_level) bug = BugzillaBug.load(args.input) diff --git a/grizzly/replay/crash.py b/grizzly/replay/crash.py index cf00ea61..f98b8557 100644 --- a/grizzly/replay/crash.py +++ b/grizzly/replay/crash.py @@ -1,10 +1,12 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -import json +from argparse import Namespace +from json import loads from logging import getLogger +from typing import Optional -from ..common.fuzzmanager import load_fm_data +from ..common.fuzzmanager import Bucket, CrashEntry, load_fm_data from ..main import configure_logging from .args import ReplayFuzzManagerIDArgs from .replay import ReplayManager @@ -12,14 +14,14 @@ LOG = getLogger(__name__) -def main(args): +def main(args: Namespace) -> int: """CLI for `grizzly.replay.crash`. Arguments: - args (argparse.Namespace): Result from `ReplayArgs.parse_args`. + args: Result from `ReplayArgs.parse_args`. Returns: - int: 0 for success. non-0 indicates a problem. + 0 for success. non-0 indicates a problem. """ configure_logging(args.log_level) with load_fm_data(args.input, load_bucket=not args.sig) as (crash, bucket): @@ -28,21 +30,25 @@ def main(args): return ReplayManager.main(modify_args(args, crash, bucket)) -def modify_args(args, crash, bucket): +def modify_args( + args: Namespace, crash: CrashEntry, bucket: Optional[Bucket] +) -> Namespace: """ Arguments: - args (argparse.Namespace): Result from `ReplayArgs.parse_args`. + args: Result from `ReplayArgs.parse_args`. + crash: Crash entry to process. + bucket: Bucket that contains crash. Returns: - args (argparse.Namespace): Modified arguments. + Modified arguments. """ # if we did not pass --sig, and the crash is not bucketed # auto-generate a signature from crash data so we know what to expect if not args.sig and not bucket: try: fm_sig_file = crash.create_signature(args.binary) - meta = json.loads(fm_sig_file.with_suffix(".metadata").read_text()) + meta = loads(fm_sig_file.with_suffix(".metadata").read_text()) LOG.info( "Using crash data to generate signature: %s", meta["shortDescription"],