Skip to content

Commit

Permalink
CASMCMS-9269: Improve Python type annotations in session templates co…
Browse files Browse the repository at this point in the history
…ntroller code
  • Loading branch information
mharding-hpe committed Feb 5, 2025
1 parent 059271a commit fd99ce3
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- CASMCMS-9265: Improve Python type annotations in sessions controller code
- CASMCMS-9269: Improve Python type annotations in session templates controller code

### Fixed
- CASMCMS-8965: Update sessions controller to provide error responses consistently and correctly
Expand Down
20 changes: 20 additions & 0 deletions src/bos/server/controllers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from urllib.parse import urlparse, urlunparse

import connexion
from connexion.lifecycle import ConnexionResponse
import flask

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,3 +74,22 @@ def url_for(endpoint, **values):
# TODO(CASMCMS-1869): there might be a better way to do this by overriding
# url_adapter in the context or request, see
# https://github.com/pallets/flask/blob/a74864ec229141784374f1998324d2cbac837295/flask/helpers.py#L302


def _400_bad_request(msg: str) -> ConnexionResponse:
"""
ProblemBadRequest
"""
return connexion.problem(
status=400,
title="Bad Request",
detail=msg)

def _404_resource_not_found(resource_type: str, resource_id: str) -> ConnexionResponse:
"""
ProblemResourceNotFound
"""
return connexion.problem(
status=404,
title="The resource was not found",
detail=f"{resource_type} '{resource_id}' does not exist")
30 changes: 8 additions & 22 deletions src/bos/server/controllers/v2/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from functools import partial
import logging
import re
from typing import Literal, Optional
Expand All @@ -37,6 +38,7 @@
from bos.common.utils import exc_type_msg, get_current_time, get_current_timestamp, load_timestamp
from bos.common.values import Phase, Status
from bos.server import redis_db_utils as dbutils
from bos.server.controllers.utils import _400_bad_request, _404_resource_not_found
from bos.server.controllers.v2.boot_set import BootSetStatus, validate_boot_sets
from bos.server.controllers.v2.components import get_v2_components_data
from bos.server.controllers.v2.options import OptionsData
Expand Down Expand Up @@ -167,7 +169,7 @@ def patch_v2_session(session_id: str) -> tuple[JsonDict, Literal[200]] | Connexi
session_key = get_tenant_aware_key(session_id, get_tenant_from_header())
if session_key not in DB:
LOGGER.warning("Could not find v2 session %s", session_id)
return _404_session_not_found(session_id)
return _404_session_not_found(resource_id=session_id) # pylint: disable=redundant-keyword-arg

