From 9a6619ea7aa1ea7a010110a82ab62429a72bb4bb Mon Sep 17 00:00:00 2001 From: Dylan Pulver <35541198+dylanpulver@users.noreply.github.com> Date: Mon, 6 Jan 2025 09:50:01 -0500 Subject: [PATCH] fix/scan-refactor-round-2 (#662) --- safety/scan/command.py | 332 ++++++++++++++++++++++++++++++----------- 1 file changed, 245 insertions(+), 87 deletions(-) diff --git a/safety/scan/command.py b/safety/scan/command.py index 98b93989..4bbba379 100644 --- a/safety/scan/command.py +++ b/safety/scan/command.py @@ -4,7 +4,7 @@ import json import sys -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Callable from typing_extensions import Annotated from safety.constants import EXIT_CODE_VULNERABILITIES_FOUND @@ -37,6 +37,62 @@ LOG = logging.getLogger(__name__) +# CONSTANTS +# Icons and Emojis +ICON_PENCIL = ":pencil:" +ICON_CHECKMARK = ":white_check_mark:" +ICON_STOP_SIGN = ":stop_sign:" +ICON_UPLOAD = ":arrow_up:" + +# Rich Markup Tags +TAG_FILE_TITLE_START = "[file_title]" +TAG_FILE_TITLE_END = "[/file_title]" +TAG_DEP_NAME_START = "[dep_name]" +TAG_DEP_NAME_END = "[/dep_name]" +TAG_SPECIFIER_START = "[specifier]" +TAG_SPECIFIER_END = "[/specifier]" +TAG_BRIEF_SEVERITY = "[brief_severity]" +TAG_REM_SEVERITY = "[rem_severity]" +TAG_RECOMMENDED_VER = "[recommended_ver]" + +# Thresholds +CRITICAL_VULN_THRESHOLD = 3 +MIN_CRITICAL_COUNT = 0 +MIN_DETAILED_OUTPUT_THRESHOLD = 3 + +# Padding +PADDING_VALUES = (0, 0, 0, 1) + +# Rich Defaults +RICH_DEFAULT_KWARGS = {"emoji": True, "overflow": "crop"} + +# Messages +WAIT_MSG_PROCESSING_REPORT = "Processing report" +WAIT_MSG_FETCHING_DB = "Fetching Safety's vulnerability database..." +WAIT_MSG_SCANNING_DIRECTORY = "Scanning project directory" +WAIT_MSG_ANALYZING_TARGETS = "Analyzing {0} files and environments for security findings" + +MSG_UPLOADING_REPORT = "Uploading report to: {0}" +MSG_REPORT_UPLOADED = "Report uploaded" +MSG_PROJECT_DASHBOARD = "Project dashboard: [link]{0}[/link]" +MSG_SYSTEM_SCAN_REPORT = "System scan report: [link]{0}[/link]" + +MSG_AUTH_REQUIRED = "Authentication required. Please run 'safety auth login' to authenticate before using this command." + +MSG_DEPENDENCY_VULNERABILITIES_DETECTED = "Dependency vulnerabilities detected:" + +MSG_NO_KNOWN_FIX = "No known fix for [dep_name]{0}[/dep_name][specifier]{1}[/specifier] to fix [number]{2}[/number] {3}" +MSG_RECOMMENDED_UPDATE = "[rem_brief]Update {0} to {1}=={2}[/rem_brief] to fix [number]{3}[/number] {4}" +MSG_NO_VULNERABILITIES = "Versions of {0} with no known vulnerabilities: [recommended_ver]{1}[/recommended_ver]" + +MSG_LEARN_MORE = "Learn more: [link]{0}[/link]" + +MSG_NO_ISSUES_FOUND = f"{ICON_CHECKMARK} [file_title]{{0}}: No issues found.[/file_title]" + +MSG_SAFETY_UPDATES_RUNNING = "Safety updates running" +MSG_EXIT_CODE_FAILURE = ":stop_sign: Scan-failing vulnerabilities were found, returning non-zero exit code: {0}" +MSG_EXIT_CODE_SUCCESS = "No scan-failing vulnerabilities were matched, returning success exit code: 0" + cli_apps_opts = {"rich_markup_mode": "rich", "cls": SafetyCLISubGroup} scan_project_app = typer.Typer(**cli_apps_opts) scan_system_app = typer.Typer(**cli_apps_opts) @@ -69,8 +125,7 @@ def process_report( Returns: Optional[str]: The URL of the report if uploaded, otherwise None. """ - wait_msg = "Processing report" - with console.status(wait_msg, spinner=DEFAULT_SPINNER) as status: + with console.status(WAIT_MSG_PROCESSING_REPORT, spinner=DEFAULT_SPINNER) as status: json_format = report.as_v30().json() export_type, export_path = None, None @@ -84,7 +139,7 @@ def process_report( report_to_export = None report_to_output = None - with console.status(wait_msg, spinner=DEFAULT_SPINNER) as status: + with console.status(WAIT_MSG_PROCESSING_REPORT, spinner=DEFAULT_SPINNER) as status: spdx_format, html_format = None, None @@ -129,10 +184,10 @@ def process_report( report_url = None if obj.platform_enabled: - status.update(f"Uploading report to: {SAFETY_PLATFORM_URL}") + status.update(f"{ICON_UPLOAD} {MSG_UPLOADING_REPORT.format(SAFETY_PLATFORM_URL)}") try: result = obj.auth.client.upload_report(json_format) - status.update("Report uploaded") + status.update(MSG_REPORT_UPLOADED) report_url = f"{SAFETY_PLATFORM_URL}{result['url']}" except Exception as e: raise e @@ -153,10 +208,10 @@ def process_report( else: project_url_with_branch = project_url - lines.append(f"Project dashboard: [link]{project_url_with_branch}[/link]") + lines.append(MSG_PROJECT_DASHBOARD.format(project_url_with_branch)) elif report.metadata.scan_type is ScanType.system_scan: - lines.append(f"System scan report: [link]{report_url}[/link]") + lines.append(MSG_SYSTEM_SCAN_REPORT.format(report_url)) for line in lines: console.print(line, emoji=True) @@ -301,7 +356,7 @@ def validate_authentication(ctx: typer.Context) -> None: SafetyError: If the user is not authenticated. """ if not ctx.obj.metadata.authenticated: - raise SafetyError("Authentication required. Please run 'safety auth login' to authenticate before using this command.") + raise SafetyError(MSG_AUTH_REQUIRED) def generate_fixes_target(apply_updates: bool) -> List: @@ -360,8 +415,7 @@ def initialize_file_finder(ctx: typer.Context, target: Path, console: Console, e # Download necessary assets for each handler for handler in file_finder.handlers: if handler.ecosystem: - wait_msg = "Fetching Safety's vulnerability database..." - with console.status(wait_msg, spinner=DEFAULT_SPINNER): + with console.status(WAIT_MSG_FETCHING_DB, spinner=DEFAULT_SPINNER): handler.download_required_assets(ctx.obj.auth.client) return file_finder @@ -378,36 +432,57 @@ def scan_project_directory(file_finder: FileFinder, console: Console) -> Tuple[P Returns: Tuple[Path, Dict]: The base path of the project and a dictionary of file paths grouped by type. """ - wait_msg = "Scanning project directory" - with console.status(wait_msg, spinner=DEFAULT_SPINNER): + with console.status(WAIT_MSG_SCANNING_DIRECTORY, spinner=DEFAULT_SPINNER): path, file_paths = file_finder.search() print_detected_ecosystems_section(console, file_paths, include_safety_prjs=True) return path, file_paths -def detect_dependency_vulnerabilities(console, dependency_vuln_detected): +def detect_dependency_vulnerabilities(console: Console, dependency_vuln_detected: bool) -> bool: """ Prints a message indicating that dependency vulnerabilities were detected. + + Args: + console (Console): The console object for printing. + dependency_vuln_detected (bool): Whether vulnerabilities have been detected. + + Returns: + bool: True if vulnerabilities are detected, False otherwise. """ if not dependency_vuln_detected: console.print() - console.print("Dependency vulnerabilities detected:") + console.print(MSG_DEPENDENCY_VULNERABILITIES_DETECTED) return True return dependency_vuln_detected -def print_file_info(console, path, target): +def print_file_info(console: Console, path: Path, target: Path) -> None: """ Prints the file information for vulnerabilities. + + Args: + console (Console): The console object for printing. + path (Path): The file path of the current file. + target (Path): The base path to which the file path is relative. """ console.print() - msg = f":pencil: [file_title]{path.relative_to(target)}:[/file_title]" + msg = f"{ICON_PENCIL} {TAG_FILE_TITLE_START}{path.relative_to(target)}:{TAG_FILE_TITLE_END}" console.print(msg, emoji=True) -def sort_and_filter_vulnerabilities(vulnerabilities, key_func, reverse=True): +def sort_and_filter_vulnerabilities( + vulnerabilities: List[Any], key_func: Callable[[Any], int], reverse: bool = True +) -> List[Any]: """ Sorts and filters vulnerabilities. + + Args: + vulnerabilities (List[Any]): A list of vulnerabilities to sort and filter. + key_func (Callable[[Any], int]): A function to determine the sort key. + reverse (bool): Whether to sort in descending order (default is True). + + Returns: + List[Any]: The sorted and filtered list of vulnerabilities. """ return sorted( [vuln for vuln in vulnerabilities if not vuln.ignored], @@ -448,9 +523,10 @@ def generate_vulnerability_message(spec_name: str, spec_raw: str, vulns_found: i Returns: str: Formatted vulnerability message. """ - msg = f"[dep_name]{spec_name}[/dep_name][specifier]{spec_raw.replace(spec_name, '')}[/specifier] [{vulns_found} {vuln_word} found" - if vulns_found > 3 and critical_vulns_count > 0: - msg += f", [brief_severity]including {critical_vulns_count} critical severity {pluralize('vulnerability', critical_vulns_count)}[/brief_severity]" + msg = f"{TAG_DEP_NAME_START}{spec_name}{TAG_DEP_NAME_END}{TAG_SPECIFIER_START}{spec_raw.replace(spec_name, '')}{TAG_SPECIFIER_END} [{vulns_found} {vuln_word} found" + + if vulns_found > CRITICAL_VULN_THRESHOLD and critical_vulns_count > MIN_CRITICAL_COUNT: + msg += f", {TAG_BRIEF_SEVERITY}including {critical_vulns_count} critical severity {pluralize('vulnerability', critical_vulns_count)}{TAG_BRIEF_SEVERITY}" return msg @@ -465,10 +541,111 @@ def render_vulnerabilities(vulns_to_report: List[Vulnerability], console: Consol """ for vuln in vulns_to_report: render_to_console( - vuln, console, rich_kwargs={"emoji": True, "overflow": "crop"}, detailed_output=detailed_output + vuln, console, rich_kwargs=RICH_DEFAULT_KWARGS, detailed_output=detailed_output ) +def generate_remediation_details(spec: Any, vuln_word: str, critical_vulns_count: int) -> Tuple[List[str], int, int]: + """ + Generate remediation details for a specific dependency. + + Args: + spec (Any): Dependency specification object. + vuln_word (str): Pluralized word for vulnerabilities. + critical_vulns_count (int): Number of critical vulnerabilities. + + Returns: + Tuple[List[str], int, int]: A tuple containing: + - List of remediation lines. + - Total resolved vulnerabilities. + - Fixes count. + """ + lines = [] + total_resolved_vulns = 0 + fixes_count = 0 + + if not spec.remediation.recommended: + lines.append( + MSG_NO_KNOWN_FIX.format(spec.name, spec.raw.replace(spec.name, ''), spec.remediation.vulnerabilities_found, vuln_word) + ) + else: + total_resolved_vulns += spec.remediation.vulnerabilities_found + msg = MSG_RECOMMENDED_UPDATE.format(spec.raw, spec.name, spec.remediation.recommended, spec.remediation.vulnerabilities_found, vuln_word) + + if spec.remediation.vulnerabilities_found > CRITICAL_VULN_THRESHOLD and critical_vulns_count > MIN_CRITICAL_COUNT: + msg += f", {TAG_REM_SEVERITY}including {critical_vulns_count} critical severity {pluralize('vulnerability', critical_vulns_count)}{TAG_REM_SEVERITY} {ICON_STOP_SIGN}" + + fixes_count += 1 + lines.append(msg) + + if spec.remediation.other_recommended: + other_versions = "[/recommended_ver], [recommended_ver]".join(spec.remediation.other_recommended) + lines.append(MSG_NO_VULNERABILITIES.format(spec.name, other_versions)) + + return lines, total_resolved_vulns, fixes_count + + +def should_display_fix_suggestion(ctx: typer.Context, analyzed_file: Any, affected_specifications: List[Any], apply_updates: bool) -> bool: + """ + Determine whether to display a fix suggestion based on the current context and file analysis. + + Args: + ctx (typer.Context): The Typer context object. + analyzed_file (Any): The file currently being analyzed. + affected_specifications (List[Any]): List of affected specifications. + apply_updates (bool): Whether fixes are being applied. + + Returns: + bool: True if the fix suggestion should be displayed, False otherwise. + """ + return ( + ctx.obj.auth.stage == Stage.development + and analyzed_file.ecosystem == Ecosystem.PYTHON + and analyzed_file.file_type == FileType.REQUIREMENTS_TXT + and any(affected_specifications) + and not apply_updates + ) + + +def process_file_fixes(file_to_fix: FileModel, + specs_to_fix: List[Any], + options: Dict, + policy_limits: List[SecurityUpdates.UpdateLevel], + output: ScanOutput, + no_output: bool, + prompt: bool) -> Any: + """ + Process fixes for a given file and its specifications. + + Args: + file_to_fix (FileModel): The file to fix. + specs_to_fix (List[Any]): The specifications to fix in the file. + options (Dict): Mapping of file types to update limits. + policy_limits (List[SecurityUpdates.UpdateLevel]): Policy-defined update limits. + output (ScanOutput): The scan output format. + no_output (bool): Whether to suppress output. + prompt (bool): Whether to prompt the user for confirmation. + + Returns: + Any: The result of the `process_fixes_scan` function. + """ + try: + limit = options[file_to_fix.file_type] + except KeyError: + try: + limit = options[file_to_fix.file_type.value] + except KeyError: + limit = SecurityUpdates.UpdateLevel("patch") + + # Set defaults + update_limits = [limit.value] + + if any(policy_limits): + update_limits = [policy_limit.value for policy_limit in policy_limits] + + return process_fixes_scan(file_to_fix, specs_to_fix, update_limits, output, no_output=no_output, prompt=prompt) + + @scan_project_app.command( cls=SafetyCLICommand, help=CLI_SCAN_COMMAND_HELP, @@ -546,24 +723,34 @@ def scan(ctx: typer.Context, Scans a project (defaulted to the current directory) for supply-chain security and configuration issues """ + # Step 1: Validate inputs and initialize settings validate_authentication(ctx) - fixes_target = generate_fixes_target(apply_updates) + fixes_target = generate_fixes_target(apply_updates) # Determine targets for updates validate_save_as(ctx, save_as) + # Step 2: Setup console and ecosystems for scanning console = ctx.obj.console ecosystems = [Ecosystem(member.value) for member in list(ScannableEcosystems)] + # Step 3: Initialize file finder and locate project files file_finder = initialize_file_finder(ctx, target, console, ecosystems) path, file_paths = scan_project_directory(file_finder, console) - target_ecosystems = ", ".join([member.value for member in ecosystems]) - wait_msg = f"Analyzing {target_ecosystems} files and environments for security findings" + # Step 4: Prepare metadata for analysis + if ecosystems: + target_ecosystems = ", ".join([member.value for member in ecosystems]) + wait_msg = WAIT_MSG_ANALYZING_TARGETS.format(target_ecosystems) + else: + # Handle the case where no ecosystems are detected + target_ecosystems = "No ecosystems detected" + wait_msg = "Analyzing files and environments for security findings" + # Step 5: Initialize data structures and counters for analysis files: List[FileModel] = [] to_fix_files = [] ignored_vulns_data = iter([]) config = ctx.obj.config - count = 0 + count = 0 # Total dependencies processed affected_count = 0 exit_code = 0 fixes_count = 0 @@ -577,23 +764,26 @@ def scan(ctx: typer.Context, with console.status(wait_msg, spinner=DEFAULT_SPINNER): for path, analyzed_file in process_files(paths=file_paths, config=config, use_server_matching=use_server_matching, obj=ctx.obj, target=target): - count += len(analyzed_file.dependency_results.dependencies) - # Update exit code if vulnerabilities are found + # Update counts and track vulnerabilities + count += len(analyzed_file.dependency_results.dependencies) if exit_code == 0 and analyzed_file.dependency_results.failed: exit_code = EXIT_CODE_VULNERABILITIES_FOUND affected_specifications = analyzed_file.dependency_results.get_affected_specifications() affected_count += len(affected_specifications) + # Sort vulnerabilities by severity def sort_vulns_by_score(vuln: Vulnerability) -> int: if vuln.severity and vuln.severity.cvssv3: return vuln.severity.cvssv3.get("base_score", 0) return 0 + # Prepare to collect files needing fixes to_fix_spec = [] file_matched_for_fix = analyzed_file.file_type.value in fix_file_types + # Handle files with affected specifications if any(affected_specifications): dependency_vuln_detected = detect_dependency_vulnerabilities(console, dependency_vuln_detected) print_file_info(console, path, target) @@ -602,6 +792,7 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: if file_matched_for_fix: to_fix_spec.append(spec) + # Print vulnerabilities for each specification console.print() vulns_to_report = sort_and_filter_vulnerabilities(spec.vulnerabilities, key_func=sort_vulns_by_score) critical_vulns_count = count_critical_vulnerabilities(vulns_to_report) @@ -609,58 +800,37 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: vuln_word = pluralize("vulnerability", vulns_found) msg = generate_vulnerability_message(spec.name, spec.raw, vulns_found, critical_vulns_count, vuln_word) - console.print(Padding(f"{msg}]", (0, 0, 0, 1)), emoji=True, overflow="crop") + console.print(Padding(f"{msg}]", PADDING_VALUES), emoji=True, overflow="crop") - if detailed_output or vulns_found < 3: + # Display detailed vulnerability information if applicable + if detailed_output or vulns_found < MIN_DETAILED_OUTPUT_THRESHOLD: render_vulnerabilities(vulns_to_report, console, detailed_output) - lines = [] - - if spec.remediation.recommended: - total_resolved_vulns += spec.remediation.vulnerabilities_found - - # Put remediation here - if not spec.remediation.recommended: - lines.append(f"No known fix for [dep_name]{spec.name}[/dep_name][specifier]{spec.raw.replace(spec.name, '')}[/specifier] to fix " \ - f"[number]{spec.remediation.vulnerabilities_found}[/number] " \ - f"{vuln_word}") - else: - msg = f"[rem_brief]Update {spec.raw} to " \ - f"{spec.name}=={spec.remediation.recommended}[/rem_brief] to fix " \ - f"[number]{spec.remediation.vulnerabilities_found}[/number] " \ - f"{vuln_word}" - - if spec.remediation.vulnerabilities_found > 3 and critical_vulns_count > 0: - msg += f", [rem_severity]including {critical_vulns_count} critical severity {pluralize('vulnerability', critical_vulns_count)}[/rem_severity] :stop_sign:" - - fixes_count += 1 - lines.append(f"{msg}") - if spec.remediation.other_recommended: - other = "[/recommended_ver], [recommended_ver]".join(spec.remediation.other_recommended) - lines.append(f"Versions of {spec.name} with no known vulnerabilities: " \ - f"[recommended_ver]{other}[/recommended_ver]") + # Generate remediation details and print them + lines, resolved_vulns, fixes = generate_remediation_details(spec, vuln_word, critical_vulns_count) + total_resolved_vulns += resolved_vulns + fixes_count += fixes for line in lines: - console.print(Padding(line, (0, 0, 0, 1)), emoji=True) + console.print(Padding(line, PADDING_VALUES), emoji=True) + # Provide a link for additional information console.print( - Padding(f"Learn more: [link]{spec.remediation.more_info_url}[/link]", - (0, 0, 0, 1)), emoji=True) + Padding(MSG_LEARN_MORE.format(spec.remediation.more_info_url), PADDING_VALUES), emoji=True) else: + # Handle files with no issues console.print() - console.print(f":white_check_mark: [file_title]{path.relative_to(target)}: No issues found.[/file_title]", + console.print(f"{ICON_CHECKMARK} [file_title]{path.relative_to(target)}: No issues found.[/file_title]", emoji=True) - if(ctx.obj.auth.stage == Stage.development - and analyzed_file.ecosystem == Ecosystem.PYTHON - and analyzed_file.file_type == FileType.REQUIREMENTS_TXT - and any(affected_specifications) - and not apply_updates): - display_apply_fix_suggestion = True + # Track whether to suggest applying fixes + display_apply_fix_suggestion = should_display_fix_suggestion(ctx, analyzed_file, affected_specifications, apply_updates) + # Track if a requirements.txt file was found if not requirements_txt_found and analyzed_file.file_type is FileType.REQUIREMENTS_TXT: requirements_txt_found = True + # Save file data for further processing file = FileModel(location=path, file_type=analyzed_file.file_type, results=analyzed_file.dependency_results) @@ -670,10 +840,12 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: files.append(file) + # Suggest fixes if applicable if display_apply_fix_suggestion: console.print() print_fixes_section(console, requirements_txt_found, detailed_output) + # Finalize report metadata and print summary console.print() version = ctx.obj.schema metadata = ctx.obj.metadata @@ -686,6 +858,7 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: files=[], projects=[ctx.obj.project]) + # Generate and print vulnerability summary total_issues_with_duplicates, total_ignored_issues = get_vulnerability_summary(report.as_v30()) print_summary( @@ -700,6 +873,7 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: ignored_vulns_data=ignored_vulns_data ) + # Process report and upload if required report_url = process_report( obj=ctx.obj, console=console, @@ -713,11 +887,10 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: project_url = f"{SAFETY_PLATFORM_URL}{ctx.obj.project.url_path}" + # Handle fix application if enabled if apply_updates: options = dict(fixes_target) - update_limits = [] policy_limits = ctx.obj.config.depedendency_vulnerability.security_updates.auto_security_updates_limit - no_output = output is not ScanOutput.SCREEN prompt = output is ScanOutput.SCREEN @@ -725,40 +898,25 @@ def sort_vulns_by_score(vuln: Vulnerability) -> int: if not no_output: console.print() console.print("-" * console.size.width) - console.print("Safety updates running") + console.print(MSG_SAFETY_UPDATES_RUNNING) console.print("-" * console.size.width) for file_to_fix, specs_to_fix in to_fix_files: - try: - limit = options[file_to_fix.file_type] - except KeyError: - try: - limit = options[file_to_fix.file_type.value] - except KeyError: - limit = SecurityUpdates.UpdateLevel("patch") - - # Set defaults - update_limits = [limit.value] - - if any(policy_limits): - update_limits = [policy_limit.value for policy_limit in policy_limits] - - fixes = process_fixes_scan(file_to_fix, - specs_to_fix, update_limits, output, no_output=no_output, - prompt=prompt) + fixes = process_file_fixes(file_to_fix, specs_to_fix, options, policy_limits, output, no_output, prompt) if not no_output: console.print("-" * console.size.width) + # Print final exit messages and handle exit code if output is ScanOutput.SCREEN: run_easter_egg(console, exit_code) if output is not ScanOutput.NONE: if detailed_output: if exit_code > 0: - console.print(f":stop_sign: Scan-failing vulnerabilities were found, returning non-zero exit code: {exit_code}") + console.print(MSG_EXIT_CODE_FAILURE.format(exit_code)) else: - console.print("No scan-failing vulnerabilities were matched, returning success exit code: 0") + console.print(MSG_EXIT_CODE_SUCCESS) sys.exit(exit_code) return project_url, report, report_url