Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ipsec_tunnel_status check Proxy ID support #174

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/panos-upgrade-assurance/api/check_firewall.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ __Returns__

```python
def check_ipsec_tunnel_status(
tunnel_name: Optional[str] = None) -> CheckResult
tunnel_name: Optional[str] = None,
proxy_ids: Optional[List[str]] = None,
require_all_active: Optional[bool] = False) -> CheckResult
```

Check if a given IPSec tunnel is in active state.
Expand All @@ -339,6 +341,9 @@ __Parameters__


- __tunnel_name__ (`str, optional`): (defaults to `None`) Name of the searched IPSec tunnel.
- __proxy_ids__ (`list(str), optional`): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided.
- __require_all_active__ (`bool, optional`): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are
checked only within `proxy_ids` if provided.

__Returns__

Expand Down
44 changes: 42 additions & 2 deletions panos_upgrade_assurance/check_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,17 @@ def check_arp_entry(self, ip: Optional[str] = None, interface: Optional[str] = N
result.reason = "Entry not found in ARP table."
return result

def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckResult:
def check_ipsec_tunnel_status(
self, tunnel_name: Optional[str] = None, proxy_ids: Optional[List[str]] = None, require_all_active: Optional[bool] = False
) -> CheckResult:
"""Check if a given IPSec tunnel is in active state.

# Parameters

tunnel_name (str, optional): (defaults to `None`) Name of the searched IPSec tunnel.
proxy_ids (list(str), optional): (defaults to `None`) ProxyID names to check. All ProxyIDs are checked if None provided.
require_all_active (bool, optional): (defaults to `False`) If set, all ProxyIDs should be in `active` state. States are
checked only within `proxy_ids` if provided.

# Returns

Expand Down Expand Up @@ -630,6 +635,8 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR
result.status = CheckStatus.ERROR
return result

ipsec_proxyids = [] # IPSec ProxyIDs that exist

for name in tunnels["IPSec"]:
data = tunnels["IPSec"][name]
if name == tunnel_name:
Expand All @@ -638,8 +645,41 @@ def check_ipsec_tunnel_status(self, tunnel_name: Optional[str] = None) -> CheckR
else:
result.reason = f"Tunnel {tunnel_name} in state: {data['state']}."
return result
elif name.startswith(f"{tunnel_name}:"):
ipsec_proxyids.append(name.split(":")[-1])
else:
if not ipsec_proxyids: # ipsec tunnel not found with or without proxyids
result.reason = f"Tunnel {tunnel_name} not found."
return result

result.reason = f"Tunnel {tunnel_name} not found."
proxyids_to_check = [] # IPSec ProxyIDs to check
ipsec_proxyids_active = 0 # number of active ProxyIDs within proxyids_to_check

if proxy_ids:
if set(proxy_ids).issubset(ipsec_proxyids):
proxyids_to_check = proxy_ids
else:
result.reason = f"Tunnel {tunnel_name} has missing ProxyIDs in {proxy_ids}."
return result
else:
proxyids_to_check = ipsec_proxyids

for proxy_id in proxyids_to_check:
data = tunnels["IPSec"][f"{tunnel_name}:{proxy_id}"]
if data["state"] == "active":
ipsec_proxyids_active += 1
elif require_all_active: # state not active but we require all active
result.reason = f"Tunnel:ProxyID {tunnel_name}:{proxy_id} in state: {data['state']}."
return result

if require_all_active:
if proxyids_to_check and (len(proxyids_to_check) == ipsec_proxyids_active):
result.status = CheckStatus.SUCCESS
else:
if ipsec_proxyids_active >= 1:
result.status = CheckStatus.SUCCESS
elif ipsec_proxyids_active == 0:
result.reason = f"No active state for tunnel {tunnel_name} in ProxyIDs {proxyids_to_check}."

return result

Expand Down
165 changes: 165 additions & 0 deletions tests/test_check_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,171 @@ def test_ipsec_tunnel_status_not_found(self, check_firewall_mock):
reason="Tunnel NotMyTunnel not found."
)

def test_ipsec_tunnel_status_proxyids_not_found(self, check_firewall_mock):
"""tunnel with proxyids - proxyids given but not found"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID3"]
) == CheckResult(reason="Tunnel east1-vpn has missing ProxyIDs in ['ProxyID1', 'ProxyID3'].")

