From 998c3d4eb97ae9daed35e9c12d022029101b0862 Mon Sep 17 00:00:00 2001 From: Yuval Yaron <43217306+yuvalyaron@users.noreply.github.com> Date: Tue, 11 Feb 2025 00:46:57 +0200 Subject: [PATCH 1/2] Remove public IP from TRE's firewall when forced tunneling is configured (#4346) * remove public IP from TRE's firewall when forced tunneling is configured * update changelog * update version as this change is not backward compatible * update template version --- CHANGELOG.md | 1 + templates/shared_services/firewall/porter.yaml | 2 +- templates/shared_services/firewall/terraform/firewall.tf | 5 +++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3ea27a0ac..9979b55d2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ENHANCEMENTS: * Allow workspace App Service Plan SKU to be updated ([#4331](https://github.com/microsoft/AzureTRE/issues/4331)) +* Remove public IP from TRE's firewall when forced tunneling is configured ([#4346](https://github.com/microsoft/AzureTRE/pull/4346)) BUG FIXES: diff --git a/templates/shared_services/firewall/porter.yaml b/templates/shared_services/firewall/porter.yaml index ffba80504b..d5759218a7 100644 --- a/templates/shared_services/firewall/porter.yaml +++ b/templates/shared_services/firewall/porter.yaml @@ -1,7 +1,7 @@ --- schemaVersion: 1.0.0 name: tre-shared-service-firewall -version: 1.3.0 +version: 1.3.1 description: "An Azure TRE Firewall shared service" dockerfile: Dockerfile.tmpl registry: azuretre diff --git a/templates/shared_services/firewall/terraform/firewall.tf b/templates/shared_services/firewall/terraform/firewall.tf index 6697a359b6..3f42237a0f 100644 --- a/templates/shared_services/firewall/terraform/firewall.tf +++ b/templates/shared_services/firewall/terraform/firewall.tf @@ -1,4 +1,5 @@ resource "azurerm_public_ip" "fwtransit" { + count = var.firewall_force_tunnel_ip != "" ? 0 : 1 name = "pip-fw-${var.tre_id}" resource_group_name = local.core_resource_group_name location = data.azurerm_resource_group.rg.location @@ -11,7 +12,7 @@ resource "azurerm_public_ip" "fwtransit" { moved { from = azurerm_public_ip.fwpip - to = azurerm_public_ip.fwtransit + to = azurerm_public_ip.fwtransit[0] } resource "azurerm_public_ip" "fwmanagement" { @@ -38,7 +39,7 @@ resource "azurerm_firewall" "fw" { ip_configuration { name = "fw-ip-configuration" subnet_id = data.azurerm_subnet.firewall.id - public_ip_address_id = azurerm_public_ip.fwtransit.id + public_ip_address_id = var.firewall_force_tunnel_ip != "" ? null : azurerm_public_ip.fwtransit[0].id } dynamic "management_ip_configuration" { From a6d85e2600ffac2a3dcc9576607a637911e330ec Mon Sep 17 00:00:00 2001 From: Marcus Robinson Date: Mon, 10 Feb 2025 23:34:44 +0000 Subject: [PATCH 2/2] Fix upgrade if porter install has failed and add tests to resource processor (#4338) --- .devcontainer/devcontainer.json | 1 + CHANGELOG.md | 3 + airlock_processor/_version.py | 2 +- .../tests/shared_code}/__init__.py | 0 resource_processor/_version.py | 2 +- resource_processor/helpers/__init__.py | 0 .../{resources => helpers}/commands.py | 29 +- .../{resources => helpers}/httpserver.py | 0 .../{resources => helpers}/statuses.py | 2 +- .../{resources => helpers}/strings.py | 0 resource_processor/resources/helpers.py | 5 - resource_processor/tests_rp/test_commands.py | 94 ++++++ resource_processor/tests_rp/test_runner.py | 284 ++++++++++++++++++ resource_processor/vmss_porter/runner.py | 37 ++- 14 files changed, 427 insertions(+), 32 deletions(-) rename {resource_processor/resources => airlock_processor/tests/shared_code}/__init__.py (100%) create mode 100644 resource_processor/helpers/__init__.py rename resource_processor/{resources => helpers}/commands.py (89%) rename resource_processor/{resources => helpers}/httpserver.py (100%) rename resource_processor/{resources => helpers}/statuses.py (96%) rename resource_processor/{resources => helpers}/strings.py (100%) delete mode 100644 resource_processor/resources/helpers.py create mode 100644 resource_processor/tests_rp/test_commands.py create mode 100644 resource_processor/tests_rp/test_runner.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 90b080ddb1..e8966910e5 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -275,6 +275,7 @@ "extensions": [ "ms-python.python", "ms-python.pylance", + "ms-python.flake8", "hashicorp.terraform", "github.vscode-pull-request-github", "gitHub.copilot", diff --git a/CHANGELOG.md b/CHANGELOG.md index 9979b55d2c..9cf9862269 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ ENHANCEMENTS: * Remove public IP from TRE's firewall when forced tunneling is configured ([#4346](https://github.com/microsoft/AzureTRE/pull/4346)) BUG FIXES: +* Fix upgrade when porter install has failed ([#4338](https://github.com/microsoft/AzureTRE/pull/4338)) + + COMPONENTS: diff --git a/airlock_processor/_version.py b/airlock_processor/_version.py index deded3247f..732155f8df 100644 --- a/airlock_processor/_version.py +++ b/airlock_processor/_version.py @@ -1 +1 @@ -__version__ = "0.8.2" +__version__ = "0.8.3" diff --git a/resource_processor/resources/__init__.py b/airlock_processor/tests/shared_code/__init__.py similarity index 100% rename from resource_processor/resources/__init__.py rename to airlock_processor/tests/shared_code/__init__.py diff --git a/resource_processor/_version.py b/resource_processor/_version.py index fee46bd8ce..def467e071 100644 --- a/resource_processor/_version.py +++ b/resource_processor/_version.py @@ -1 +1 @@ -__version__ = "0.11.1" +__version__ = "0.12.1" diff --git a/resource_processor/helpers/__init__.py b/resource_processor/helpers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/resource_processor/resources/commands.py b/resource_processor/helpers/commands.py similarity index 89% rename from resource_processor/resources/commands.py rename to resource_processor/helpers/commands.py index 0111b52358..59c0eaea82 100644 --- a/resource_processor/resources/commands.py +++ b/resource_processor/helpers/commands.py @@ -4,14 +4,13 @@ import logging from urllib.parse import urlparse -from resources.helpers import get_installation_id from shared.logging import logger, shell_output_logger def azure_login_command(config): set_cloud_command = f"az cloud set --name {config['azure_environment']} >/dev/null " - if config["vmss_msi_id"]: + if config.get("vmss_msi_id"): # Use the Managed Identity when in VMSS context login_command = f"az login --identity -u {config['vmss_msi_id']} >/dev/null " @@ -23,7 +22,7 @@ def azure_login_command(config): def apply_porter_credentials_sets_command(config): - if config["vmss_msi_id"]: + if config.get("vmss_msi_id"): # Use the Managed Identity when in VMSS context porter_credential_sets = "porter credentials apply vmss_porter/arm_auth_local_debugging.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth.json >/dev/null 2>&1" @@ -80,25 +79,31 @@ async def build_porter_command(config, msg_body, custom_action=False): val_base64_bytes = base64.b64encode(val_bytes) parameter_value = val_base64_bytes.decode("ascii") - porter_parameters = porter_parameters + f" --param {parameter_name}=\"{parameter_value}\"" + porter_parameters += f" --param {parameter_name}=\"{parameter_value}\"" - installation_id = get_installation_id(msg_body) + installation_id = msg_body['id'] command_line = [f"porter" # If a custom action (i.e. not install, uninstall, upgrade) we need to use 'invoke' - f"{' invoke --action' if custom_action else ''}" - f" {msg_body['action']} \"{installation_id}\"" - f" --reference {config['registry_server']}/{msg_body['name']}:v{msg_body['version']}" - f" {porter_parameters} --force" - f" --credential-set arm_auth" - f" --credential-set aad_auth" + f"{' invoke --action' if custom_action else ''} " + f"{msg_body['action']} \"{installation_id}\" " + f"--reference {config['registry_server']}/{msg_body['name']}:v{msg_body['version']}" + f"{porter_parameters} " + f"--force " + f"--credential-set arm_auth " + f"--credential-set aad_auth " ] + if msg_body['action'] == 'upgrade': + command_line[0] = command_line[0] + "--force-upgrade " + + command_line[0] = command_line[0].strip() + return command_line async def build_porter_command_for_outputs(msg_body): - installation_id = get_installation_id(msg_body) + installation_id = msg_body['id'] command_line = [f"porter installations output list --installation {installation_id} --output json"] return command_line diff --git a/resource_processor/resources/httpserver.py b/resource_processor/helpers/httpserver.py similarity index 100% rename from resource_processor/resources/httpserver.py rename to resource_processor/helpers/httpserver.py diff --git a/resource_processor/resources/statuses.py b/resource_processor/helpers/statuses.py similarity index 96% rename from resource_processor/resources/statuses.py rename to resource_processor/helpers/statuses.py index 952dcef24b..2e1941f482 100644 --- a/resource_processor/resources/statuses.py +++ b/resource_processor/helpers/statuses.py @@ -1,5 +1,5 @@ from collections import defaultdict -from resources import strings +from helpers import strings # Specify pass and fail status strings so we can return the right statuses to the api depending on the action type (with a default of custom action) diff --git a/resource_processor/resources/strings.py b/resource_processor/helpers/strings.py similarity index 100% rename from resource_processor/resources/strings.py rename to resource_processor/helpers/strings.py diff --git a/resource_processor/resources/helpers.py b/resource_processor/resources/helpers.py deleted file mode 100644 index 98ef4d2e0d..0000000000 --- a/resource_processor/resources/helpers.py +++ /dev/null @@ -1,5 +0,0 @@ -def get_installation_id(msg_body): - """ - This is used to identify each bundle install within the porter state store. - """ - return msg_body['id'] diff --git a/resource_processor/tests_rp/test_commands.py b/resource_processor/tests_rp/test_commands.py new file mode 100644 index 0000000000..bb2f9f20e0 --- /dev/null +++ b/resource_processor/tests_rp/test_commands.py @@ -0,0 +1,94 @@ +import json +import pytest +from unittest.mock import patch, AsyncMock +from helpers.commands import azure_login_command, apply_porter_credentials_sets_command, azure_acr_login_command, build_porter_command, build_porter_command_for_outputs, get_porter_parameter_keys + + +@pytest.fixture +def mock_get_porter_parameter_keys(): + with patch("helpers.commands.get_porter_parameter_keys", new_callable=AsyncMock) as mock: + yield mock + + +@pytest.mark.parametrize("config, expected_command", [ + ({"azure_environment": "AzureCloud", "vmss_msi_id": "msi_id"}, "az cloud set --name AzureCloud >/dev/null && az login --identity -u msi_id >/dev/null "), + ({"azure_environment": "AzureCloud", "arm_client_id": "client_id", "arm_client_secret": "client_secret", "arm_tenant_id": "tenant_id"}, "az cloud set --name AzureCloud >/dev/null && az login --service-principal --username client_id --password client_secret --tenant tenant_id >/dev/null") +]) +def test_azure_login_command(config, expected_command): + """Test azure_login_command function.""" + assert azure_login_command(config) == expected_command + + +@pytest.mark.parametrize("config, expected_command", [ + ({"vmss_msi_id": "msi_id"}, "porter credentials apply vmss_porter/arm_auth_local_debugging.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth.json >/dev/null 2>&1"), + ({}, "porter credentials apply vmss_porter/arm_auth_local_debugging.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth_local_debugging.json >/dev/null 2>&1") +]) +def test_apply_porter_credentials_sets_command(config, expected_command): + """Test apply_porter_credentials_sets_command function.""" + assert apply_porter_credentials_sets_command(config) == expected_command + + +@pytest.mark.parametrize("config, expected_command", [ + ({"registry_server": "myregistry.azurecr.io"}, "az acr login --name myregistry >/dev/null ") +]) +def test_azure_acr_login_command(config, expected_command): + """Test azure_acr_login_command function.""" + assert azure_acr_login_command(config) == expected_command + + +@pytest.mark.asyncio +async def test_build_porter_command(mock_get_porter_parameter_keys): + """Test build_porter_command function.""" + config = {"registry_server": "myregistry.azurecr.io"} + msg_body = {"id": "guid", "action": "install", "name": "mybundle", "version": "1.0.0", "parameters": {"param1": "value1"}} + mock_get_porter_parameter_keys.return_value = ["param1"] + + expected_command = [ + "porter install \"guid\" --reference myregistry.azurecr.io/mybundle:v1.0.0 --param param1=\"value1\" --force --credential-set arm_auth --credential-set aad_auth" + ] + + command = await build_porter_command(config, msg_body) + assert command == expected_command + + +@pytest.mark.asyncio +async def test_build_porter_command_for_upgrade(mock_get_porter_parameter_keys): + """Test build_porter_command function for upgrade action.""" + config = {"registry_server": "myregistry.azurecr.io"} + msg_body = {"id": "guid", "action": "upgrade", "name": "mybundle", "version": "1.0.0", "parameters": {"param1": "value1"}} + mock_get_porter_parameter_keys.return_value = ["param1"] + + expected_command = [ + "porter upgrade \"guid\" --reference myregistry.azurecr.io/mybundle:v1.0.0 --param param1=\"value1\" --force --credential-set arm_auth --credential-set aad_auth --force-upgrade" + ] + + command = await build_porter_command(config, msg_body) + assert command == expected_command + + +@pytest.mark.asyncio +async def test_build_porter_command_for_outputs(): + """Test build_porter_command_for_outputs function.""" + msg_body = {"id": "guid", "action": "install", "name": "mybundle", "version": "1.0.0"} + expected_command = ["porter installations output list --installation guid --output json"] + + command = await build_porter_command_for_outputs(msg_body) + assert command == expected_command + + +@pytest.mark.asyncio +@patch("helpers.commands.azure_login_command", return_value="az login command") +@patch("helpers.commands.azure_acr_login_command", return_value="az acr login command") +@patch("asyncio.create_subprocess_shell") +async def test_get_porter_parameter_keys(mock_create_subprocess_shell, mock_azure_acr_login_command, mock_azure_login_command): + """Test get_porter_parameter_keys function.""" + config = {"registry_server": "myregistry.azurecr.io", "porter_env": {}} + msg_body = {"name": "mybundle", "version": "1.0.0"} + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (json.dumps({"parameters": [{"name": "param1"}]}).encode(), b"") + mock_create_subprocess_shell.return_value = mock_proc + + expected_keys = ["param1"] + + keys = await get_porter_parameter_keys(config, msg_body) + assert keys == expected_keys diff --git a/resource_processor/tests_rp/test_runner.py b/resource_processor/tests_rp/test_runner.py new file mode 100644 index 0000000000..7c1dc6d2e4 --- /dev/null +++ b/resource_processor/tests_rp/test_runner.py @@ -0,0 +1,284 @@ +import json +from unittest.mock import patch, AsyncMock, Mock +import pytest +from resource_processor.vmss_porter.runner import ( + set_up_config, receive_message, invoke_porter_action, get_porter_outputs, check_runners, runner +) +from azure.servicebus.aio import ServiceBusClient +from azure.servicebus import ServiceBusSessionFilter + + +@pytest.fixture +def mock_service_bus_client(): + with patch("resource_processor.vmss_porter.runner.ServiceBusClient") as mock: + yield mock + + +@pytest.fixture +def mock_default_credential(): + with patch("resource_processor.vmss_porter.runner.default_credentials") as mock: + yield mock + + +@pytest.fixture +def mock_auto_lock_renewer(): + with patch("resource_processor.vmss_porter.runner.AutoLockRenewer") as mock: + yield mock + + +@pytest.fixture +def mock_logger(): + with patch("resource_processor.vmss_porter.runner.logger") as mock: + yield mock + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.get_config", return_value={"resource_request_queue": "test_queue", "service_bus_namespace": "test_namespace", "vmss_msi_id": "test_msi_id", "porter_env": {}}) +async def test_set_up_config(mock_get_config): + """Test setting up configuration.""" + config = set_up_config() + assert config == {"resource_request_queue": "test_queue", "service_bus_namespace": "test_namespace", "vmss_msi_id": "test_msi_id", "porter_env": {}} + + +async def setup_service_bus_client_and_credential(mock_service_bus_client, mock_default_credential, msi_id): + mock_credential = AsyncMock() + mock_default_credential.return_value.__aenter__.return_value = mock_credential + mock_service_bus_client_instance = mock_service_bus_client.return_value + return mock_service_bus_client_instance, mock_credential + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.receive_message") +async def test_runner(mock_receive_message, mock_service_bus_client, mock_default_credential): + """Test runner with valid MSI ID.""" + mock_service_bus_client_instance, mock_credential = await setup_service_bus_client_and_credential(mock_service_bus_client, mock_default_credential, 'test_msi_id') + + config = {"vmss_msi_id": "test_msi_id", "service_bus_namespace": "test_namespace"} + + await runner(0, config) + + mock_default_credential.assert_called_once_with('test_msi_id') + mock_service_bus_client.assert_called_once_with("test_namespace", mock_credential) + mock_receive_message.assert_called_once_with(mock_service_bus_client_instance, config) + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.receive_message") +async def test_runner_no_msi_id(mock_receive_message, mock_service_bus_client, mock_default_credential): + """Test runner with no MSI ID.""" + mock_service_bus_client_instance, mock_credential = await setup_service_bus_client_and_credential(mock_service_bus_client, mock_default_credential, None) + + config = {"vmss_msi_id": None, "service_bus_namespace": "test_namespace"} + + await runner(0, config) + + mock_default_credential.assert_called_once_with(None) + mock_service_bus_client.assert_called_once_with("test_namespace", mock_credential) + mock_receive_message.assert_called_once_with(mock_service_bus_client_instance, config) + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.receive_message") +async def test_runner_exception(mock_receive_message, mock_service_bus_client, mock_default_credential): + """Test runner with an exception.""" + mock_service_bus_client_instance, mock_credential = await setup_service_bus_client_and_credential(mock_service_bus_client, mock_default_credential, 'test_msi_id') + mock_receive_message.side_effect = Exception("Test Exception") + + config = {"vmss_msi_id": "test_msi_id", "service_bus_namespace": "test_namespace"} + + with pytest.raises(Exception, match="Test Exception"): + await runner(0, config) + + mock_default_credential.assert_called_once_with('test_msi_id') + mock_service_bus_client.assert_called_once_with("test_namespace", mock_credential) + mock_receive_message.assert_called_once_with(mock_service_bus_client_instance, config) + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.invoke_porter_action", return_value=True) +async def test_receive_message(mock_invoke_porter_action, mock_service_bus_client, mock_auto_lock_renewer): + mock_service_bus_client_instance = mock_service_bus_client.return_value + mock_auto_lock_renewer.return_value = AsyncMock() + + mock_receiver = AsyncMock() + mock_receiver.__aenter__.return_value = mock_receiver + mock_receiver.__aexit__.return_value = None + mock_receiver.session.session_id = "test_session_id" + mock_receiver.__aiter__.return_value = [AsyncMock()] + mock_receiver.__aiter__.return_value[0] = json.dumps({"id": "test_id", "action": "install", "stepId": "test_step_id", "operationId": "test_operation_id"}) + + mock_service_bus_client_instance.get_queue_receiver.return_value.__aenter__.return_value = mock_receiver + + run_once = Mock(side_effect=[True, False]) + + config = {"resource_request_queue": "test_queue"} + + await receive_message(mock_service_bus_client_instance, config, keep_running=run_once) + mock_receiver.complete_message.assert_called_once() + mock_service_bus_client_instance.get_queue_receiver.assert_called_once_with(queue_name="test_queue", max_wait_time=1, session_id=ServiceBusSessionFilter.NEXT_AVAILABLE) + + +@pytest.mark.asyncio +async def test_receive_message_unknown_exception(mock_auto_lock_renewer, mock_service_bus_client, mock_logger): + """Test receiving a message with an unknown exception.""" + mock_service_bus_client_instance = mock_service_bus_client.return_value + mock_auto_lock_renewer.return_value = AsyncMock() + + mock_receiver = AsyncMock() + mock_receiver.__aenter__.return_value = mock_receiver + mock_receiver.__aexit__.return_value = None + mock_receiver.session.session_id = "test_session_id" + mock_receiver.__aiter__.return_value = [AsyncMock()] + mock_receiver.__aiter__.return_value[0] = json.dumps({"id": "test_id", "action": "install", "stepId": "test_step_id", "operationId": "test_operation_id"}) + + mock_service_bus_client_instance.get_queue_receiver.return_value.__aenter__.return_value = mock_receiver + + run_once = Mock(side_effect=[True, False]) + + config = {"resource_request_queue": "test_queue"} + + with patch("resource_processor.vmss_porter.runner.receive_message", side_effect=Exception("Test Exception")): + await receive_message(mock_service_bus_client_instance, config, keep_running=run_once) + mock_logger.exception.assert_any_call("Unknown exception. Will retry...") + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command", return_value=["porter install"]) +@patch("resource_processor.vmss_porter.runner.run_porter", return_value=(0, "stdout", "stderr")) +@patch("resource_processor.vmss_porter.runner.service_bus_message_generator", return_value="test_message") +async def test_invoke_porter_action(mock_service_bus_message_generator, mock_run_porter, mock_build_porter_command, mock_service_bus_client): + """Test invoking a porter action.""" + mock_sb_sender = AsyncMock() + mock_service_bus_client.get_queue_sender.return_value = mock_sb_sender + + config = {"deployment_status_queue": "test_queue"} + msg_body = {"id": "test_id", "action": "install", "stepId": "test_step_id", "operationId": "test_operation_id"} + + result = await invoke_porter_action(msg_body, mock_service_bus_client, config) + + assert result is True + mock_sb_sender.send_messages.assert_called() + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command", return_value=["porter install"]) +@patch("resource_processor.vmss_porter.runner.run_porter", return_value=(1, "", "error")) +@patch("resource_processor.vmss_porter.runner.service_bus_message_generator", return_value="test_message") +async def test_invoke_porter_action_failure(mock_service_bus_message_generator, mock_run_porter, mock_build_porter_command, mock_service_bus_client): + """Test invoking a porter action with failure.""" + mock_sb_client = AsyncMock(spec=ServiceBusClient) + mock_sb_sender = AsyncMock() + mock_sb_client.get_queue_sender.return_value = mock_sb_sender + + config = {"deployment_status_queue": "test_queue"} + msg_body = {"id": "test_id", "action": "install", "stepId": "test_step_id", "operationId": "test_operation_id"} + + result = await invoke_porter_action(msg_body, mock_sb_client, config) + + assert result is False + mock_sb_sender.send_messages.assert_called() + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command", return_value=["porter install"]) +@patch("resource_processor.vmss_porter.runner.run_porter", side_effect=[(1, "", "could not find installation"), (0, "", "")]) +@patch("resource_processor.vmss_porter.runner.service_bus_message_generator", return_value="test_message") +async def test_invoke_porter_action_upgrade_failure_install_success(mock_service_bus_message_generator, mock_run_porter, mock_build_porter_command, mock_service_bus_client): + """Test invoking a porter action with upgrade failure and install success.""" + mock_sb_client = AsyncMock(spec=ServiceBusClient) + mock_sb_sender = AsyncMock() + mock_sb_client.get_queue_sender.return_value = mock_sb_sender + + config = {"deployment_status_queue": "test_queue"} + msg_body = {"id": "test_id", "action": "upgrade", "stepId": "test_step_id", "operationId": "test_operation_id"} + + result = await invoke_porter_action(msg_body, mock_sb_client, config) + + assert result is True + mock_sb_sender.send_messages.assert_called() + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command", return_value=["porter install"]) +@patch("resource_processor.vmss_porter.runner.run_porter", side_effect=[(1, "", "could not find installation"), (1, "", "installation failed")]) +@patch("resource_processor.vmss_porter.runner.service_bus_message_generator", return_value="test_message") +async def test_invoke_porter_action_upgrade_failure_install_failure(mock_service_bus_message_generator, mock_run_porter, mock_build_porter_command, mock_service_bus_client): + """Test invoking a porter action with upgrade and install failure.""" + mock_sb_client = AsyncMock(spec=ServiceBusClient) + mock_sb_sender = AsyncMock() + mock_sb_client.get_queue_sender.return_value = mock_sb_sender + + config = {"deployment_status_queue": "test_queue"} + msg_body = {"id": "test_id", "action": "upgrade", "stepId": "test_step_id", "operationId": "test_operation_id"} + + result = await invoke_porter_action(msg_body, mock_sb_client, config) + + assert result is False + mock_sb_sender.send_messages.assert_called() + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command", return_value=["porter install"]) +@patch("resource_processor.vmss_porter.runner.run_porter", return_value=(1, "", "could not find installation")) +@patch("resource_processor.vmss_porter.runner.service_bus_message_generator", return_value="test_message") +async def test_invoke_porter_action_uninstall_failure(mock_service_bus_message_generator, mock_run_porter, mock_build_porter_command, mock_service_bus_client): + """Test invoking a porter action with uninstall failure.""" + mock_sb_client = AsyncMock(spec=ServiceBusClient) + mock_sb_sender = AsyncMock() + mock_sb_client.get_queue_sender.return_value = mock_sb_sender + + config = {"deployment_status_queue": "test_queue"} + msg_body = {"id": "test_id", "action": "uninstall", "stepId": "test_step_id", "operationId": "test_operation_id"} + + result = await invoke_porter_action(msg_body, mock_sb_client, config) + + assert result is True + mock_sb_sender.send_messages.assert_called() + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command", return_value=["porter custom-action"]) +@patch("resource_processor.vmss_porter.runner.run_porter", return_value=(0, "stdout", "stderr")) +@patch("resource_processor.vmss_porter.runner.service_bus_message_generator", return_value="test_message") +async def test_invoke_porter_action_custom_action(mock_service_bus_message_generator, mock_run_porter, mock_build_porter_command, mock_service_bus_client): + """Test invoking a porter custom action.""" + mock_sb_client = AsyncMock(spec=ServiceBusClient) + mock_sb_sender = AsyncMock() + mock_sb_client.get_queue_sender.return_value = mock_sb_sender + + config = {"deployment_status_queue": "test_queue"} + msg_body = {"id": "test_id", "action": "custom-action", "stepId": "test_step_id", "operationId": "test_operation_id"} + + result = await invoke_porter_action(msg_body, mock_sb_client, config) + + assert result is True + mock_sb_sender.send_messages.assert_called() + + +@pytest.mark.asyncio +@patch("resource_processor.vmss_porter.runner.build_porter_command_for_outputs", return_value=["porter installations output list"]) +@patch("resource_processor.vmss_porter.runner.run_porter", return_value=(0, json.dumps([{"name": "output1", "value": "value1"}]), "stderr")) +async def test_get_porter_outputs(mock_run_porter, mock_build_porter_command_for_outputs): + """Test getting porter outputs.""" + config = {} + msg_body = {"id": "test_id", "action": "install"} + + success, outputs = await get_porter_outputs(msg_body, config) + + assert success is True + assert outputs == [{"name": "output1", "value": "value1"}] + + +@pytest.mark.asyncio +@patch("asyncio.sleep", new_callable=AsyncMock) +async def test_check_runners(_): + """Test checking runners.""" + mock_process = Mock() + mock_process.is_alive.return_value = False + processes = [mock_process] + mock_httpserver = AsyncMock() + + run_once = Mock(side_effect=[True, False]) + + await check_runners(processes, mock_httpserver, keep_running=run_once) + mock_httpserver.kill.assert_called_once() diff --git a/resource_processor/vmss_porter/runner.py b/resource_processor/vmss_porter/runner.py index 3de4ac06fe..6f6d6e21e1 100644 --- a/resource_processor/vmss_porter/runner.py +++ b/resource_processor/vmss_porter/runner.py @@ -4,14 +4,13 @@ import asyncio import logging import sys -from resources.commands import azure_acr_login_command, azure_login_command, build_porter_command, build_porter_command_for_outputs, apply_porter_credentials_sets_command +from helpers.commands import azure_acr_login_command, azure_login_command, build_porter_command, build_porter_command_for_outputs, apply_porter_credentials_sets_command from shared.config import get_config -from resources.helpers import get_installation_id -from resources.httpserver import start_server +from helpers.httpserver import start_server from shared.logging import initialize_logging, logger, shell_output_logger, tracer from shared.config import VERSION -from resources import statuses +from helpers import statuses from contextlib import asynccontextmanager from azure.servicebus import ServiceBusMessage, NEXT_AVAILABLE_SESSION from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError @@ -38,7 +37,7 @@ async def default_credentials(msi_id): await credential.close() -async def receive_message(service_bus_client, config: dict): +async def receive_message(service_bus_client, config: dict, keep_running=lambda: True): """ This method is run per process. Each process will connect to service bus and try to establish a session. If messages are there, the process will continue to receive all the messages associated with that session. @@ -46,7 +45,7 @@ async def receive_message(service_bus_client, config: dict): """ q_name = config["resource_request_queue"] - while True: + while keep_running(): try: logger.info("Looking for new session...") # max_wait_time=1 -> don't hold the session open after processing of the message has finished @@ -94,6 +93,7 @@ async def receive_message(service_bus_client, config: dict): except Exception: # Catch all other exceptions, log them via .exception to get the stack trace, sleep, and reconnect + logger.exception("Unknown exception. Will retry...") @@ -135,7 +135,7 @@ def service_bus_message_generator(sb_message: dict, status: str, deployment_mess """ Generate a resource request message """ - installation_id = get_installation_id(sb_message) + installation_id = sb_message["id"] message_dict = { "operationId": sb_message["operationId"], "stepId": sb_message["stepId"], @@ -156,7 +156,7 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, conf Handle resource message by invoking specified porter action (i.e. install, uninstall) """ - installation_id = get_installation_id(msg_body) + installation_id = msg_body["id"] action = msg_body["action"] logger.info(f"{action} action starting for {installation_id}...") sb_sender = sb_client.get_queue_sender(queue_name=config["deployment_status_queue"]) @@ -173,13 +173,25 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, conf logger.debug("Starting to run porter execution command...") returncode, _, err = await run_porter(porter_command, config) logger.debug("Finished running porter execution command.") - action_completed_without_error = True + + action_completed_without_error = False + + if returncode == 0: + action_completed_without_error = True # Handle command output if returncode != 0 and err is not None: error_message = "Error message: " + " ".join(err.split('\n')) + "; Command executed: " + " ".join(porter_command) action_completed_without_error = False + if "upgrade" == action and ("could not find installation" in err or "The installation cannot be upgraded, because it is not installed." in err): + logger.warning("Upgrade failed, attempting install...") + msg_body['action'] = "install" + porter_command = await build_porter_command(config, msg_body, False) + returncode, _, err = await run_porter(porter_command, config) + if returncode == 0: + action_completed_without_error = True + if "uninstall" == action and "could not find installation" in err: logger.warning("The installation doesn't exist. Treating as a successful action to allow the flow to proceed.") action_completed_without_error = True @@ -227,7 +239,8 @@ async def get_porter_outputs(msg_body: dict, config: dict): if returncode != 0: error_message = "Error context message = " + " ".join(err.split('\n')) - logger.info(f"{get_installation_id(msg_body)}: Failed to get outputs with error = {error_message}") + installation_id = msg_body["id"] + logger.info(f"{installation_id}: Failed to get outputs with error = {error_message}") return False, {} else: outputs_json = {} @@ -253,10 +266,10 @@ async def runner(process_number: int, config: dict): await receive_message(service_bus_client, config) -async def check_runners(processes: list, httpserver: Process): +async def check_runners(processes: list, httpserver: Process, keep_running=lambda: True): logger.info("Starting runners check...") - while True: + while keep_running(): await asyncio.sleep(30) if all(not process.is_alive() for process in processes): logger.error("All runner processes have failed!")