diff --git a/test/test_source.py b/test/test_source.py index 95b7d00..2094cdd 100644 --- a/test/test_source.py +++ b/test/test_source.py @@ -1270,12 +1270,12 @@ def test_aqua_convert( db6.clear_all() aqualatest.store(cve_data) cve_data_count, cve_index_count = db6.stats() - assert cve_data_count == 1 - assert cve_index_count == 1 + assert cve_data_count == 4 + assert cve_index_count == 4 results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 0 results_count = len(list(search.search_by_any("CVE-2021-3618"))) - assert results_count == 1 + assert results_count == 4 # suse1 cve_data = aqualatest.convert(test_aqua_suse_json1) diff --git a/vdb/cli.py b/vdb/cli.py index fa5f118..c4571bd 100644 --- a/vdb/cli.py +++ b/vdb/cli.py @@ -226,7 +226,7 @@ def main(): else: sources = [OSVSource(), GitHubSource()] # AquaSource also includes NVD - if args.cache_os: + if args.cache_os and not args.only_aqua: sources.insert(0, AquaSource()) for s in sources: LOG.info("Refreshing %s", s.__class__.__name__) diff --git a/vdb/lib/aqua.py b/vdb/lib/aqua.py index 5a79555..64485de 100644 --- a/vdb/lib/aqua.py +++ b/vdb/lib/aqua.py @@ -110,13 +110,15 @@ def is_supported_source(zfname): "oval", "glad", "mariner", + f"cvrf{os.sep}suse{os.sep}suse", ): if distro in zfname: return False nvd_start_year = config.NVD_START_YEAR for year in range(1999, nvd_start_year): - if f"CVE-{year}-" in zfname: - return False + for pat in (f"CVE-{year}-", f"{os.sep}{year}{os.sep}", f"ALAS-{year}-", f"ALAS2-{year}-", f"openSUSE-SU-{year}-"): + if pat in zfname: + return False if zfname.endswith(".json"): return True return False @@ -616,6 +618,8 @@ def product_ref_to_name_version(product_ref): def suse_to_vuln(self, cve_data): """Suse Linux""" ret_data = [] + tracking_id = cve_data.get("Tracking", {}).get("ID", "") + assigner = "opensuse" if "opensuse" in tracking_id.lower() else "suse" packages = cve_data.get("ProductTree", {}).get("Relationships", []) if not packages or not len(packages) > 0: return ret_data @@ -643,7 +647,6 @@ def suse_to_vuln(self, cve_data): references = orjson.dumps(references, option=orjson.OPT_NAIVE_UTC) if isinstance(references, bytes): references = references.decode("utf-8", "ignore") - assigner = "suse" ( score, severity, @@ -651,56 +654,63 @@ def suse_to_vuln(self, cve_data): attack_complexity, ) = get_default_cve_data(severity) done_pkgs = {} - for pref in packages: - pkg_key = pref.get("ProductReference") - if done_pkgs.get(pkg_key): + product_statuses = avuln.get("ProductStatuses", []) + for aps in product_statuses: + if aps.get("Type") != "Fixed": continue - pkg_name, version = self.product_ref_to_name_version(pkg_key) - version_start_including = "" - version_end_including = "" - version_start_excluding = "" - version_end_excluding = version - fix_version_end_including = "" - fix_version_start_excluding = "" - fix_version_end_excluding = "" - fix_version_start_including = version - if pkg_name and version: - tdata = config.CVE_TPL % dict( - cve_id=cve_id, - cwe_id=cwe_id, - assigner=assigner, - references=references, - description="", - vectorString=vector_string, - vendor="rpm", - product=f"suse/{pkg_name}", - version="*", - edition="*", - version_start_including=version_start_including, - version_end_including=version_end_including, - version_start_excluding=version_start_excluding, - version_end_excluding=version_end_excluding, - fix_version_start_including=fix_version_start_including, - fix_version_end_including=fix_version_end_including, - fix_version_start_excluding=fix_version_start_excluding, - fix_version_end_excluding=fix_version_end_excluding, - severity=severity, - attackComplexity=attack_complexity, - score=score, - userInteraction="REQUIRED", - exploitabilityScore=score, - publishedDate=published_date, - lastModifiedDate=last_modified_date, - ) - try: - vuln = NvdSource.convert_vuln(orjson.loads(tdata)) - if vuln is None: - continue - vuln.description = compress_str(description) - ret_data.append(vuln) - done_pkgs[pkg_key] = True - except Exception: - pass + product_ids = aps.get("ProductID") + if not product_ids: + continue + for pkg_key in product_ids: + if done_pkgs.get(pkg_key): + continue + pkg_name, version = self.product_ref_to_name_version(pkg_key) + pkg_name = pkg_name.lower().replace(" ", "-").replace(":", "/") + version_start_including = "" + version_end_including = "" + version_start_excluding = "" + version_end_excluding = version + fix_version_end_including = "" + fix_version_start_excluding = "" + fix_version_end_excluding = "" + fix_version_start_including = version + if pkg_name and version: + tdata = config.CVE_TPL % dict( + cve_id=cve_id, + cwe_id=cwe_id, + assigner=assigner, + references=references, + description="", + vectorString=vector_string, + vendor="rpm", + product=pkg_name, + version="*", + edition="*", + version_start_including=version_start_including, + version_end_including=version_end_including, + version_start_excluding=version_start_excluding, + version_end_excluding=version_end_excluding, + fix_version_start_including=fix_version_start_including, + fix_version_end_including=fix_version_end_including, + fix_version_start_excluding=fix_version_start_excluding, + fix_version_end_excluding=fix_version_end_excluding, + severity=severity, + attackComplexity=attack_complexity, + score=score, + userInteraction="REQUIRED", + exploitabilityScore=score, + publishedDate=published_date, + lastModifiedDate=last_modified_date, + ) + try: + vuln = NvdSource.convert_vuln(orjson.loads(tdata)) + if vuln is None: + continue + vuln.description = compress_str(description) + ret_data.append(vuln) + done_pkgs[pkg_key] = True + except Exception: + pass return ret_data @staticmethod diff --git a/vdb/lib/db6.py b/vdb/lib/db6.py index 74fed7b..1449e8b 100644 --- a/vdb/lib/db6.py +++ b/vdb/lib/db6.py @@ -19,10 +19,12 @@ def ensure_schemas(db_conn_obj: apsw.Connection, index_conn_obj: apsw.Connection "CREATE TABLE if not exists cve_data(cve_id TEXT NOT NULL, type TEXT NOT NULL, namespace TEXT, name TEXT NOT NULL, source_data BLOB NOT NULL, override_data BLOB, source_data_hash TEXT NOT NULL, purl_prefix TEXT NOT NULL);") db_conn_obj.pragma("synchronous", "OFF") db_conn_obj.pragma("journal_mode", "MEMORY") + db_conn_obj.pragma("temp_store", "MEMORY") index_conn_obj.execute( "CREATE TABLE if not exists cve_index(cve_id TEXT NOT NULL, type TEXT NOT NULL, namespace TEXT, name TEXT NOT NULL, vers TEXT NOT NULL, purl_prefix TEXT NOT NULL);") index_conn_obj.pragma("synchronous", "OFF") index_conn_obj.pragma("journal_mode", "MEMORY") + index_conn_obj.pragma("temp_store", "MEMORY") def get(db_file: str = config.VDB_BIN_FILE, index_file: str = config.VDB_BIN_INDEX, read_only=False) -> (