Skip to content

Commit

Permalink
Fix: Ignore disabled clients in monitors when enabled (#64)
Browse files Browse the repository at this point in the history
* Refactor out logic for ignoring disabled clients
* Adapt monitors to ignore disabled clients when demanded. Closes #46
* bugfix: add missing return value for default case
* Add tests for ServiceAccountWithSensitiveRole Monitor
* Fix inconsistent handling of get_name vs. get_username
* Refactor service_account_with_group to check for ignored accounts first
* Extend and refactor tests
* inMemoryDB: Use instance instead of class vars
  • Loading branch information
malexmave authored Jan 9, 2025
1 parent 8f7fd3d commit addcc02
Show file tree
Hide file tree
Showing 19 changed files with 1,439 additions and 42 deletions.
24 changes: 17 additions & 7 deletions kcwarden/api/auditor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,30 @@ def _get_ignore_list(self) -> list[str]:
assert isinstance(ignore_dict, dict)
return ignore_dict.get(self.get_classname(), [])

# Check if the provided object is a Client.
# If so, check if the client is disabled and if the settings say that
# inactive clients should be ignored.
def is_ignored_disabled_client(self, keycloak_object: Dataclass) -> bool:
if isinstance(keycloak_object, Client):
# If it is enabled, it should always be considered
if keycloak_object.is_enabled():
return False
# Otherwise, it should be considered if "ignore disabled clients" is not set
return self.get_config(config_keys.IGNORE_DISABLED_CLIENTS)
# If it is not a client, return False either way
return False

# More generic ignores (also calls specific ignore list from config)
def is_not_ignored(self, keycloak_object: Dataclass) -> bool:
# Check if the provided object should be considered, based on the audit configuration.
# If the object is in the explicit ignore list for the auditor, it should always be ignored.
if helper.matches_list_of_regexes(keycloak_object.get_name(), self._get_ignore_list()):
return False

# Checks for clients:
if isinstance(keycloak_object, Client):
# If it is enabled, it should always be considered
if keycloak_object.is_enabled():
return True
# Otherwise, it should be considered if "ignore disabled clients" is not set
return not self.get_config(config_keys.IGNORE_DISABLED_CLIENTS)
# Checks if it is a client that should be ignored because it is disabled
if self.is_ignored_disabled_client(keycloak_object):
return False

# Anything that doesn't have specific ignore rules associated with it is always considered.
return True

Expand Down
1 change: 0 additions & 1 deletion kcwarden/database/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def regex_matches_list_entry(pattern_string: str, string_list: list[str]) -> boo
def retrieve_roles_from_db_with_regex(
db: Database, role_client: str, role_name: str
) -> list[RealmRole] | list[ClientRole]:
# TODO rewrite with regex support
if role_client is None or role_client.lower() == "realm":
return [role for role in db.get_all_realm_roles() if matches_as_string_or_regex(role.get_name(), role_name)]
return [
Expand Down
17 changes: 9 additions & 8 deletions kcwarden/database/in_memory_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@


class InMemoryDatabase(Database):
CLIENTS = {}
SCOPES = {}
SERVICE_ACCOUNTS = {}
GROUPS = {}
REALMS = {}
REALM_ROLES = {}
CLIENT_ROLES = {}
IDENTITY_PROVIDERS = {}
def __init__(self):
self.CLIENTS = {}
self.SCOPES = {}
self.SERVICE_ACCOUNTS = {}
self.GROUPS = {}
self.REALMS = {}
self.REALM_ROLES = {}
self.CLIENT_ROLES = {}
self.IDENTITY_PROVIDERS = {}

### "Adders"
def add_realm(self, realm: Realm):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def _generate_additional_details(
)
return additional_details

def _should_consider_client(self, client: Client) -> bool:
# Ignore clients that are disabled, if the global setting says so
return not self.is_ignored_disabled_client(client)

def audit(self):
custom_config = self.get_custom_config()
for monitor_definition in custom_config:
Expand All @@ -87,6 +91,8 @@ def audit(self):
for client in self._DB.get_all_clients():
if helper.matches_list_of_regexes(client.get_name(), allowed_clients):
continue
if not self._should_consider_client(client):
continue
# First, find all directly defined ProtocolMappers
for mapper in client.get_protocol_mappers():
if self._protocol_mapper_matches_config(mapper, monitored_mapper_type, matched_config):
Expand Down
22 changes: 16 additions & 6 deletions kcwarden/monitors/service_account/service_account_with_group.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from kcwarden.api import Monitor
from kcwarden.custom_types.result import Severity
from kcwarden.custom_types.keycloak_object import ServiceAccount, Client
from kcwarden.database import helper


Expand All @@ -21,6 +22,14 @@ class ServiceAccountWithGroup(Monitor):
HAS_CUSTOM_CONFIG = True
CUSTOM_CONFIG_TEMPLATE = {"group": "/group path or regular expression", "allow_no_group": True}

def _should_consider_service_account(
self, service_account: ServiceAccount, allowed_service_accounts: list[str]
) -> bool:
if not helper.matches_list_of_regexes(service_account.get_username(), allowed_service_accounts):
client: Client = self._DB.get_client(service_account.get_client_id())
return not self.is_ignored_disabled_client(client)
return False

def audit(self):
custom_config = self.get_custom_config()
for monitor_definition in custom_config:
Expand All @@ -34,6 +43,8 @@ def audit(self):
continue

for saccount in self._DB.get_all_service_accounts():
if not self._should_consider_service_account(saccount, allowed_service_accounts):
continue
assigned_groups = saccount.get_groups()

if not allow_no_group and assigned_groups == []:
Expand All @@ -45,9 +56,8 @@ def audit(self):
continue

if helper.regex_matches_list_entry(monitored_group, assigned_groups):
if not helper.matches_list_of_regexes(saccount.get_username(), allowed_service_accounts):
yield self.generate_finding_with_severity_from_config(
saccount,
monitor_definition,
additional_details={"monitored_group": monitored_group, "assigned_groups": assigned_groups},
)
yield self.generate_finding_with_severity_from_config(
saccount,
monitor_definition,
additional_details={"monitored_group": monitored_group, "assigned_groups": assigned_groups},
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from kcwarden.api import Monitor
from kcwarden.custom_types.result import Severity
from kcwarden.custom_types.keycloak_object import Client, ServiceAccount
from kcwarden.database import helper


Expand All @@ -25,6 +26,14 @@ class ServiceAccountWithSensitiveRole(Monitor):
"role-client": "Client name (set to 'realm' for realm roles). No regular expression support",
}

def _should_consider_service_account(
self, serviceaccount: ServiceAccount, allowed_service_accounts: list[str]
) -> bool:
if not helper.matches_list_of_regexes(serviceaccount.get_username(), allowed_service_accounts):
client: Client = self._DB.get_client(serviceaccount.get_client_id())
return not self.is_ignored_disabled_client(client)
return False

def audit(self):
custom_config = self.get_custom_config()
for monitor_definition in custom_config:
Expand All @@ -50,7 +59,7 @@ def audit(self):
# Next, we need to find all service accounts that have at least one of these roles
for considered_role in final_roles:
for serviceaccount in helper.get_service_accounts_with_role(self._DB, considered_role):
if not helper.matches_list_of_regexes(serviceaccount.get_name(), allowed_service_accounts):
if self._should_consider_service_account(serviceaccount, allowed_service_accounts):
yield self.generate_finding_with_severity_from_config(
serviceaccount,
monitor_definition,
Expand All @@ -66,7 +75,7 @@ def audit(self):
# And all service accounts that are in at least one of these groups
for considered_group in groups:
for serviceaccount in helper.get_service_accounts_in_group(self._DB, considered_group):
if not helper.matches_list_of_regexes(serviceaccount.get_name(), allowed_service_accounts):
if self._should_consider_service_account(serviceaccount, allowed_service_accounts):
yield self.generate_finding_with_severity_from_config(
serviceaccount,
monitor_definition,
Expand Down
145 changes: 130 additions & 15 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@
from unittest.mock import Mock

import pytest

from kcwarden.custom_types.database import Database
from kcwarden.custom_types.keycloak_object import Realm
import os

from kcwarden.custom_types.keycloak_object import (
Realm,
Client,
RealmRole,
ClientRole,
ProtocolMapper,
ClientScope,
Group,
ServiceAccount,
)
from kcwarden.database.in_memory_db import InMemoryDatabase
from kcwarden.custom_types.config_keys import AUDITOR_CONFIG
from kcwarden.database.importer import load_realm_dump

# Adapted from
# https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option
Expand Down Expand Up @@ -111,7 +121,7 @@ def database():

@pytest.fixture
def mock_database():
return mock.create_autospec(spec=Database, instance=True)
return mock.create_autospec(spec=InMemoryDatabase, instance=True)


# Mocked data objects
Expand All @@ -129,21 +139,38 @@ def mock_realm():


@pytest.fixture
def mock_scope():
scope = Mock()
return scope
def mock_scope(create_mock_scope):
return create_mock_scope(name="sensitive-scope")


@pytest.fixture
def create_mock_scope():
def _create_mock_scope(name="sensitive-scope", protocol_mappers=[], realm_roles=[], client_roles={}):
scope = Mock(spec=ClientScope)
scope.get_name.return_value = name
scope.get_protocol_mappers.return_value = protocol_mappers
scope.get_realm_roles.return_value = realm_roles
scope.get_client_roles.return_value = client_roles
return scope

return _create_mock_scope


@pytest.fixture
def mock_client(mock_realm):
client = Mock()
client = Mock(spec=Client)
client.get_name.return_value = "mock-test-client"
client.is_enabled.return_value = True
client.get_realm.return_value = mock_realm
client.get_default_client_scopes.return_value = []
client.get_optional_client_scopes.return_value = []
client.is_oidc_client.return_value = True
client.is_realm_specific_client.return_value = False
client.get_protocol_mappers.return_value = []
client.has_service_account_enabled.return_value = False
client.has_full_scope_allowed.return_value = False
client.get_directly_assigned_realm_roles.return_value = []
client.get_directly_assigned_client_roles.return_value = {}
return client


Expand All @@ -160,13 +187,53 @@ def confidential_client(mock_client):


@pytest.fixture
def mock_role():
role = Mock()
role.is_client_role.return_value = False
role.is_composite_role.return_value = False
role.get_composite_roles.return_value = {}
role.get_client_name.return_value = "realm"
return role
def mock_service_account():
service_account = Mock(spec=ServiceAccount)
service_account.get_username.return_value = "test-service-account"
service_account.get_client_id.return_value = "test-client-id"
service_account.get_realm_roles.return_value = ["test-realm-role"]
service_account.get_client_roles.return_value = {"test-client": ["test-client-role"]}
service_account.get_groups.return_value = ["test-group"]
return service_account


@pytest.fixture
def mock_role(create_mock_role):
return create_mock_role(role_name="mock-role", client="realm")


@pytest.fixture
def create_mock_role(mock_realm):
# Fixture factory pattern, so we can create more than one role in our tests
def _create_mock_role(role_name, client="realm", composite=[]):
if client == "realm":
role = Mock(spec=RealmRole)
role.is_client_role.return_value = False
else:
role = Mock(spec=ClientRole)
role.is_client_role.return_value = True
role.get_client_name.return_value = client
if len(composite) > 0:
role.is_composite_role.return_value = True
comp_map = {}
for c_role in composite:
if c_role.is_client_role():
if c_role.get_client_name() not in comp_map:
comp_map[c_role.get_client_name()] = []
comp_map[c_role.get_client_name()].append(c_role.get_name())
else:
if "realm" not in comp_map:
comp_map["realm"] = []
comp_map["realm"].append(c_role.get_name())
role.get_composite_roles.return_value = comp_map
else:
role.is_composite_role.return_value = False
role.get_composite_roles.return_value = {}
role.get_name.return_value = role_name
role.get_realm.return_value = mock_realm
return role

return _create_mock_role


@pytest.fixture
Expand All @@ -180,3 +247,51 @@ def mock_client_role(mock_role):
def mock_composite_role(mock_role):
mock_role.is_composite_role.return_value = True
return mock_role


@pytest.fixture
def mock_protocol_mapper(create_mock_protocol_mapper):
return create_mock_protocol_mapper(
mapper_type="oidc-usermodel-attribute-mapper",
config={"userinfo.token.claim": "true", "user.attribute": "email"},
)


@pytest.fixture
def create_mock_protocol_mapper():
def _create_mock_protocol_mapper(
mapper_type="oidc-usermodel-attribute-mapper",
config={"userinfo.token.claim": "true", "user.attribute": "email"},
):
mapper = Mock(spec=ProtocolMapper)
mapper.get_protocol_mapper.return_value = mapper_type
mapper.get_config.return_value = config
return mapper

return _create_mock_protocol_mapper


@pytest.fixture
def mock_group(mock_realm):
group = Mock(spec=Group)
group.get_path.return_value = "/test-group"
group.get_name.return_value = "test-group"
group.get_realm_roles.return_value = []
group.get_client_roles.return_value = {}
group.get_effective_realm_roles.return_value = []
group.get_effective_client_roles.return_value = {}
group.get_realm.return_value = mock_realm
return group


# Loader for example realm dump
@pytest.fixture
def example_db():
# Load example realm file from disk
current_dir = os.path.dirname(os.path.abspath(__file__))
test_json_path = os.path.normpath(os.path.join(current_dir, "fixtures", "test-realm-with-client.json"))

# Create database and import realm into it
db = InMemoryDatabase()
load_realm_dump(test_json_path, db)
return db
Loading

0 comments on commit addcc02

Please sign in to comment.