diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index 72dc44c..bd1f6d7 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -330,7 +330,9 @@ __Returns__ ```python def check_ipsec_tunnel_status( - tunnel_name: Optional[str] = None) -> CheckResult + tunnel_name: Optional[str] = None, + proxy_ids: Optional[List[str]] = None, + require_all_active: Optional[bool] = False) -> CheckResult ``` Check if a given IPSec tunnel is in active state. @@ -339,6 +341,9 @@ __Parameters__ - __tunnel_name__ (`str, optional`): (defaults to `None`) Name of the searched IPSec tunnel. +- __proxy_ids__ (`list(str), optional`): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided. +- __require_all_active__ (`bool, optional`): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are + checked only within `proxy_ids` if provided. __Returns__ diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index 43edc1c..6825639 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -593,12 +593,17 @@ def check_arp_entry(self, ip: Optional[str] = None, interface: Optional[str] = N result.reason = "Entry not found in ARP table." return result - def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckResult: + def check_ipsec_tunnel_status( + self, tunnel_name: Optional[str] = None, proxy_ids: Optional[List[str]] = None, require_all_active: Optional[bool] = False + ) -> CheckResult: """Check if a given IPSec tunnel is in active state. # Parameters tunnel_name (str, optional): (defaults to `None`) Name of the searched IPSec tunnel. + proxy_ids (list(str), optional): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided. + require_all_active (bool, optional): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are + checked only within `proxy_ids` if provided. # Returns @@ -630,6 +635,8 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR result.status = CheckStatus.ERROR return result + ipsec_proxyids = [] # IPSec ProxyIDs that exist + for name in tunnels["IPSec"]: data = tunnels["IPSec"][name] if name == tunnel_name: @@ -638,8 +645,41 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR else: result.reason = f"Tunnel {tunnel_name} in state: {data['state']}." return result + elif name.startswith(f"{tunnel_name}:"): + ipsec_proxyids.append(name.split(":")[-1]) + else: + if not ipsec_proxyids: # ipsec tunnel not found with or without proxyids + result.reason = f"Tunnel {tunnel_name} not found." + return result - result.reason = f"Tunnel {tunnel_name} not found." + proxyids_to_check = [] # IPSec ProxyIDs to check + ipsec_proxyids_active = 0 # number of active ProxyIDs within proxyids_to_check + + if proxy_ids: + if set(proxy_ids).issubset(ipsec_proxyids): + proxyids_to_check = proxy_ids + else: + result.reason = f"Tunnel {tunnel_name} has missing ProxyIDs in {proxy_ids}." + return result + else: + proxyids_to_check = ipsec_proxyids + + for proxy_id in proxyids_to_check: + data = tunnels["IPSec"][f"{tunnel_name}:{proxy_id}"] + if data["state"] == "active": + ipsec_proxyids_active += 1 + elif require_all_active: # state not active but we require all active + result.reason = f"Tunnel:ProxyID {tunnel_name}:{proxy_id} in state: {data['state']}." + return result + + if require_all_active: + if proxyids_to_check and (len(proxyids_to_check) == ipsec_proxyids_active): + result.status = CheckStatus.SUCCESS + else: + if ipsec_proxyids_active >= 1: + result.status = CheckStatus.SUCCESS + elif ipsec_proxyids_active == 0: + result.reason = f"No active state for tunnel {tunnel_name} in ProxyIDs {proxyids_to_check}." return result diff --git a/tests/test_check_firewall.py b/tests/test_check_firewall.py index 39e22d2..fc7f8f4 100644 --- a/tests/test_check_firewall.py +++ b/tests/test_check_firewall.py @@ -513,6 +513,171 @@ def test_ipsec_tunnel_status_not_found(self, check_firewall_mock): reason="Tunnel NotMyTunnel not found." ) + def test_ipsec_tunnel_status_proxyids_not_found(self, check_firewall_mock): + """tunnel with proxyids - proxyids given but not found""" + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID3"] + ) == CheckResult(reason="Tunnel east1-vpn has missing ProxyIDs in ['ProxyID1', 'ProxyID3'].") + + @pytest.mark.parametrize( + "require_all_active, expected_status", + [ + (True, CheckStatus.SUCCESS), + (False, CheckStatus.SUCCESS), + ], + ) + def test_ipsec_tunnel_status_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock): + """tunnel with proxyids - proxyids given and all active + Should return success whether require_all_active is True or False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "east1-vpn:ProxyID3": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active + ) == CheckResult(expected_status) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID3 in state: init."), + (False, CheckStatus.SUCCESS, ""), + ], + ) + def test_ipsec_tunnel_status_proxyids_some_active(self, require_all_active, expected_status, reason, check_firewall_mock): + """tunnel with proxyids - proxyids given and some active + Should return fail by default. Success if require_all_active is False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "east1-vpn:ProxyID3": {"state": "init"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."), + (False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."), + ], + ) + def test_ipsec_tunnel_status_proxyids_none_active(self, require_all_active, expected_status, reason, check_firewall_mock): + """tunnel with proxyids - proxyids given and all not active + Should return fail whether require_all_active is True or False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "init"}, + "east1-vpn:ProxyID2": {"state": "init"}, + "east1-vpn:ProxyID3": {"state": "init"}, + "central1-vpn:ProxyID1": {"state": "active"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + @pytest.mark.parametrize( + "require_all_active, expected_status", + [ + (True, CheckStatus.SUCCESS), + (False, CheckStatus.SUCCESS), + ], + ) + def test_ipsec_tunnel_status_none_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock): + """tunnel with proxyids - proxyids not given and all active""" + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "active"}, + "east1-vpn:ProxyID3": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", require_all_active=require_all_active + ) == CheckResult(expected_status) + + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active + ) == CheckResult(expected_status) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID2 in state: init."), + (False, CheckStatus.SUCCESS, ""), + ], + ) + def test_ipsec_tunnel_status_none_proxyids_some_active( + self, require_all_active, expected_status, reason, check_firewall_mock + ): + """tunnel with proxyids - proxyids not given and some active + Should return fail by default. Success if require_all_active is False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "active"}, + "east1-vpn:ProxyID2": {"state": "init"}, + "east1-vpn:ProxyID3": {"state": "active"}, + "central1-vpn:ProxyID1": {"state": "init"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + @pytest.mark.parametrize( + "require_all_active, expected_status, reason", + [ + (True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."), + (False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."), + ], + ) + def test_ipsec_tunnel_status_none_proxyids_none_active( + self, require_all_active, expected_status, reason, check_firewall_mock + ): + """tunnel with proxyids - proxyids not given and not active + Should return fail whether require_all_active is True or False. + """ + check_firewall_mock._node.get_tunnels.return_value = { + "IPSec": { + "east1-vpn:ProxyID1": {"state": "init"}, + "east1-vpn:ProxyID2": {"state": "init"}, + "east1-vpn:ProxyID3": {"state": "init"}, + "central1-vpn:ProxyID1": {"state": "active"}, + } + } + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + + assert check_firewall_mock.check_ipsec_tunnel_status( + tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active + ) == CheckResult(expected_status, reason=reason) + def test_check_free_disk_space_ok(self, check_firewall_mock): check_firewall_mock._node.get_disk_utilization.return_value = {"/opt/panrepo": 50000}