Skip to content

Commit

Permalink
fix: lint
Browse files Browse the repository at this point in the history
  • Loading branch information
kaykhan committed Jan 7, 2025
1 parent 51b7081 commit 0357c20
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions functions/notify_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

SECURITY_HUB_CLIENT = boto3.client('securityhub', region_name=REGION)


class AwsService(Enum):
"""AWS service supported by function"""

Expand Down Expand Up @@ -141,21 +142,18 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A
compliance_status = finding["Compliance"].get("Status", "UNKNOWN")
workflow_status = finding["Workflow"].get("Status", "UNKNOWN")
if compliance_status == "FAILED" and workflow_status == "NEW":
notified = SECURITY_HUB_CLIENT.batch_update_findings(
FindingIdentifiers=[{
'Id': finding.get('Id'),
'ProductArn': finding.get("ProductArn")
}],
Workflow={"Status": "NOTIFIED"}
)
logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}")
notified = SECURITY_HUB_CLIENT.batch_update_findings(
FindingIdentifiers=[{
'Id': finding.get('Id'),
'ProductArn': finding.get("ProductArn")
}],
Workflow={"Status": "NOTIFIED"}
)
logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}")
except Exception as e:
logging.error(f"Failed to update finding status: {str(e)}")
pass




if finding.get("ProductName") == "Inspector":
severity = finding["Severity"].get("Label", "INFORMATIONAL")
compliance_status = finding["Compliance"].get("Status", "UNKNOWN")
Expand All @@ -178,7 +176,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A

color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value
if compliance_status == "PASSED":
color = "#4BB543"
color = "#4BB543"

slack_message = {
"color": color,
Expand Down Expand Up @@ -225,7 +223,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A

color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value
if compliance_status == "PASSED":
color = "#4BB543"
color = "#4BB543"

slack_message = {
"color": color,
Expand All @@ -250,9 +248,9 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A

return slack_message


return format_default(message=message)


class SecurityHubSeverity(Enum):
"""Maps Security Hub finding severity to Slack message format color"""

Expand All @@ -269,13 +267,15 @@ def get(name, default):
except KeyError:
return default


class GuardDutyFindingSeverity(Enum):
"""Maps GuardDuty finding severity to Slack message format color"""

Low = "#777777"
Medium = "warning"
High = "danger"


def format_guardduty_finding(message: Dict[str, Any], region: str) -> Dict[str, Any]:
"""
Format GuardDuty finding event into Slack message format
Expand Down

0 comments on commit 0357c20

Please sign in to comment.