component = DB.patch(session_key, patch_data_json)
return component, 200
Expand All @@ -187,7 +189,7 @@ def get_v2_session(
session_key = get_tenant_aware_key(session_id, get_tenant_from_header())
if session_key not in DB:
LOGGER.warning("Could not find v2 session %s", session_id)
return _404_session_not_found(session_id)
return _404_session_not_found(resource_id=session_id) # pylint: disable=redundant-keyword-arg
session = DB.get(session_key)
return session, 200

Expand Down Expand Up @@ -223,7 +225,7 @@ def delete_v2_session(
session_key = get_tenant_aware_key(session_id, get_tenant_from_header())
if session_key not in DB:
LOGGER.warning("Could not find v2 session %s", session_id)
return _404_session_not_found(session_id)
return _404_session_not_found(resource_id=session_id) # pylint: disable=redundant-keyword-arg
if session_key in STATUS_DB:
STATUS_DB.delete(session_key)
return DB.delete(session_key), 204
Expand Down Expand Up @@ -270,7 +272,7 @@ def get_v2_session_status(
session_key = get_tenant_aware_key(session_id, get_tenant_from_header())
if session_key not in DB:
LOGGER.warning("Could not find v2 session %s", session_id)
return _404_session_not_found(session_id)
return _404_session_not_found(resource_id=session_id) # pylint: disable=redundant-keyword-arg
session = DB.get(session_key)
if session.get(
"status",
Expand All @@ -296,7 +298,7 @@ def save_v2_session_status(
session_key = get_tenant_aware_key(session_id, get_tenant_from_header())
if session_key not in DB:
LOGGER.warning("Could not find v2 session %s", session_id)
return _404_session_not_found(session_id)
return _404_session_not_found(resource_id=session_id) # pylint: disable=redundant-keyword-arg
return STATUS_DB.put(session_key, _get_v2_session_status(session_key)), 200


Expand Down Expand Up @@ -440,24 +442,8 @@ def _age_to_timestamp(age: str) -> datetime:
return get_current_time() - delta


def _400_bad_request(msg: str) -> ConnexionResponse:
"""
ProblemBadRequest
"""
return connexion.problem(
status=400,
title="Bad Request",
detail=msg)

_404_session_not_found = partial(_404_resource_not_found, resource_type="Session")

def _404_session_not_found(session_id: str) -> ConnexionResponse:
"""
ProblemResourceNotFound
"""
return connexion.problem(
status=404,
title="The resource was not found",
detail=f"Session '{session_id}' does not exist")

def _409_session_already_exists(session_id: str) -> ConnexionResponse:
"""
Expand Down
65 changes: 27 additions & 38 deletions src/bos/server/controllers/v2/sessiontemplates.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# MIT License
#
# (C) Copyright 2021-2024 Hewlett Packard Enterprise Development LP
# (C) Copyright 2021-2025 Hewlett Packard Enterprise Development LP
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
Expand All @@ -21,13 +21,18 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
from functools import partial
import logging
import connexion
from typing import Literal, Optional

from connexion.lifecycle import ConnexionResponse

from bos.common.tenant_utils import get_tenant_from_header, get_tenant_aware_key, \
reject_invalid_tenant
from bos.common.types import JsonDict
from bos.common.utils import exc_type_msg
from bos.server import redis_db_utils as dbutils
from bos.server.controllers.utils import _400_bad_request, _404_resource_not_found
from bos.server.schema import validator
from bos.server.utils import get_request_json
from .boot_set import validate_boot_sets, validate_sanitize_boot_sets
Expand Down Expand Up @@ -63,7 +68,7 @@

@reject_invalid_tenant
@dbutils.redis_error_handler
def put_v2_sessiontemplate(session_template_id): # noqa: E501
def put_v2_sessiontemplate(session_template_id: str) -> tuple[JsonDict, Literal[200]] | ConnexionResponse: # noqa: E501
"""PUT /v2/sessiontemplates
Creates a new session template. # noqa: E501
Expand All @@ -75,20 +80,15 @@ def put_v2_sessiontemplate(session_template_id): # noqa: E501
except Exception as err:
LOGGER.error("Error parsing PUT '%s' request data: %s",
session_template_id, exc_type_msg(err))
return connexion.problem(status=400,
title="Error parsing the data provided.",
detail=str(err))
return _400_bad_request(f"Error parsing the data provided: {err}")

try:
validate_sanitize_session_template(session_template_id, template_data)
except Exception as err:
LOGGER.error("Error creating session template '%s': %s",
session_template_id, exc_type_msg(err))
LOGGER.debug("Full template: %s", template_data)
return connexion.problem(
status=400,
title="The session template could not be created.",
detail=str(err))
return _400_bad_request(f"The session template could not be created: {err}")

tenant = get_tenant_from_header()
template_data['tenant'] = tenant
Expand All @@ -97,7 +97,7 @@ def put_v2_sessiontemplate(session_template_id): # noqa: E501


@dbutils.redis_error_handler
def get_v2_sessiontemplates(): # noqa: E501
def get_v2_sessiontemplates() -> tuple[list[JsonDict], Literal[200]]: # noqa: E501
"""
GET /v2/sessiontemplates
Expand All @@ -111,7 +111,7 @@ def get_v2_sessiontemplates(): # noqa: E501


@dbutils.redis_error_handler
def get_v2_sessiontemplate(session_template_id):
def get_v2_sessiontemplate(session_template_id: str) -> tuple[JsonDict, Literal[200]] | ConnexionResponse:
"""
GET /v2/sessiontemplates
Expand All @@ -123,16 +123,13 @@ def get_v2_sessiontemplate(session_template_id):
get_tenant_from_header())
if template_key not in DB:
LOGGER.warning("Session template not found: %s", session_template_id)
return connexion.problem(
status=404,
title="Sessiontemplate could not found.",
detail=f"Sessiontemplate {session_template_id} could not be found")
return _404_template_not_found(resource_id=session_template_id) # pylint: disable=redundant-keyword-arg
template = DB.get(template_key)
return template, 200


@dbutils.redis_error_handler
def get_v2_sessiontemplatetemplate():
def get_v2_sessiontemplatetemplate() -> tuple[JsonDict, Literal[200]]:
"""
GET /v2/sessiontemplatetemplate
Expand All @@ -145,7 +142,7 @@ def get_v2_sessiontemplatetemplate():


@dbutils.redis_error_handler
def delete_v2_sessiontemplate(session_template_id):
def delete_v2_sessiontemplate(session_template_id: str) -> tuple[None, Literal[204]] | ConnexionResponse:
"""
DELETE /v2/sessiontemplates
Expand All @@ -158,15 +155,12 @@ def delete_v2_sessiontemplate(session_template_id):
get_tenant_from_header())
if template_key not in DB:
LOGGER.warning("Session template not found: %s", session_template_id)
return connexion.problem(
status=404,
title="Sessiontemplate could not found.",
detail=f"Sessiontemplate {session_template_id} could not be found")
return _404_template_not_found(resource_id=session_template_id) # pylint: disable=redundant-keyword-arg
return DB.delete(template_key), 204


@dbutils.redis_error_handler
def patch_v2_sessiontemplate(session_template_id):
def patch_v2_sessiontemplate(session_template_id: str) -> tuple[JsonDict, Literal[200]] | ConnexionResponse:
"""
PATCH /v2/sessiontemplates
Expand All @@ -179,35 +173,27 @@ def patch_v2_sessiontemplate(session_template_id):
get_tenant_from_header())
if template_key not in DB:
LOGGER.warning("Session template not found: %s", session_template_id)
return connexion.problem(
status=404,
title="Sessiontemplate could not found.",
detail=f"Sessiontemplate {session_template_id} could not be found")
return _404_template_not_found(resource_id=session_template_id) # pylint: disable=redundant-keyword-arg

try:
template_data = get_request_json()
except Exception as err:
LOGGER.error("Error parsing PATCH '%s' request data: %s",
session_template_id, exc_type_msg(err))
return connexion.problem(status=400,
title="Error parsing the data provided.",
detail=str(err))
return _400_bad_request(f"Error parsing the data provided: {err}")

try:
validate_sanitize_session_template(session_template_id, template_data)
except Exception as err:
LOGGER.error("Error patching session template '%s': %s",
session_template_id, exc_type_msg(err))
return connexion.problem(
status=400,
title="The session template could not be patched.",
detail=str(err))
return _400_bad_request(f"The session template could not be patched: {err}")

return DB.patch(template_key, template_data), 200


@dbutils.redis_error_handler
def validate_v2_sessiontemplate(session_template_id: str):
def validate_v2_sessiontemplate(session_template_id: str) -> tuple[str, Literal[200]] | ConnexionResponse:
"""
Validate a V2 session template. Look for missing elements or errors that would prevent
a session from being launched using this template.
Expand All @@ -230,20 +216,20 @@ def validate_v2_sessiontemplate(session_template_id: str):
return msg, 200


def _get_filtered_templates(tenant):
def _get_filtered_templates(tenant: Optional[str]) -> list[JsonDict]:
response = DB.get_all()
if any([tenant]):
response = [r for r in response if _matches_filter(r, tenant)]
return response


def _matches_filter(data, tenant):
def _matches_filter(data: JsonDict, tenant: str) -> bool:
if tenant and tenant != data.get("tenant"):
return False
return True


def validate_sanitize_session_template(session_template_id, template_data):
def validate_sanitize_session_template(session_template_id: str, template_data: JsonDict) -> None:
"""
Used when creating or patching session templates
"""
Expand All @@ -259,3 +245,6 @@ def validate_sanitize_session_template(session_template_id, template_data):
# validate_sanitize_boot_sets()
for bs in template_data["boot_sets"].values():
del bs["name"]


_404_template_not_found = partial(_404_resource_not_found, resource_type="Session template")

0 comments on commit fd99ce3

Please sign in to comment.