Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor(eos_designs): Refactor eos_designs structured_config code for router_path_selection #5002

Open
wants to merge 12 commits into
base: devel
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def _cv_pathfinder_profiles(self: AvdStructuredConfigNetworkServicesProtocol) ->
for match in policy.get("matches", []):
profile = {
"name": match["avt_profile"],
"load_balance_policy": match["load_balance_policy"]["name"],
"load_balance_policy": match["load_balance_policy"].name,
}
if (internet_exit_policy_name := match["internet_exit_policy_name"]) is not None and internet_exit_policy_name in [
policy.name for policy, _connections in self._filtered_internet_exit_policies_and_connections
Expand All @@ -123,7 +123,7 @@ def _cv_pathfinder_profiles(self: AvdStructuredConfigNetworkServicesProtocol) ->
if (default_match := policy.get("default_match")) is not None:
profile = {
"name": default_match["avt_profile"],
"load_balance_policy": default_match["load_balance_policy"]["name"],
"load_balance_policy": default_match["load_balance_policy"].name,
}
if (internet_exit_policy_name := default_match["internet_exit_policy_name"]) is not None and internet_exit_policy_name in [
policy.name for policy, _connections in self._filtered_internet_exit_policies_and_connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
# that can be found in the LICENSE file.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Protocol

from pyavd._utils import append_if_not_duplicate, get, strip_empties_from_dict
from pyavd._eos_cli_config_gen.schema import EosCliConfigGen
from pyavd._eos_designs.structured_config.structured_config_generator import structured_config_contributor
from pyavd._utils import get

if TYPE_CHECKING:
from . import AvdStructuredConfigNetworkServicesProtocol
Expand All @@ -19,15 +20,13 @@ class RouterPathSelectionMixin(Protocol):
Class should only be used as Mixin to a AvdStructuredConfig class.
"""

@cached_property
def router_path_selection(self: AvdStructuredConfigNetworkServicesProtocol) -> dict | None:
@structured_config_contributor
def router_path_selection(self: AvdStructuredConfigNetworkServicesProtocol) -> None:
"""Return structured config for router path-selection (DPS)."""
if not self.shared_utils.is_wan_router:
return None
return

router_path_selection = {
"load_balance_policies": self._wan_load_balance_policies(),
}
self._set_wan_load_balance_policies()

# When running CV Pathfinder, only load balance policies are configured
# for AutoVPN, need also vrfs and policies.
Expand All @@ -36,53 +35,34 @@ def router_path_selection(self: AvdStructuredConfigNetworkServicesProtocol) -> d
{"name": vrf.name, "path_selection_policy": f"{vrf.policy}-WITH-CP" if vrf.name == "default" else vrf.policy} for vrf in self._filtered_wan_vrfs
]

router_path_selection.update(
{
"policies": self._autovpn_policies(),
"vrfs": vrfs,
},
)
for vrf in vrfs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above we are iterating on self._filtered_wan_vrfs and here again iterating on vrfs so i think we can from the vrfs data in above loop only with if condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self.structured_config.router_path_selection.vrfs.append(EosCliConfigGen.RouterPathSelection.VrfsItem(**vrf))

return strip_empties_from_dict(router_path_selection)
self._autovpn_policies()

def _wan_load_balance_policies(self: AvdStructuredConfigNetworkServicesProtocol) -> list:
"""Return a list of load balance policies."""
load_balance_policies = []
def _set_wan_load_balance_policies(self: AvdStructuredConfigNetworkServicesProtocol) -> None:
"""Set list of load balance policies."""
for policy in self._filtered_wan_policies:
for match in policy.get("matches", []):
append_if_not_duplicate(
list_of_dicts=load_balance_policies,
primary_key="name",
new_dict=match["load_balance_policy"],
context="Router Path-Selection load-balance policies.",
context_keys=["name"],
)
if (default_match := policy.get("default_match")) is not None:
append_if_not_duplicate(
list_of_dicts=load_balance_policies,
primary_key="name",
new_dict=default_match["load_balance_policy"],
context="Router Path-Selection load-balance policies.",
context_keys=["name"],
)
if "load_balance_policy" in match:
self.structured_config.router_path_selection.load_balance_policies.append(match["load_balance_policy"])

return load_balance_policies
if (default_match := policy.get("default_match")) is not None and "load_balance_policy" in default_match:
self.structured_config.router_path_selection.load_balance_policies.append(default_match["load_balance_policy"])

def _autovpn_policies(self: AvdStructuredConfigNetworkServicesProtocol) -> list:
def _autovpn_policies(self: AvdStructuredConfigNetworkServicesProtocol) -> None:
"""Return a list of policies for AutoVPN."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Return a list of policies for AutoVPN."""
"""Set the list of policies for AutoVPN."""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

policies = []
for policy in self._filtered_wan_policies:
autovpn_policy = {"name": policy["name"], "rules": []}
policy_item = EosCliConfigGen.RouterPathSelection.PoliciesItem()
policy_item.name = policy["name"]
for index, match in enumerate(get(policy, "matches", default=[]), start=1):
autovpn_policy["rules"].append(
{
"id": 10 * index,
"application_profile": match["application_profile"],
"load_balance": match["load_balance_policy"]["name"],
},
policy_item.rules.append(
EosCliConfigGen.RouterPathSelection.PoliciesItem.RulesItem(
id=10 * index,
application_profile=get(match, "application_profile"),
load_balance=match["load_balance_policy"].name if "load_balance_policy" in match else None,
)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
policy_item.rules.append(
EosCliConfigGen.RouterPathSelection.PoliciesItem.RulesItem(
id=10 * index,
application_profile=get(match, "application_profile"),
load_balance=match["load_balance_policy"].name if "load_balance_policy" in match else None,
)
)
policy_item.rules.append_new(
id=10 * index,
application_profile=get(match, "application_profile"),
load_balance=match["load_balance_policy"].name if "load_balance_policy" in match else None,
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (default_match := policy.get("default_match")) is not None:
autovpn_policy["default_match"] = {"load_balance": default_match["load_balance_policy"]["name"]}

policies.append(strip_empties_from_dict(autovpn_policy))
return policies
if (default_match := policy.get("default_match")) is not None and "load_balance_policy" in default_match:
policy_item.default_match.load_balance = default_match["load_balance_policy"].name
self.structured_config.router_path_selection.policies.append(policy_item)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from functools import cached_property
from typing import TYPE_CHECKING, Literal, Protocol

from pyavd._eos_cli_config_gen.schema import EosCliConfigGen
from pyavd._eos_designs.schema import EosDesigns
from pyavd._errors import AristaAvdError, AristaAvdInvalidInputsError, AristaAvdMissingVariableError
from pyavd._utils import get, get_ip_from_ip_prefix
Expand Down Expand Up @@ -71,6 +72,7 @@ def _filtered_wan_policies(self: AvdStructuredConfigNetworkServicesProtocol) ->
)
raise AristaAvdInvalidInputsError(msg)

# TODO: remove as_dict and fix the rest of utils_wan in another PR
vrf_policy = self._wan_virtual_topologies_policies[vrf.policy]._as_dict()
vrf_policy["profile_prefix"] = vrf.policy

Expand Down Expand Up @@ -104,7 +106,7 @@ def _update_policy_match_statements(self: AvdStructuredConfigNetworkServicesProt
if (
load_balance_policy := self._generate_wan_load_balance_policy(
load_balance_policy_name,
control_plane_virtual_topology._as_dict(),
control_plane_virtual_topology,
policy["name"],
)
) is None:
Expand Down Expand Up @@ -134,7 +136,9 @@ def _update_policy_match_statements(self: AvdStructuredConfigNetworkServicesProt
f"wan_virtual_topologies.policies[{policy['profile_prefix']}]."
f"application_virtual_topologies[{application_virtual_topology['application_profile']}]"
)
load_balance_policy = self._generate_wan_load_balance_policy(load_balance_policy_name, application_virtual_topology, context_path)
# TODO: Refactor this once we get objects from policy object
avt_object = EosDesigns.WanVirtualTopologies.PoliciesItem.ApplicationVirtualTopologiesItem._from_dict(application_virtual_topology)
load_balance_policy = self._generate_wan_load_balance_policy(load_balance_policy_name, avt_object, context_path)
if not load_balance_policy:
# Empty load balance policy so skipping
# TODO: Add "nodes" or similar under the profile and raise here
Expand Down Expand Up @@ -171,35 +175,28 @@ def _update_policy_match_statements(self: AvdStructuredConfigNetworkServicesProt
required=True,
custom_error_msg=f"wan_virtual_topologies.policies[{policy['profile_prefix']}].default_virtual_toplogy.",
)
# TODO: this should be removed in a future refactor
avt_object = EosDesigns.WanVirtualTopologies.PoliciesItem.DefaultVirtualTopology._from_dict(default_virtual_topology)
# Separating default_match as it is used differently
default_match = None
if not get(default_virtual_topology, "drop_unmatched", default=False):
name = get(
default_virtual_topology,
"name",
default=self._default_profile_name(policy["profile_prefix"], "DEFAULT"),
)
if not avt_object.drop_unmatched:
name = avt_object.name or self._default_profile_name(policy["profile_prefix"], "DEFAULT")
context_path = f"wan_virtual_topologies.policies[{policy['profile_prefix']}].default_virtual_topology"
# Verify that path_groups are set or raise
get(
default_virtual_topology,
"path_groups",
required=True,
custom_error_msg=f"Either 'drop_unmatched' or 'path_groups' must be set under '{context_path}'.",
)
if not avt_object.path_groups:
msg = (f"Either 'drop_unmatched' or 'path_groups' must be set under '{context_path}'.",)
raise AristaAvdInvalidInputsError(msg)
load_balance_policy_name = self.shared_utils.generate_lb_policy_name(name)
load_balance_policy = self._generate_wan_load_balance_policy(load_balance_policy_name, default_virtual_topology, context_path)
load_balance_policy = self._generate_wan_load_balance_policy(load_balance_policy_name, avt_object, context_path)
if not load_balance_policy:
msg = (
f"The `default_virtual_topology` path-groups configuration for `wan_virtual_topologies.policies[{policy['name']}]` produces "
"an empty load-balancing policy. Make sure at least one path-group present on the device is allowed in the "
"`default_virtual_topology` path-groups."
)
raise AristaAvdError(msg)
application_profile = get(default_virtual_topology, "application_profile", default="default")

default_match = {
"application_profile": application_profile,
"application_profile": "default",
"avt_profile": name,
"internet_exit_policy_name": get(default_virtual_topology, "internet_exit.policy"),
"traffic_class": get(default_virtual_topology, "traffic_class"),
Expand All @@ -219,7 +216,14 @@ def _update_policy_match_statements(self: AvdStructuredConfigNetworkServicesProt
policy["matches"] = matches
policy["default_match"] = default_match

def _generate_wan_load_balance_policy(self: AvdStructuredConfigNetworkServicesProtocol, name: str, input_dict: dict, context_path: str) -> dict | None:
def _generate_wan_load_balance_policy(
self: AvdStructuredConfigNetworkServicesProtocol,
name: str,
input_topology: EosDesigns.WanVirtualTopologies.ControlPlaneVirtualTopology
| EosDesigns.WanVirtualTopologies.PoliciesItem.DefaultVirtualTopology
| EosDesigns.WanVirtualTopologies.PoliciesItem.ApplicationVirtualTopologiesItem,
context_path: str,
) -> EosCliConfigGen.RouterPathSelection.LoadBalancePoliciesItem | None:
"""
Generate and return a router path-selection load-balance policy.
Expand All @@ -233,28 +237,24 @@ def _generate_wan_load_balance_policy(self: AvdStructuredConfigNetworkServicesPr
input_dict (dict): The dictionary containing the list of path-groups and their preference.
context_path (str): Key used for context for error messages.
"""
wan_load_balance_policy = {
"name": name,
"path_groups": [],
**get(input_dict, "constraints", default={}),
}
wan_load_balance_policy = EosCliConfigGen.RouterPathSelection.LoadBalancePoliciesItem(
name=name,
**input_topology.constraints._as_dict(),
)

if self.inputs.wan_mode == "cv-pathfinder":
wan_load_balance_policy["lowest_hop_count"] = get(input_dict, "lowest_hop_count")

# An entry is composed of a list of path-groups in `names` and a `priority`
policy_entries = get(input_dict, "path_groups", [])
# TODO: Decide if the default value in schema for lowest_hop_count is correct.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discuss between @gmuloc and @ClausHolbechArista

wan_load_balance_policy.lowest_hop_count = input_topology._get_defined_attr("lowest_hop_count") or None

# Using this flag while looping through all entries to keep track of any path group present on the remote host
any_path_group_on_wan_ha_peer = self.shared_utils.wan_ha

for policy_entry in policy_entries:
for policy_entry in input_topology.path_groups:
policy_entry_priority = None
if preference := get(policy_entry, "preference"):
policy_entry_priority = self._path_group_preference_to_eos_priority(preference, f"{context_path}[{policy_entry.get('names')}]")
if policy_entry.preference:
policy_entry_priority = self._path_group_preference_to_eos_priority(policy_entry.preference, f"{context_path}[{policy_entry.names}]")

entry_path_groups = policy_entry.get("names")
for path_group_name in entry_path_groups:
for path_group_name in policy_entry.names:
if (priority := policy_entry_priority) is None:
# No preference defined at the policy level, need to retrieve the default preference
if path_group_name not in self.inputs.wan_path_groups:
Expand All @@ -270,23 +270,21 @@ def _generate_wan_load_balance_policy(self: AvdStructuredConfigNetworkServicesPr
if self.shared_utils.is_wan_client and path_group_name not in self.shared_utils.wan_local_path_group_names:
continue

path_group = {
"name": path_group_name,
"priority": priority if priority != 1 else None,
}

wan_load_balance_policy["path_groups"].append(path_group)
wan_load_balance_policy.path_groups.append_new(
name=path_group_name,
priority=priority if priority != 1 else None,
)

# Updating peer path-groups tracking
any_path_group_on_wan_ha_peer = any_path_group_on_wan_ha_peer and set(self.shared_utils.wan_ha_peer_path_group_names).union(set(entry_path_groups))
any_path_group_on_wan_ha_peer = any_path_group_on_wan_ha_peer and set(self.shared_utils.wan_ha_peer_path_group_names).union(set(policy_entry.names))

if len(wan_load_balance_policy["path_groups"]) == 0 and not any_path_group_on_wan_ha_peer:
if len(wan_load_balance_policy.path_groups) == 0 and not any_path_group_on_wan_ha_peer:
# The policy is empty, and either the site is not using HA or no path-group in the policy is present on the HA peer
return None

if self.shared_utils.wan_ha or self.shared_utils.is_cv_pathfinder_server:
# Adding HA path-group with priority 1
wan_load_balance_policy["path_groups"].append({"name": self.inputs.wan_ha.lan_ha_path_group_name})
wan_load_balance_policy.path_groups.append_new(name=self.inputs.wan_ha.lan_ha_path_group_name)

return wan_load_balance_policy

Expand Down
Loading