-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsimple_semgrep.py
138 lines (111 loc) · 5.34 KB
/
simple_semgrep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from src.actions.base import BaseAction, ActionSpec, ActionArgument
from src.actions.result import ActionResult
from src.jobs.base import Job, JobResult
from src.util.logging import Logger
from src.jobs.manager import JobManager
from src.models.base import Asset
from src.backend.database import DBSessionMixin
import json
import os
import pathlib
import asyncio
class SemgrepJob(Job):
"""Job that runs a semgrep scan on a specified path"""
def __init__(self, path: str):
super().__init__(job_type="semgrep")
self.path = path
self.logger = Logger("SemgrepJob")
# Get path to rules directory relative to this file
module_dir = pathlib.Path(__file__).parent
self.rules_path = os.path.join(module_dir, "semgrep-rules")
async def start(self) -> None:
"""Start the semgrep scan"""
try:
# Build semgrep command
cmd = ["semgrep", "--config", self.rules_path, "--json", self.path]
# Run semgrep
self.logger.info(f"Running semgrep on {self.path} with rules from {self.rules_path}")
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_msg = stderr.decode() if stderr else "Unknown error"
await self.fail(f"Semgrep failed: {error_msg}")
return
# Parse JSON output
try:
results = json.loads(stdout.decode())
findings = results.get("results", [])
total = len(findings)
# Create result message
if total > 0:
message = f"Found {total} potential issue(s) in {self.path}"
# Add severity counts
severity_counts = {}
for finding in findings:
severity = finding.get("extra", {}).get("severity", "unknown")
severity_counts[severity] = severity_counts.get(severity, 0) + 1
if severity_counts:
message += "\nSeverity breakdown:"
for severity, count in severity_counts.items():
message += f"\n- {severity}: {count}"
else:
message = f"No issues found in {self.path}"
# Create job result
result = JobResult(success=True, message=message, data=results)
# Add detailed outputs for each finding
for finding in findings:
output = (
f"Finding in {finding.get('path')}:\n"
f"Rule: {finding.get('check_id')}\n"
f"Severity: {finding.get('extra', {}).get('severity', 'unknown')}\n"
f"Line: {finding.get('start', {}).get('line')}\n"
f"Message: {finding.get('extra', {}).get('message')}\n"
f"Code: {finding.get('extra', {}).get('lines')}\n"
)
result.add_output(output)
await self.complete(result)
except json.JSONDecodeError as e:
await self.fail(f"Failed to parse semgrep output: {e}")
except Exception as e:
self.logger.error(f"Error in semgrep job: {e}")
await self.fail(str(e))
async def stop_handler(self) -> None:
"""Stop the job - nothing to do for semgrep"""
pass
class SemgrepAction(BaseAction, DBSessionMixin):
"""Action that runs a semgrep scan on a specified asset"""
spec = ActionSpec(
name="semgrep",
description="Run a semgrep scan on a specified asset",
help_text="Runs semgrep security scanner on the specified asset using configured rules",
agent_hint="Use this to scan code for potential security issues",
arguments=[ActionArgument(name="asset", description="Asset to scan", required=True)],
)
def __init__(self):
super().__init__()
DBSessionMixin.__init__(self)
self.logger = Logger("SemgrepAction")
self.job_manager = JobManager()
async def execute(self, *args, **kwargs) -> ActionResult:
"""Execute the semgrep scan action"""
try:
# Get asset name from args
asset_id = args[0]
# Get asset from database
with self.get_session() as session:
asset = session.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
return ActionResult.error(f"Asset not found: {asset_id}")
# Get local file path
local_path = asset.local_path
if not local_path:
return ActionResult.error(f"Asset {asset_id} has no local file path")
# Create and submit the semgrep job
job = SemgrepJob(local_path)
job_id = await self.job_manager.submit_job(job)
return ActionResult.text(f"Semgrep scan started with job ID: {job_id}\nUse 'job {job_id}' to check results.")
except Exception as e:
self.logger.error(f"Failed to start semgrep scan: {str(e)}")
return ActionResult.error(f"Failed to start semgrep scan: {str(e)}")