Skip to content

Commit

Permalink
feat: ipsec_tunnel_status check Proxy ID support
Browse files Browse the repository at this point in the history
`ipsec_tunnel_status` check to accept a list of `proxy_ids` to check tunnel
status against the provided list, checks for all configured Proxy IDs if not
provided.
Returns success if any Proxy ID is in active state for the ipsec tunnel or
`require_all_active` flag can be set to require all Proxy IDs to be in active state.
  • Loading branch information
alperenkose committed Oct 29, 2024
1 parent 33ee9f1 commit 3262fa5
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 3 deletions.
7 changes: 6 additions & 1 deletion docs/panos-upgrade-assurance/api/check_firewall.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ __Returns__

```python
def check_ipsec_tunnel_status(
tunnel_name: Optional[str] = None) -> CheckResult
tunnel_name: Optional[str] = None,
proxy_ids: Optional[List[str]] = None,
require_all_active: Optional[bool] = False) -> CheckResult
```

Check if a given IPSec tunnel is in active state.
Expand All @@ -339,6 +341,9 @@ __Parameters__


- __tunnel_name__ (`str, optional`): (defaults to `None`) Name of the searched IPSec tunnel.
- __proxy_ids__ (`list(str), optional`): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided.
- __require_all_active__ (`bool, optional`): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are
checked only within `proxy_ids` if provided.

__Returns__

Expand Down
44 changes: 42 additions & 2 deletions panos_upgrade_assurance/check_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,17 @@ def check_arp_entry(self, ip: Optional[str] = None, interface: Optional[str] = N
result.reason = "Entry not found in ARP table."
return result

def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckResult:
def check_ipsec_tunnel_status(
self, tunnel_name: Optional[str] = None, proxy_ids: Optional[List[str]] = None, require_all_active: Optional[bool] = False
) -> CheckResult:
"""Check if a given IPSec tunnel is in active state.
# Parameters
tunnel_name (str, optional): (defaults to `None`) Name of the searched IPSec tunnel.
proxy_ids (list(str), optional): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided.
require_all_active (bool, optional): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are
checked only within `proxy_ids` if provided.
# Returns
Expand Down Expand Up @@ -630,6 +635,8 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR
result.status = CheckStatus.ERROR
return result

ipsec_proxyids = [] # IPSec ProxyIDs that exist

for name in tunnels["IPSec"]:
data = tunnels["IPSec"][name]
if name == tunnel_name:
Expand All @@ -638,8 +645,41 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR
else:
result.reason = f"Tunnel {tunnel_name} in state: {data['state']}."
return result
elif name.startswith(f"{tunnel_name}:"):
ipsec_proxyids.append(name.split(":")[-1])
else:
if not ipsec_proxyids: # ipsec tunnel not found with or without proxyids
result.reason = f"Tunnel {tunnel_name} not found."
return result

result.reason = f"Tunnel {tunnel_name} not found."
proxyids_to_check = [] # IPSec ProxyIDs to check
ipsec_proxyids_active = 0 # number of active ProxyIDs within proxyids_to_check

if proxy_ids:
if set(proxy_ids).issubset(ipsec_proxyids):
proxyids_to_check = proxy_ids
else:
result.reason = f"Tunnel {tunnel_name} has missing ProxyIDs in {proxy_ids}."
return result
else:
proxyids_to_check = ipsec_proxyids

for proxy_id in proxyids_to_check:
data = tunnels["IPSec"][f"{tunnel_name}:{proxy_id}"]
if data["state"] == "active":
ipsec_proxyids_active += 1
elif require_all_active: # state not active but we require all active
result.reason = f"Tunnel:ProxyID {tunnel_name}:{proxy_id} in state: {data['state']}."
return result

if require_all_active:
if proxyids_to_check and (len(proxyids_to_check) == ipsec_proxyids_active):
result.status = CheckStatus.SUCCESS
else:
if ipsec_proxyids_active >= 1:
result.status = CheckStatus.SUCCESS
elif ipsec_proxyids_active == 0:
result.reason = f"No active state for tunnel {tunnel_name} in ProxyIDs {proxyids_to_check}."

return result

Expand Down
165 changes: 165 additions & 0 deletions tests/test_check_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,171 @@ def test_ipsec_tunnel_status_not_found(self, check_firewall_mock):
reason="Tunnel NotMyTunnel not found."
)

def test_ipsec_tunnel_status_proxyids_not_found(self, check_firewall_mock):
"""tunnel with proxyids - proxyids given but not found"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID3"]
) == CheckResult(reason="Tunnel east1-vpn has missing ProxyIDs in ['ProxyID1', 'ProxyID3'].")

@pytest.mark.parametrize(
"require_all_active, expected_status",
[
(True, CheckStatus.SUCCESS),
(False, CheckStatus.SUCCESS),
],
)
def test_ipsec_tunnel_status_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock):
"""tunnel with proxyids - proxyids given and all active
Should return success whether require_all_active is True or False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"east1-vpn:ProxyID3": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active
) == CheckResult(expected_status)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID3 in state: init."),
(False, CheckStatus.SUCCESS, ""),
],
)
def test_ipsec_tunnel_status_proxyids_some_active(self, require_all_active, expected_status, reason, check_firewall_mock):
"""tunnel with proxyids - proxyids given and some active
Should return fail by default. Success if require_all_active is False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"east1-vpn:ProxyID3": {"state": "init"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."),
(False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."),
],
)
def test_ipsec_tunnel_status_proxyids_none_active(self, require_all_active, expected_status, reason, check_firewall_mock):
"""tunnel with proxyids - proxyids given and all not active
Should return fail whether require_all_active is True or False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "init"},
"east1-vpn:ProxyID2": {"state": "init"},
"east1-vpn:ProxyID3": {"state": "init"},
"central1-vpn:ProxyID1": {"state": "active"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

@pytest.mark.parametrize(
"require_all_active, expected_status",
[
(True, CheckStatus.SUCCESS),
(False, CheckStatus.SUCCESS),
],
)
def test_ipsec_tunnel_status_none_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock):
"""tunnel with proxyids - proxyids not given and all active"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"east1-vpn:ProxyID3": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", require_all_active=require_all_active
) == CheckResult(expected_status)

assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active
) == CheckResult(expected_status)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID2 in state: init."),
(False, CheckStatus.SUCCESS, ""),
],
)
def test_ipsec_tunnel_status_none_proxyids_some_active(
self, require_all_active, expected_status, reason, check_firewall_mock
):
"""tunnel with proxyids - proxyids not given and some active
Should return fail by default. Success if require_all_active is False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "init"},
"east1-vpn:ProxyID3": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."),
(False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."),
],
)
def test_ipsec_tunnel_status_none_proxyids_none_active(
self, require_all_active, expected_status, reason, check_firewall_mock
):
"""tunnel with proxyids - proxyids not given and not active
Should return fail whether require_all_active is True or False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "init"},
"east1-vpn:ProxyID2": {"state": "init"},
"east1-vpn:ProxyID3": {"state": "init"},
"central1-vpn:ProxyID1": {"state": "active"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

def test_check_free_disk_space_ok(self, check_firewall_mock):
check_firewall_mock._node.get_disk_utilization.return_value = {"/opt/panrepo": 50000}

Expand Down

0 comments on commit 3262fa5

Please sign in to comment.