From 3262fa5a21e99d0ed3492965da0e3084bbb52d49 Mon Sep 17 00:00:00 2001 From: Alp Kose Date: Tue, 29 Oct 2024 17:26:17 +0300 Subject: [PATCH] feat: ipsec_tunnel_status check Proxy ID support `ipsec_tunnel_status` check to accept a list of `proxy_ids` to check tunnel status against the provided list, checks for all configured Proxy IDs if not provided. Returns success if any Proxy ID is in active state for the ipsec tunnel or `require_all_active` flag can be set to require all Proxy IDs to be in active state. --- .../api/check_firewall.md | 7 +- panos_upgrade_assurance/check_firewall.py | 44 ++++- tests/test_check_firewall.py | 165 ++++++++++++++++++ 3 files changed, 213 insertions(+), 3 deletions(-) diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index 72dc44c..bd1f6d7 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -330,7 +330,9 @@ __Returns__ ```python def check_ipsec_tunnel_status( - tunnel_name: Optional[str] = None) -> CheckResult + tunnel_name: Optional[str] = None, + proxy_ids: Optional[List[str]] = None, + require_all_active: Optional[bool] = False) -> CheckResult ``` Check if a given IPSec tunnel is in active state. @@ -339,6 +341,9 @@ __Parameters__ - __tunnel_name__ (`str, optional`): (defaults to `None`) Name of the searched IPSec tunnel. +- __proxy_ids__ (`list(str), optional`): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided. +- __require_all_active__ (`bool, optional`): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are + checked only within `proxy_ids` if provided. __Returns__ diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index 43edc1c..6825639 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -593,12 +593,17 @@ def check_arp_entry(self, ip: Optional[str] = None, interface: Optional[str] = N result.reason = "Entry not found in ARP table." return result - def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckResult: + def check_ipsec_tunnel_status( + self, tunnel_name: Optional[str] = None, proxy_ids: Optional[List[str]] = None, require_all_active: Optional[bool] = False + ) -> CheckResult: """Check if a given IPSec tunnel is in active state. # Parameters tunnel_name (str, optional): (defaults to `None`) Name of the searched IPSec tunnel. + proxy_ids (list(str), optional): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided. + require_all_active (bool, optional): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are + checked only within `proxy_ids` if provided. # Returns @@ -630,6 +635,8 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR result.status = CheckStatus.ERROR return result + ipsec_proxyids = [] # IPSec ProxyIDs that exist + for name in tunnels["IPSec"]: data = tunnels["IPSec"][name] if name == tunnel_name: @@ -638,8 +645,41 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR else: result.reason = f"Tunnel {tunnel_name} in state: {data['state']}." return result + elif name.startswith(f"{tunnel_name}:"): + ipsec_proxyids.append(name.split(":")[-1]) + else: + if not ipsec_proxyids: # ipsec tunnel not found with or without proxyids + result.reason = f"Tunnel {tunnel_name} not found." + return result - result.reason = f"Tunnel {tunnel_name} not found." + proxyids_to_check = [] # IPSec ProxyIDs to check + ipsec_proxyids_active = 0 # number of active ProxyIDs within proxyids_to_check + + if proxy_ids: + if set(proxy_ids).issubset(ipsec_proxyids): + proxyids_to_check = proxy_ids + else: + result.reason = f"Tunnel {tunnel_name} has missing ProxyIDs in {proxy_ids}." + return result + else: + proxyids_to_check = ipsec_proxyids + + for proxy_id in proxyids_to_check: + data = tunnels["IPSec"][f"{tunnel_name}:{proxy_id}"] + if data["state"] == "active": + ipsec_proxyids_active += 1 + elif require_all_active: # state not active but we require all active + result.reason = f"Tunnel:ProxyID {tunnel_name}:{proxy_id} in state: {data['state']}." + return result + + if require_all_active: + if proxyids_to_check and (len(proxyids_to_check) == ipsec_proxyids_active): + result.status = CheckStatus.SUCCESS + else: + if ipsec_proxyids_active >= 1: + result.status = CheckStatus.SUCCESS + elif ipsec_proxyids_active == 0: + result.reason = f"No active state for tunnel {tunnel_name} in ProxyIDs {proxyids_to_check}." return result diff --git a/tests/test_check_firewall.py b/tests/test_check_firewall.py index 39e22d2..fc7f8f4 100644 --- a/tests/test_check_firewall.py +++ b/tests/test_check_firewall.py @@ -513,6 +513,171 @@ def test_ipsec_tunnel_status_not_found(self, check_firewall_mock): reason="Tunnel NotMyTunnel not found." ) + def test_ipsec_tunnel_status_proxyids_not_found(self, check_firewall_mock): + """tunnel with proxyids - proxyids given but not found""" + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID3"] + ) == CheckResult(reason="Tunnel east1-vpn has missing ProxyIDs in ['ProxyID1', 'ProxyID3'].") + + @pytest.mark.parametrize( + "require_all_active, expected_status", + [ + (True, CheckStatus.SUCCESS), + (False, CheckStatus.SUCCESS), + ], + ) + def test_ipsec_tunnel_status_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock): + """tunnel with proxyids - proxyids given and all active + Should return success whether require_all_active is True or False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "east1-vpn:ProxyID3": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active + ) == CheckResult(expected_status) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID3 in state: init."), + (False, CheckStatus.SUCCESS, ""), + ], + ) + def test_ipsec_tunnel_status_proxyids_some_active(self, require_all_active, expected_status, reason, check_firewall_mock): + """tunnel with proxyids - proxyids given and some active + Should return fail by default. Success if require_all_active is False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "east1-vpn:ProxyID3": {"state": "init"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."), + (False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."), + ], + ) + def test_ipsec_tunnel_status_proxyids_none_active(self, require_all_active, expected_status, reason, check_firewall_mock): + """tunnel with proxyids - proxyids given and all not active + Should return fail whether require_all_active is True or False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "init"}, + "east1-vpn:ProxyID2": {"state": "init"}, + "east1-vpn:ProxyID3": {"state": "init"}, + "central1-vpn:ProxyID1": {"state": "active"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + @pytest.mark.parametrize( + "require_all_active, expected_status", + [ + (True, CheckStatus.SUCCESS), + (False, CheckStatus.SUCCESS), + ], + ) + def test_ipsec_tunnel_status_none_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock): + """tunnel with proxyids - proxyids not given and all active""" + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "east1-vpn:ProxyID3": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", require_all_active=require_all_active + ) == CheckResult(expected_status) + + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active + ) == CheckResult(expected_status) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID2 in state: init."), + (False, CheckStatus.SUCCESS, ""), + ], + ) + def test_ipsec_tunnel_status_none_proxyids_some_active( + self, require_all_active, expected_status, reason, check_firewall_mock + ): + """tunnel with proxyids - proxyids not given and some active + Should return fail by default. Success if require_all_active is False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "init"}, + "east1-vpn:ProxyID3": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."), + (False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."), + ], + ) + def test_ipsec_tunnel_status_none_proxyids_none_active( + self, require_all_active, expected_status, reason, check_firewall_mock + ): + """tunnel with proxyids - proxyids not given and not active + Should return fail whether require_all_active is True or False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "init"}, + "east1-vpn:ProxyID2": {"state": "init"}, + "east1-vpn:ProxyID3": {"state": "init"}, + "central1-vpn:ProxyID1": {"state": "active"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + def test_check_free_disk_space_ok(self, check_firewall_mock): check_firewall_mock._node.get_disk_utilization.return_value = {"/opt/panrepo": 50000}