Skip to content

Commit

Permalink
feat(terraform): Update CKV_AWS_358, add CKV_GCP_125 and CKV_AZURE_24…
Browse files Browse the repository at this point in the history
…9 for OIDC claims analysis for GitHub (#6960)

* feat(terraform): improve CKV_AWS_358, add CKV_GCP_125 and CKV_AZURE_249

Introducing new OIDC checks for TF for GCP(CKV_GCP_125), Azure(CKV_AZURE_249) and AWS(CKV_AWS_358).

* fix: allow granular pick of HCL attributes to evaulate w.r.t the resource type

This commit introduces a change to the variable rendering logic where one can
choose which attributes to ignore with respect to the passed resource type

* ignore: added test residuals to gitignore
  • Loading branch information
aviadhahami authored Jan 22, 2025
1 parent 4e14082 commit 1bfcf06
Show file tree
Hide file tree
Showing 12 changed files with 666 additions and 28 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ __pycache__/
*$py.class
*__pycache__/

# Python tests residuals
tests/sca_package_2/examples/obj*

# Terraform
*.tfstate*
*.terraform*
Expand Down Expand Up @@ -178,4 +181,4 @@ tests/console
# sast go mod
checkov/sast_core/vendor

*.prof
*.prof
4 changes: 4 additions & 0 deletions checkov/common/util/oidc_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import re

gh_repo_regex = re.compile(r"[\w]+/.+")
gh_abusable_claims = ["workflow", "environment", "ref", "context", "head_ref", "base_ref"]
80 changes: 55 additions & 25 deletions checkov/terraform/checks/data/aws/GithubActionsOIDCTrustPolicy.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,86 @@
from typing import Dict, List, Any
import re
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.util.type_forcers import force_list
from checkov.terraform.checks.data.base_check import BaseDataCheck

gh_repo_regex = re.compile(r'repo:[^/]+/[^/]+')
from checkov.common.util.oidc_utils import gh_abusable_claims, gh_repo_regex


class GithubActionsOIDCTrustPolicy(BaseDataCheck):
def __init__(self):
name = 'Ensure GitHub Actions OIDC trust policies only allows actions from a specific known organization'
name = "Ensure AWS GitHub Actions OIDC authorization policies only allow safe claims and claim order"
id = "CKV_AWS_358"
supported_data = ("aws_iam_policy_document",)
categories = [CheckCategories.IAM]
super().__init__(name=name, id=id, categories=categories, supported_data=supported_data)

def scan_data_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
statements = force_list(conf.get('statement'))
statements = force_list(conf.get("statement"))
for statement in statements:
found_federated_gh_oidc = False
if isinstance(statement, dict):
if statement.get('principals'):
principals = statement['principals']
if statement.get("principals"):
principals = statement["principals"]
for principal in force_list(principals):
if 'type' not in principal and 'identifiers' not in principal:
if "type" not in principal and "identifiers" not in principal:
continue
principal_type = principal['type']
principal_identifiers = principal['identifiers']
if isinstance(principal_type, list) and len(
principal_type) and 'Federated' in principal_type and isinstance(principal_identifiers,
list):
principal_type = principal["type"]
principal_identifiers = principal["identifiers"]
if (
isinstance(principal_type, list)
and len(principal_type)
and "Federated" in principal_type
and isinstance(principal_identifiers, list)
):
for identifier in principal_identifiers:
if isinstance(identifier,
list) and identifier[0] is not None and \
'oidc-provider/token.actions.githubusercontent.com' in identifier[0]:
if (
isinstance(identifier, list)
and len(identifier) > 0
and identifier[0] is not None
and "oidc-provider/token.actions.githubusercontent.com" in identifier[0]
):
found_federated_gh_oidc = True
break
if not found_federated_gh_oidc:
return CheckResult.PASSED
if found_federated_gh_oidc and not statement.get('condition'):
if found_federated_gh_oidc and not statement.get("condition"):
return CheckResult.FAILED
found_sub_condition_variable = False
found_sub_condition_value = False
for condition in statement.get('condition'):
condition_variables = condition.get('variable')
condition_values = condition.get('values')
for condition in statement.get("condition"):
condition_variables = condition.get("variable")
condition_values = condition.get("values")
if isinstance(condition_variables, list):
for condition_variable in condition_variables:
if condition_variable == 'token.actions.githubusercontent.com:sub':
if condition_variable == "token.actions.githubusercontent.com:sub":
found_sub_condition_variable = True
break
for condition_value in condition_values:
if isinstance(condition_value, list) and gh_repo_regex.search(condition_value[0]):
found_sub_condition_value = True
break
if isinstance(condition_values, list):
for condition_value in condition_values:
if isinstance(condition_value, list):
# First -> check if the value is a mere wildcard. If so, it's a fail
# This covers the case where the condition is ['sub':'*']
if len(condition_value) == 1 and condition_value[0] == "*":
return CheckResult.FAILED
# Split the claims by ':' for deeper inspection
split_claims = condition_value[0].split(":")
# The assertion MUST be of the form ['{claim_name_1}:{claim_value_1}:{claim_name_2}:{claim_value_2}...']
# If the length of the split claims is 1, it means that the assertion is ['sub':'{claim_name}'] - this is a fail
if len(split_claims) == 1:
return CheckResult.FAILED
# Second -> Check if the value is a wildcard assertion
# This covers the case where the condition is ['sub':'{claim_name}:*']
if split_claims[1] == "*":
return CheckResult.FAILED
# Third -> Check if the value is an abusable claim
# This covers the case where the condition is ['sub':'{abusable_claim}:{any_value}']
for abusable_claim in gh_abusable_claims:
if split_claims[0].startswith(abusable_claim):
return CheckResult.FAILED
# Fourth -> Check if the value is a repo:org/* -> this is a pass with a warning
if split_claims[0] == "repo" and not gh_repo_regex.match(split_claims[1]):
return CheckResult.FAILED
found_sub_condition_value = True
break
if found_sub_condition_value and found_sub_condition_variable:
return CheckResult.PASSED

Expand All @@ -62,5 +89,8 @@ def scan_data_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:

return CheckResult.PASSED

def get_evaluated_keys(self) -> List[str]:
return ["statement/condition/variable", "statement/condition/values"]


check = GithubActionsOIDCTrustPolicy()
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from typing import Dict, Any, List
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.util.type_forcers import force_list
from checkov.common.util.oidc_utils import gh_abusable_claims, gh_repo_regex


class AzureGithubActionsOIDCTrustPolicy(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure Azure GitHub Actions OIDC trust policy is configured securely"
id = "CKV_AZURE_249"
supported_resources = [
"azuread_application_federated_identity_credential",
"azuread_application", # Added to support both approaches
]
categories = (CheckCategories.IAM,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def validate_subject_claim(self, subject: str) -> bool:
"""Validates the subject claim for security concerns"""
if not subject:
return False

# If no colons - invalid format for GitHub Actions claims
if ":" not in subject:
return False

claim_parts = subject.split(":")

# Check for wildcards in critical positions
if claim_parts[0] == "*" or claim_parts[1] == "*":
return False

# Check for abusable claims
if claim_parts[0] in gh_abusable_claims:
return False

# Validate repo format if repo claim is used
if claim_parts[0] == "repo":
if not gh_repo_regex.match(claim_parts[1]):
return False

return True

def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
"""Scans the configuration for Azure GitHub Actions OIDC trust policy"""
try:
isAzureAdResource = conf.get("identity_federation", [None])[0] is not None
condition = force_list(
conf.get("subject", [None])
if not isAzureAdResource
else conf.get("identity_federation", [None])[0].get("subject", [None])[0]
)[0]
if not condition:
return CheckResult.FAILED

# We should have colon delimited subject claim
if ":" not in condition or condition == "*":
return CheckResult.FAILED

# At this point we know we have a colon delimited subject claim, so length should be at least 2
split_condition = condition.split(":")

# First check -> wildcards
if "*" == split_condition[0] or "*" == split_condition[1]:
return CheckResult.FAILED

# Second check -> abusable claims
if split_condition[0] in gh_abusable_claims:
return CheckResult.FAILED

# Third check -> repo format
if split_condition[0] == "repo" and not gh_repo_regex.match(split_condition[1]):
return CheckResult.FAILED

return CheckResult.PASSED

except Exception:
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["identity_federation/subject", "subject"]


check = AzureGithubActionsOIDCTrustPolicy()
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from typing import Dict, Any, List
from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.util.oidc_utils import gh_abusable_claims, gh_repo_regex
import re


class GithubActionsOIDCTrustPolicy(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure GCP GitHub Actions OIDC trust policy is configured securely"
id = "CKV_GCP_125"
supported_resources = ["google_iam_workload_identity_pool_provider"]
categories = (CheckCategories.IAM,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def extract_sub_claim_value(self, condition: str) -> str:
"""Extract the claim value from the condition string."""
if not condition:
return ""

# Handle both single and double quotes
claim_match = re.search(r"assertion\.sub\s*==\s*['\"]([^'\"]+)['\"]", condition)
if claim_match:
return claim_match.group(1)
return ""

def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
"""Scans the configuration for GitHub Actions OIDC trust policy"""
try:
# Check issuer URI
issuer_uri = conf.get("issuer_uri", [None])[0]
print(f"Issuer URI: {issuer_uri}")
if not issuer_uri or issuer_uri != "https://token.actions.githubusercontent.com":
print("Failed: Invalid issuer URI")
return CheckResult.FAILED

# Check attribute mapping
attribute_mapping = conf.get("attribute_mapping")
if not attribute_mapping or not isinstance(attribute_mapping, list) or not attribute_mapping[0]:
return CheckResult.FAILED
attribute_mapping = attribute_mapping[0]
if not attribute_mapping or "google.subject" not in attribute_mapping:
print("Failed: Missing google.subject in attribute mapping")
return CheckResult.FAILED

# Check attribute condition
attribute_condition = conf.get("attribute_condition", False)[0]
if not attribute_condition:
return CheckResult.FAILED

# Extract claim value
sub_claim_value = self.extract_sub_claim_value(attribute_condition)
if not sub_claim_value:
print("Failed: Could not extract claim value")
return CheckResult.FAILED

# If no colons - it means we assert something the value without the claim name, which is invalid when using GitHub Actions OIDC
if ":" not in sub_claim_value:
print("Failed: Invalid claim value")
return CheckResult.FAILED

# Break by colons; Since we already checked for the presence of colons, we can safely assume that the claim is in the form of claim_name:claim_value
claim_parts = sub_claim_value.split(":")
# Check if the first claim or value are wildcards - if yes, the assertion is checking nothing
if claim_parts[0] == "*" or claim_parts[1] == "*":
print("Failed: Wildcard claims")
return CheckResult.FAILED

# Check if the first claim is an abusable claim - if yes, the whole assertion can be abused
if claim_parts[0] in gh_abusable_claims:
print("Failed: Abusable claim")
return CheckResult.FAILED

# Lastly, check for the classic "repo" claim
if claim_parts[0] == "repo":
# Check if the repo claim is in the form of org/repo
if not gh_repo_regex.match(claim_parts[1]):
print("Failed: Invalid repo claim")
return CheckResult.FAILED

return CheckResult.PASSED

except Exception as e:
print(f"Failed with exception: {str(e)}")
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["attribute_condition", "attribute_mapping", "issuer_uri"]


check = GithubActionsOIDCTrustPolicy()
13 changes: 13 additions & 0 deletions checkov/terraform/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
'map': {}
}

attrsToFilterByResourceType = {
"google_iam_workload_identity_pool_provider": ["attribute_condition"]
}

DYNAMIC_STRING = 'dynamic'
DYNAMIC_BLOCKS_LISTS = 'list'
DYNAMIC_BLOCKS_MAPS = 'map'
Expand Down Expand Up @@ -489,6 +493,14 @@ def _assign_dynamic_value_for_map(
else:
dpath.set(block_conf, dynamic_argument, dynamic_value, separator=DOT_SEPERATOR)

def shouldBeFilteredByConditionAndResourceType(self, attr: str, resource_type: List[str]) -> bool:
if not resource_type:
return False
for resource in resource_type:
if resource in attrsToFilterByResourceType and attr in attrsToFilterByResourceType[resource]:
return True
return False

def evaluate_non_rendered_values(self) -> None:
for index, vertex in enumerate(self.local_graph.vertices):
changed_attributes = {}
Expand All @@ -500,6 +512,7 @@ def evaluate_non_rendered_values(self) -> None:
for attr in vertex.attributes
if attr not in reserved_attribute_names and not attribute_has_nested_attributes(attr, vertex.attributes, attribute_is_leaf)
and not attribute_has_dup_with_dynamic_attributes(attr, vertex.attributes)
and not self.shouldBeFilteredByConditionAndResourceType(attr, vertex.attributes.get("resource_type", []))
]
for attribute in filtered_attributes:
curr_val = vertex.attributes.get(attribute)
Expand Down
Loading

0 comments on commit 1bfcf06

Please sign in to comment.