
Commit

Merge pull request NordSecurity#969 from NordSecurity/LLT-5797-batch-test-improvements

LLT-5797-batch-test-improvements
LukasPukenis authored Nov 29, 2024
2 parents e949d77 + 03102fa commit b6aff68
Showing 10 changed files with 285 additions and 232 deletions.
Empty file.
3 changes: 2 additions & 1 deletion nat-lab/pyproject.toml
@@ -85,7 +85,8 @@ markers = [
"ipv4: tests only ipv4 WG connectivity",
"ipv6: tests only ipv6 WG connectivity",
"ipv4v6: tests dual stack WG connectivity",
"batching: tests packet batching"
"batching: tests packet batching",
"utils: tests the natlab utilities",
]
filterwarnings = [
"ignore::DeprecationWarning"
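For context, registering the "utils" marker in pyproject.toml lets the new utility tests be selected without unknown-mark warnings. A minimal illustrative sketch of how such a test would be tagged (the test name and body are hypothetical, not part of this change):

    import pytest

    @pytest.mark.utils  # selected via `pytest -m utils`, matching the marker registered above
    def test_some_natlab_utility():
        assert True  # hypothetical placeholder body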
3 changes: 3 additions & 0 deletions nat-lab/run_local.py
@@ -58,6 +58,9 @@ def main() -> int:
parser.add_argument(
"--linux-native", action="store_true", help="Run tests with 'linux_native' mark"
)
parser.add_argument(
"--utils", action="store_true", help="Run tests with 'utils' mark"
)
parser.add_argument("--nobuild", action="store_true", help="Don't build TCLI")
parser.add_argument("--notests", action="store_true", help="Don't run tests")
parser.add_argument(
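The flag-to-marker translation is outside the lines shown here, but the new --utils switch presumably feeds a pytest marker expression, like the existing mark-selection flags. A hedged sketch of that mapping (function name and structure are assumptions, not code from this PR):

    # Hypothetical sketch: build a pytest `-m` expression from the parsed flags.
    def marker_expression(args) -> list:
        marks = []
        if getattr(args, "utils", False):
            marks.append("utils")
        if getattr(args, "linux_native", False):
            marks.append("linux_native")
        return ["-m", " or ".join(marks)] if marks else []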
179 changes: 83 additions & 96 deletions nat-lab/tests/test_batching.py
@@ -1,57 +1,60 @@
import asyncio
import itertools
import pytest
import random
from contextlib import AsyncExitStack
from helpers import SetupParameters, setup_environment
from itertools import zip_longest
from scapy.layers.inet import TCP, UDP # type: ignore
from scapy.layers.inet import TCP, UDP, ICMP # type: ignore
from scapy.layers.l2 import ARP # type: ignore
from timeouts import TEST_BATCHING_TIMEOUT
from typing import List, Tuple
from utils.batching import (
capture_traffic,
print_histogram,
generate_histogram_from_pcap,
)
from typing import List
from utils.asyncio_util import run_async_context
from utils.bindings import (
features_with_endpoint_providers,
FeatureLinkDetection,
FeaturePersistentKeepalive,
FeatureBatching,
EndpointProvider,
RelayState,
NodeState,
PathType,
TelioAdapterType,
)
from utils.connection import DockerConnection
from utils.connection_util import DOCKER_GW_MAP, ConnectionTag, container_id
from utils.traffic import (
capture_traffic,
render_chart,
generate_packet_distribution_histogram,
generate_packet_delay_histogram,
)

BATCHING_MISALIGN_RANGE = (0, 5) # Seconds to sleep for peers before starting
BATCHING_CAPTURE_TIME = 240 # Tied to TEST_BATCHING_TIMEOUT
BATCHING_MISALIGN_S = 7
BATCHING_CAPTURE_TIME = 120 # Tied to TEST_BATCHING_TIMEOUT


def _generate_setup_parameters(
conn_tag: ConnectionTag, adapter: TelioAdapterType, batching: bool
) -> SetupParameters:
features = features_with_endpoint_providers(
[EndpointProvider.UPNP, EndpointProvider.LOCAL, EndpointProvider.STUN]
)
features = features_with_endpoint_providers([EndpointProvider.STUN])

features.link_detection = FeatureLinkDetection(
rtt_seconds=1, no_of_pings=1, use_for_downgrade=True
)
features.batching = (
FeatureBatching(
direct_connection_threshold=35,
direct_connection_threshold=15,
trigger_effective_duration=10,
trigger_cooldown_duration=60,
)
if batching
else None
)
features.wireguard.persistent_keepalive = FeaturePersistentKeepalive(
direct=70,
proxying=70,
stun=70,
vpn=70,
direct=30,
proxying=30,
stun=30,
vpn=30,
)

return SetupParameters(
@@ -70,58 +73,6 @@ def _generate_setup_parameters(
ConnectionTag.DOCKER_CONE_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_SYMMETRIC_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_UPNP_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_UPNP_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_SHARED_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_DUAL_STACK,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_UDP_BLOCK_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_UDP_BLOCK_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_INTERNAL_SYMMETRIC_CLIENT,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(ConnectionTag.DOCKER_FULLCONE_CLIENT_1, TelioAdapterType.LINUX_NATIVE_TUN),
(ConnectionTag.DOCKER_FULLCONE_CLIENT_2, TelioAdapterType.LINUX_NATIVE_TUN),
(
ConnectionTag.MAC_VM,
TelioAdapterType.NEP_TUN,
),
(ConnectionTag.WINDOWS_VM_1, TelioAdapterType.WINDOWS_NATIVE_TUN),
(ConnectionTag.WINDOWS_VM_2, TelioAdapterType.WIREGUARD_GO_TUN),
]
# This test captures histograms of network activity to evaluate the effect of local batching in libtelio.
# Since only local batching is implemented, no client-generated traffic should occur during the test.
@@ -132,44 +83,38 @@ def _generate_setup_parameters(
# not do anything about syncing the keepalives between the peers.


# TODO: Add asserts for local batching
# TODO: Implement received-data-trigger batching
@pytest.mark.asyncio
@pytest.mark.timeout(TEST_BATCHING_TIMEOUT)
@pytest.mark.parametrize(
"setup_params,misalign_sleep_range,capture_duration",
"setup_params,misalign_sleep_s,capture_duration",
[
pytest.param(
[
_generate_setup_parameters(conn_tag, adapter, False)
_generate_setup_parameters(conn_tag, adapter, True)
for conn_tag, adapter in ALL_NODES
],
BATCHING_MISALIGN_RANGE,
BATCHING_MISALIGN_S,
BATCHING_CAPTURE_TIME,
marks=[
pytest.mark.batching,
pytest.mark.mac,
pytest.mark.windows,
],
),
pytest.param(
[
_generate_setup_parameters(conn_tag, adapter, True)
_generate_setup_parameters(conn_tag, adapter, False)
for conn_tag, adapter in ALL_NODES
],
BATCHING_MISALIGN_RANGE,
BATCHING_MISALIGN_S,
BATCHING_CAPTURE_TIME,
marks=[
pytest.mark.batching,
pytest.mark.mac,
pytest.mark.windows,
],
),
],
)
async def test_batching(
setup_params: List[SetupParameters],
misalign_sleep_range: Tuple[int, int],
misalign_sleep_s: int,
capture_duration: int,
) -> None:
async with AsyncExitStack() as exit_stack:
@@ -208,29 +153,56 @@ async def test_batching(
await client.stop_device()

# misalign the peers by sleeping some before starting each node again
async def start_node_manually(client, node, sleep_min: int, sleep_max: int):
await asyncio.sleep(random.randint(sleep_min, sleep_max))
async def start_node_manually(client, node, sleep_s):
await asyncio.sleep(sleep_s)
await client.simple_start()
await client.set_meshnet_config(env.api.get_meshnet_config(node.id))

await asyncio.gather(*[
start_node_manually(
client, node, misalign_sleep_range[0], misalign_sleep_range[1]
start_node_manually(client, node, i * misalign_sleep_s)
for i, (client, node) in enumerate(cnodes)
])

await asyncio.gather(*[
await exit_stack.enter_async_context(
run_async_context(
client.wait_for_state_peer(
node.public_key, [NodeState.CONNECTED], [PathType.DIRECT]
)
)
)
for client, node in cnodes
for client, node in itertools.product(env.clients, env.nodes)
if not client.is_node(node)
])

pyro5_ports = [
int(port) for port in {client.get_proxy_port() for client in env.clients}
]

print("Pyro ports", pyro5_ports)
# In general it's not great to filter traffic but for testing and observing
# it's crucial since it distorts the results. For example Pyro traffic is a constant stream of
# TCP packets
allow_pcap_filters = [
(
"IP46 + No Pyro5 traffic",
"No Pyro5, SSDP, ARP",
lambda p: (
(p.haslayer(UDP) or p.haslayer(TCP))
and p.sport not in pyro5_ports
and p.dport not in pyro5_ports
(
(p.haslayer(UDP) or p.haslayer(TCP))
and p.sport not in pyro5_ports
and p.dport not in pyro5_ports
)
and (
not p.haslayer(ICMP)
or p.haslayer(ICMP)
and p[ICMP].type in [0, 8]
)
and (
p.haslayer(UDP)
and p[UDP].sport != 1900
and p[UDP].dport != 1900
)
and (not p.haslayer(ARP))
),
),
]
Expand All @@ -247,9 +219,24 @@ async def start_node_manually(client, node, sleep_min: int, sleep_max: int):

pcap_paths = await asyncio.gather(*pcap_capture_tasks)

is_batching_enabled = env.clients[0].get_features().batching is not None
for container, pcap_path in zip(container_names, pcap_paths):
for filt in allow_pcap_filters:
filter_name = filt[0]
hs = generate_histogram_from_pcap(pcap_path, capture_duration, filt[1])
title = f"{container}-filter({filter_name})"
print_histogram(title, hs, max_height=12)
distribution_hs = generate_packet_distribution_histogram(
pcap_path, capture_duration, allow_pcap_filters
)
delay_hs = generate_packet_delay_histogram(
pcap_path, capture_duration, allow_pcap_filters
)

batch_str = "batch" if is_batching_enabled else "nobatch"

print(f"*** {container}-{batch_str} ***")

distribution_chart = render_chart(distribution_hs)
delay_chart = render_chart(delay_hs)

print("Distribution chart below")
print(distribution_chart)

print("Delay chart below")
print(delay_chart)
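The allow filters used above are plain per-packet predicates handed to the traffic helpers. As a rough illustration of the same pattern applied directly to a capture with scapy (the pcap path, port set, and helper name below are assumptions, not part of this PR):

    from scapy.all import rdpcap  # scapy is already a nat-lab test dependency
    from scapy.layers.inet import TCP, UDP

    PYRO5_PORTS = {50000, 50001}  # example values; the test reads the real ports from the clients

    def is_relevant(p) -> bool:
        # Keep only TCP/UDP packets that are not part of the Pyro5 control channel,
        # mirroring the intent of the "No Pyro5, SSDP, ARP" filter in the test.
        if not (p.haslayer(UDP) or p.haslayer(TCP)):
            return False
        return p.sport not in PYRO5_PORTS and p.dport not in PYRO5_PORTS

    packets = [p for p in rdpcap("capture.pcap") if is_relevant(p)]
    print(f"{len(packets)} packets kept after filtering")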
2 changes: 1 addition & 1 deletion nat-lab/tests/timeouts.py
@@ -12,4 +12,4 @@
TEST_NODE_STATE_FLICKERING_RELAY_TIMEOUT = 180
TEST_NODE_STATE_FLICKERING_DIRECT_TIMEOUT = 180
TEST_MESH_STATE_AFTER_DISCONNECTING_NODE_TIMEOUT = 300
TEST_BATCHING_TIMEOUT = 600
TEST_BATCHING_TIMEOUT = 1000
1 change: 1 addition & 0 deletions nat-lab/tests/utils/__init__.py
@@ -0,0 +1 @@
from .traffic import *
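With this star re-export in place, the traffic helpers should also be importable from the package root, assuming the exported names match those imported in test_batching.py; illustrative usage only:

    # Equivalent to importing from utils.traffic directly.
    from utils import capture_traffic, render_chart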
