Skip to content

Commit

Permalink
fixed UI and added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Fortunka committed Feb 7, 2025
1 parent 1addc5e commit 685c92e
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 53 deletions.
18 changes: 18 additions & 0 deletions api_app/api/routes/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
from models.domain.request_action import RequestAction
from services.logging import logger

from core.config import USER_MANAGEMENT_ENABLED


workspaces_core_router = APIRouter(dependencies=[Depends(get_current_tre_user_or_tre_admin)])
workspaces_shared_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_researcher_user_or_airlock_manager_or_tre_admin)])
workspace_services_workspace_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_researcher_user_or_airlock_manager)])
user_resources_workspace_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_researcher_user_or_airlock_manager)])

def _is_user_management_enabled():
return USER_MANAGEMENT_ENABLED

def validate_user_has_valid_role_for_user_resource(user, user_resource):
if "WorkspaceOwner" in user.roles:
Expand Down Expand Up @@ -547,6 +551,11 @@ async def retrieve_user_resource_history_by_user_resource_id(user_resource=Depen

@workspaces_shared_router.get("/workspaces/{workspace_id}/assignable-users", response_model=AssignableUsersInResponse, name=strings.API_GET_ASSIGNABLE_USERS)
async def get_assignable_users(workspace=Depends(get_workspace_by_id_from_path)) -> AssignableUsersInResponse:
if _is_user_management_enabled() is False:
logger.exception("Getting Assignable Users failed - User management is disabled. Enable via the USER_MANAGEMENT_ENABLED environment variable.")
raise HTTPException(status_code=status.HTTP_405_METHOD_NOT_ALLOWED, detail=strings.USER_MANAGEMENT_DISABLED)


access_service = get_access_service()
assignable_users = access_service.get_assignable_users()
return AssignableUsersInResponse(assignable_users=assignable_users)
Expand All @@ -560,6 +569,10 @@ async def get_workspace_roles(workspace=Depends(get_workspace_by_id_from_path))

@workspaces_shared_router.post("/workspaces/{workspace_id}/users/assign", status_code=status.HTTP_202_ACCEPTED, name=strings.API_ASSIGN_WORKSPACE_USER)
async def assign_workspace_user(response: Response, user_email: str, role_name: str, workspace=Depends(get_workspace_by_id_from_path)) -> UsersInResponse:
if _is_user_management_enabled() is False:
logger.exception("User assignment failed - User management is disabled. Enable via the USER_MANAGEMENT_ENABLED environment variable.")
raise HTTPException(status_code=status.HTTP_405_METHOD_NOT_ALLOWED, detail=strings.USER_MANAGEMENT_DISABLED)

access_service = get_access_service()

user = access_service.get_user_by_email(user_email)
Expand All @@ -581,6 +594,11 @@ async def remove_workspace_user_assignment(user_email: str,
role_name: str,
workspace=Depends(get_workspace_by_id_from_path)) -> UsersInResponse:

if _is_user_management_enabled() is False:
logger.exception("User de-assignment failed - User management is disabled. Enable via the USER_MANAGEMENT_ENABLED environment variable.")
raise HTTPException(status_code=status.HTTP_405_METHOD_NOT_ALLOWED, detail=strings.USER_MANAGEMENT_DISABLED)


access_service = get_access_service()

user = access_service.get_user_by_email(user_email)
Expand Down
3 changes: 3 additions & 0 deletions api_app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@
ENABLE_AIRLOCK_EMAIL_CHECK: bool = config("ENABLE_AIRLOCK_EMAIL_CHECK", cast=bool, default=False)

API_ROOT_SCOPE: str = f"api://{API_CLIENT_ID}/user_impersonation"

# User Management
USER_MANAGEMENT_ENABLED: bool = config("USER_MANAGEMENT_ENABLED", cast=bool, default=False)
3 changes: 3 additions & 0 deletions api_app/resources/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,6 @@

# Value that a sensitive is replaced with in Cosmos
REDACTED_SENSITIVE_VALUE = "REDACTED"

# User Management
USER_MANAGEMENT_DISABLED = "User management is disabled"
242 changes: 240 additions & 2 deletions api_app/tests_ma/test_api/test_routes/test_workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from db.errors import EntityDoesNotExist
from db.repositories.workspaces import WorkspaceRepository
from db.repositories.workspace_services import WorkspaceServiceRepository
from models.domain.authentication import RoleAssignment
from models.domain.authentication import RoleAssignment, User, Role
from models.domain.operation import Operation, OperationStep, Status
from models.domain.resource import ResourceHistoryItem, ResourceType
from models.domain.user_resource import UserResource
Expand Down Expand Up @@ -88,7 +88,8 @@ def sample_workspace(workspace_id=WORKSPACE_ID, auth_info: dict = {}) -> Workspa
etag="",
properties={
"client_id": "12345",
"scope_id": "test_scope_id"
"scope_id": "test_scope_id",
"sp_id": "test_sp_id"
},
resourcePath=f'/workspaces/{workspace_id}',
updatedWhen=FAKE_CREATE_TIMESTAMP,
Expand Down Expand Up @@ -1677,3 +1678,240 @@ async def test_get_workspace_users_returns_users(self, _, auth_class, app, clien

assert response.status_code == status.HTTP_200_OK
assert response.json()["users"] == users

@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
@patch("api.routes.workspaces._is_user_management_enabled", return_value=False)
async def test_assign_workspace_user_fails_if_feature_is_disabled(self, _, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.get_user_by_email") as get_user_by_email_mock, \
patch(f"services.{auth_class}.get_workspace_role_by_name") as get_workspace_role_by_name_mock, \
patch(f"services.{auth_class}.assign_workspace_user") as assign_workspace_user_mock, \
patch(f"services.{auth_class}.get_workspace_users") as get_workspace_users_mock:

workspace = get_workspace_by_id_mock.return_value

user = {
"id": "123",
"name": "John Doe",
"email": "[email protected]",
"roles": ["WorkspaceOwner", "WorkspaceResearcher"],
"roleAssignments": []
}

users = [user]

role_name_to_assign = "AirlockManager"
role = {"id": "test_role_id"}

get_user_by_email_mock.return_value = User.parse_obj(user)
get_workspace_role_by_name_mock.return_value = role
get_workspace_users_mock.return_value = users

response = await client.post(app.url_path_for(strings.API_ASSIGN_WORKSPACE_USER, workspace_id=WORKSPACE_ID), params={"user_email": user["email"], "role_name": role_name_to_assign})
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED


@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
@patch("api.routes.workspaces._is_user_management_enabled", return_value=True)
async def test_assign_workspace_user_assigns_workspace_user(self, _, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.get_user_by_email") as get_user_by_email_mock, \
patch(f"services.{auth_class}.get_workspace_role_by_name") as get_workspace_role_by_name_mock, \
patch(f"services.{auth_class}.assign_workspace_user") as assign_workspace_user_mock, \
patch(f"services.{auth_class}.get_workspace_users") as get_workspace_users_mock:

workspace = get_workspace_by_id_mock.return_value

user = {
"id": "123",
"name": "John Doe",
"email": "[email protected]",
"roles": ["WorkspaceOwner", "WorkspaceResearcher"],
"roleAssignments": []
}

users = [user]

role_name_to_assign = "AirlockManager"
role = {"id": "test_role_id"}

get_user_by_email_mock.return_value = User.parse_obj(user)
get_workspace_role_by_name_mock.return_value = role
get_workspace_users_mock.return_value = users

response = await client.post(app.url_path_for(strings.API_ASSIGN_WORKSPACE_USER, workspace_id=WORKSPACE_ID), params={"user_email": user["email"], "role_name": role_name_to_assign})
assert response.status_code == status.HTTP_202_ACCEPTED

get_user_by_email_mock.assert_called_once_with(user["email"])
get_workspace_role_by_name_mock.assert_called_once_with(role_name_to_assign, workspace)
assign_workspace_user_mock.assert_called_once_with(user, workspace, role)
get_workspace_users_mock.assert_called_once()

assert response.json()["users"] == users

@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
@patch("api.routes.workspaces._is_user_management_enabled", return_value=False)
async def test_remove_workspace_user_assignment_fails_if_feature_is_disabled(self, _, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.remove_workspace_role_user_assignment") as remove_workspace_role_user_assignment_mock, \
patch(f"services.{auth_class}.get_user_by_email") as get_user_by_email_mock, \
patch(f"services.{auth_class}.get_workspace_role_by_name") as get_workspace_role_by_name_mock, \
patch(f"services.{auth_class}.get_workspace_users") as get_workspace_users_mock:

workspace = get_workspace_by_id_mock.return_value

user = {
"id": "123",
"name": "John Doe",
"email": "[email protected]",
"roles": ["WorkspaceOwner", "WorkspaceResearcher"],
"roleAssignments": []
}

role_name_to_deassign = "WorkspaceResearcher"
role = {"id": "test_role_id"}

get_user_by_email_mock.return_value = User.parse_obj(user)
get_workspace_role_by_name_mock.return_value = role

user["roles"].remove(role_name_to_deassign)
users = [user]

get_workspace_users_mock.return_value = users

response = await client.delete(app.url_path_for(strings.API_ASSIGN_WORKSPACE_USER, workspace_id=WORKSPACE_ID), params={"user_email": user["email"], "role_name": role_name_to_deassign})
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED


@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
@patch("api.routes.workspaces._is_user_management_enabled", return_value=True)
async def test_remove_workspace_user_assignment_removes_workspace_user_assignment(self, _, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.remove_workspace_role_user_assignment") as remove_workspace_role_user_assignment_mock, \
patch(f"services.{auth_class}.get_user_by_email") as get_user_by_email_mock, \
patch(f"services.{auth_class}.get_workspace_role_by_name") as get_workspace_role_by_name_mock, \
patch(f"services.{auth_class}.get_workspace_users") as get_workspace_users_mock:

workspace = get_workspace_by_id_mock.return_value

user = {
"id": "123",
"name": "John Doe",
"email": "[email protected]",
"roles": ["WorkspaceOwner", "WorkspaceResearcher"],
"roleAssignments": []
}

role_name_to_deassign = "WorkspaceResearcher"
role = {"id": "test_role_id"}

get_user_by_email_mock.return_value = User.parse_obj(user)
get_workspace_role_by_name_mock.return_value = role

user["roles"].remove(role_name_to_deassign)
users = [user]

get_workspace_users_mock.return_value = users

response = await client.delete(app.url_path_for(strings.API_ASSIGN_WORKSPACE_USER, workspace_id=WORKSPACE_ID), params={"user_email": user["email"], "role_name": role_name_to_deassign})
assert response.status_code == status.HTTP_202_ACCEPTED

get_user_by_email_mock.assert_called_once_with(user["email"])
get_workspace_role_by_name_mock.assert_called_once_with(role_name_to_deassign, workspace)
remove_workspace_role_user_assignment_mock.assert_called_once_with(get_user_by_email_mock.return_value, role, workspace)
get_workspace_users_mock.assert_called_once()

assert response.json()["users"] == users

@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
@patch("api.routes.workspaces._is_user_management_enabled", return_value=False)
async def test_get_assignable_users_fails_if_feature_is_disabled(self, _, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.get_assignable_users") as get_assignable_users_mock:
assignable_users = [
{
"name": "John Doe",
"email": "[email protected]",
},
{
"name": "Jane Smith",
"email": "[email protected]",
}
]

get_assignable_users_mock.return_value = assignable_users

response = await client.get(app.url_path_for(strings.API_GET_ASSIGNABLE_USERS, workspace_id=WORKSPACE_ID))

assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED

@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
@patch("api.routes.workspaces._is_user_management_enabled", return_value=True)
async def test_get_assignable_users_returns_assignable_users(self, _, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.get_assignable_users") as get_assignable_users_mock:
assignable_users = [
{
"name": "John Doe",
"email": "[email protected]",
},
{
"name": "Jane Smith",
"email": "[email protected]",
}
]

get_assignable_users_mock.return_value = assignable_users

response = await client.get(app.url_path_for(strings.API_GET_ASSIGNABLE_USERS, workspace_id=WORKSPACE_ID))

assert response.status_code == status.HTTP_200_OK
assert response.json()["assignable_users"] == assignable_users


@pytest.mark.parametrize("auth_class", ["aad_authentication.AzureADAuthorization"])
@patch("api.dependencies.workspaces.WorkspaceRepository.get_workspace_by_id", return_value=sample_workspace())
async def test_get_workspace_roles_returns_workspace_roles(self, get_workspace_by_id_mock, auth_class, app, client):
with patch(f"services.{auth_class}.get_workspace_roles") as get_workspace_roles_mock:
workspace_roles = [
Role(
id="1",
value="AirlockManager",
isEnabled=True,
email=None,
allowedMemberTypes=["Application", "User"],
description="Provides airlock managers access to the Workspace and ability to review airlock requests.",
displayName="Airlock Manager",
origin="Application",
roleAssignments=[],
),
Role(
id="2",
value="WorkspaceResearcher",
isEnabled=True,
email=None,
allowedMemberTypes=["Application", "User"],
description="Provides researchers access to the Workspace.",
displayName="Workspace Researcher",
origin="Application",
roleAssignments=[],
),
Role(
id="3",
value="WorkspaceOwner",
isEnabled=True,
email=None,
allowedMemberTypes=["Application", "User"],
description="Provides workspace owners access to the Workspace.",
displayName="Workspace Owner",
origin="Application",
roleAssignments=[],
),
]

get_workspace_roles_mock.return_value = workspace_roles

response = await client.get(app.url_path_for(strings.API_GET_WORKSPACE_ROLES, workspace_id=WORKSPACE_ID))

assert response.status_code == status.HTTP_200_OK
assert response.json()["roles"] == workspace_roles
Loading

0 comments on commit 685c92e

Please sign in to comment.