@pytest.mark.parametrize(
"require_all_active, expected_status",
[
(True, CheckStatus.SUCCESS),
(False, CheckStatus.SUCCESS),
],
)
def test_ipsec_tunnel_status_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock):
"""tunnel with proxyids - proxyids given and all active
Should return success whether require_all_active is True or False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"east1-vpn:ProxyID3": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active
) == CheckResult(expected_status)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID3 in state: init."),
(False, CheckStatus.SUCCESS, ""),
],
)
def test_ipsec_tunnel_status_proxyids_some_active(self, require_all_active, expected_status, reason, check_firewall_mock):
"""tunnel with proxyids - proxyids given and some active
Should return fail by default. Success if require_all_active is False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"east1-vpn:ProxyID3": {"state": "init"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."),
(False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."),
],
)
def test_ipsec_tunnel_status_proxyids_none_active(self, require_all_active, expected_status, reason, check_firewall_mock):
"""tunnel with proxyids - proxyids given and all not active
Should return fail whether require_all_active is True or False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "init"},
"east1-vpn:ProxyID2": {"state": "init"},
"east1-vpn:ProxyID3": {"state": "init"},
"central1-vpn:ProxyID1": {"state": "active"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=["ProxyID1", "ProxyID2", "ProxyID3"], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

@pytest.mark.parametrize(
"require_all_active, expected_status",
[
(True, CheckStatus.SUCCESS),
(False, CheckStatus.SUCCESS),
],
)
def test_ipsec_tunnel_status_none_proxyids_all_active(self, require_all_active, expected_status, check_firewall_mock):
"""tunnel with proxyids - proxyids not given and all active"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "active"},
"east1-vpn:ProxyID3": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", require_all_active=require_all_active
) == CheckResult(expected_status)

assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active
) == CheckResult(expected_status)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID2 in state: init."),
(False, CheckStatus.SUCCESS, ""),
],
)
def test_ipsec_tunnel_status_none_proxyids_some_active(
self, require_all_active, expected_status, reason, check_firewall_mock
):
"""tunnel with proxyids - proxyids not given and some active
Should return fail by default. Success if require_all_active is False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "active"},
"east1-vpn:ProxyID2": {"state": "init"},
"east1-vpn:ProxyID3": {"state": "active"},
"central1-vpn:ProxyID1": {"state": "init"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

@pytest.mark.parametrize(
"require_all_active, expected_status, reason",
[
(True, CheckStatus.FAIL, "Tunnel:ProxyID east1-vpn:ProxyID1 in state: init."),
(False, CheckStatus.FAIL, "No active state for tunnel east1-vpn in ProxyIDs ['ProxyID1', 'ProxyID2', 'ProxyID3']."),
],
)
def test_ipsec_tunnel_status_none_proxyids_none_active(
self, require_all_active, expected_status, reason, check_firewall_mock
):
"""tunnel with proxyids - proxyids not given and not active
Should return fail whether require_all_active is True or False.
"""
check_firewall_mock._node.get_tunnels.return_value = {
"IPSec": {
"east1-vpn:ProxyID1": {"state": "init"},
"east1-vpn:ProxyID2": {"state": "init"},
"east1-vpn:ProxyID3": {"state": "init"},
"central1-vpn:ProxyID1": {"state": "active"},
}
}
assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

assert check_firewall_mock.check_ipsec_tunnel_status(
tunnel_name="east1-vpn", proxy_ids=[], require_all_active=require_all_active
) == CheckResult(expected_status, reason=reason)

def test_check_free_disk_space_ok(self, check_firewall_mock):
check_firewall_mock._node.get_disk_utilization.return_value = {"/opt/panrepo": 50000}

Expand Down
Loading