diff --git a/.unreleased/LLT-5797-batch-test-improvements b/.unreleased/LLT-5797-batch-test-improvements new file mode 100644 index 000000000..e69de29bb diff --git a/nat-lab/pyproject.toml b/nat-lab/pyproject.toml index 53b092f18..998ce9b90 100644 --- a/nat-lab/pyproject.toml +++ b/nat-lab/pyproject.toml @@ -85,7 +85,8 @@ markers = [ "ipv4: tests only ipv4 WG connectivity", "ipv6: tests only ipv6 WG connectivity", "ipv4v6: tests dual stack WG connectivity", - "batching: tests packet batching" + "batching: tests packet batching", + "utils: tests the natlab utilities", ] filterwarnings = [ "ignore::DeprecationWarning" diff --git a/nat-lab/run_local.py b/nat-lab/run_local.py index 3bb3839cd..df23168b2 100755 --- a/nat-lab/run_local.py +++ b/nat-lab/run_local.py @@ -58,6 +58,9 @@ def main() -> int: parser.add_argument( "--linux-native", action="store_true", help="Run tests with 'linux_native' mark" ) + parser.add_argument( + "--utils", action="store_true", help="Run tests with 'utils' mark" + ) parser.add_argument("--nobuild", action="store_true", help="Don't build TCLI") parser.add_argument("--notests", action="store_true", help="Don't run tests") parser.add_argument( diff --git a/nat-lab/tests/test_batching.py b/nat-lab/tests/test_batching.py index cc8cda74e..a7f93a440 100644 --- a/nat-lab/tests/test_batching.py +++ b/nat-lab/tests/test_batching.py @@ -1,17 +1,14 @@ import asyncio +import itertools import pytest -import random from contextlib import AsyncExitStack from helpers import SetupParameters, setup_environment from itertools import zip_longest -from scapy.layers.inet import TCP, UDP # type: ignore +from scapy.layers.inet import TCP, UDP, ICMP # type: ignore +from scapy.layers.l2 import ARP # type: ignore from timeouts import TEST_BATCHING_TIMEOUT -from typing import List, Tuple -from utils.batching import ( - capture_traffic, - print_histogram, - generate_histogram_from_pcap, -) +from typing import List +from utils.asyncio_util import run_async_context from utils.bindings import ( features_with_endpoint_providers, FeatureLinkDetection, @@ -19,28 +16,34 @@ FeatureBatching, EndpointProvider, RelayState, + NodeState, + PathType, TelioAdapterType, ) from utils.connection import DockerConnection from utils.connection_util import DOCKER_GW_MAP, ConnectionTag, container_id +from utils.traffic import ( + capture_traffic, + render_chart, + generate_packet_distribution_histogram, + generate_packet_delay_histogram, +) -BATCHING_MISALIGN_RANGE = (0, 5) # Seconds to sleep for peers before starting -BATCHING_CAPTURE_TIME = 240 # Tied to TEST_BATCHING_TIMEOUT +BATCHING_MISALIGN_S = 7 +BATCHING_CAPTURE_TIME = 120 # Tied to TEST_BATCHING_TIMEOUT def _generate_setup_parameters( conn_tag: ConnectionTag, adapter: TelioAdapterType, batching: bool ) -> SetupParameters: - features = features_with_endpoint_providers( - [EndpointProvider.UPNP, EndpointProvider.LOCAL, EndpointProvider.STUN] - ) + features = features_with_endpoint_providers([EndpointProvider.STUN]) features.link_detection = FeatureLinkDetection( rtt_seconds=1, no_of_pings=1, use_for_downgrade=True ) features.batching = ( FeatureBatching( - direct_connection_threshold=35, + direct_connection_threshold=15, trigger_effective_duration=10, trigger_cooldown_duration=60, ) @@ -48,10 +51,10 @@ def _generate_setup_parameters( else None ) features.wireguard.persistent_keepalive = FeaturePersistentKeepalive( - direct=70, - proxying=70, - stun=70, - vpn=70, + direct=30, + proxying=30, + stun=30, + vpn=30, ) return SetupParameters( @@ -70,58 +73,6 @@ def _generate_setup_parameters( ConnectionTag.DOCKER_CONE_CLIENT_2, TelioAdapterType.LINUX_NATIVE_TUN, ), - ( - ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_SYMMETRIC_CLIENT_2, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_UPNP_CLIENT_1, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_UPNP_CLIENT_2, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_SHARED_CLIENT_1, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_1, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_2, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_DUAL_STACK, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_UDP_BLOCK_CLIENT_1, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_UDP_BLOCK_CLIENT_2, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - ( - ConnectionTag.DOCKER_INTERNAL_SYMMETRIC_CLIENT, - TelioAdapterType.LINUX_NATIVE_TUN, - ), - (ConnectionTag.DOCKER_FULLCONE_CLIENT_1, TelioAdapterType.LINUX_NATIVE_TUN), - (ConnectionTag.DOCKER_FULLCONE_CLIENT_2, TelioAdapterType.LINUX_NATIVE_TUN), - ( - ConnectionTag.MAC_VM, - TelioAdapterType.NEP_TUN, - ), - (ConnectionTag.WINDOWS_VM_1, TelioAdapterType.WINDOWS_NATIVE_TUN), - (ConnectionTag.WINDOWS_VM_2, TelioAdapterType.WIREGUARD_GO_TUN), ] # This test captures histograms of network activity to evaluate the effect of local batching in libtelio. # Since only local batching is implemented, no client-generated traffic should occur during the test. @@ -132,44 +83,38 @@ def _generate_setup_parameters( # not do anything about syncing the keepalives between the peers. -# TODO: Add asserts for local batching -# TODO: Implement received-data-trigger batching @pytest.mark.asyncio @pytest.mark.timeout(TEST_BATCHING_TIMEOUT) @pytest.mark.parametrize( - "setup_params,misalign_sleep_range,capture_duration", + "setup_params,misalign_sleep_s,capture_duration", [ pytest.param( [ - _generate_setup_parameters(conn_tag, adapter, False) + _generate_setup_parameters(conn_tag, adapter, True) for conn_tag, adapter in ALL_NODES ], - BATCHING_MISALIGN_RANGE, + BATCHING_MISALIGN_S, BATCHING_CAPTURE_TIME, marks=[ pytest.mark.batching, - pytest.mark.mac, - pytest.mark.windows, ], ), pytest.param( [ - _generate_setup_parameters(conn_tag, adapter, True) + _generate_setup_parameters(conn_tag, adapter, False) for conn_tag, adapter in ALL_NODES ], - BATCHING_MISALIGN_RANGE, + BATCHING_MISALIGN_S, BATCHING_CAPTURE_TIME, marks=[ pytest.mark.batching, - pytest.mark.mac, - pytest.mark.windows, ], ), ], ) async def test_batching( setup_params: List[SetupParameters], - misalign_sleep_range: Tuple[int, int], + misalign_sleep_s: int, capture_duration: int, ) -> None: async with AsyncExitStack() as exit_stack: @@ -208,29 +153,56 @@ async def test_batching( await client.stop_device() # misalign the peers by sleeping some before starting each node again - async def start_node_manually(client, node, sleep_min: int, sleep_max: int): - await asyncio.sleep(random.randint(sleep_min, sleep_max)) + async def start_node_manually(client, node, sleep_s): + await asyncio.sleep(sleep_s) await client.simple_start() await client.set_meshnet_config(env.api.get_meshnet_config(node.id)) await asyncio.gather(*[ - start_node_manually( - client, node, misalign_sleep_range[0], misalign_sleep_range[1] + start_node_manually(client, node, i * misalign_sleep_s) + for i, (client, node) in enumerate(cnodes) + ]) + + await asyncio.gather(*[ + await exit_stack.enter_async_context( + run_async_context( + client.wait_for_state_peer( + node.public_key, [NodeState.CONNECTED], [PathType.DIRECT] + ) + ) ) - for client, node in cnodes + for client, node in itertools.product(env.clients, env.nodes) + if not client.is_node(node) ]) pyro5_ports = [ int(port) for port in {client.get_proxy_port() for client in env.clients} ] + print("Pyro ports", pyro5_ports) + # In general it's not great to filter traffic but for testing and observing + # it's crucial since it distorts the results. For example Pyro traffic is a constant stream of + # TCP packets allow_pcap_filters = [ ( - "IP46 + No Pyro5 traffic", + "No Pyro5, SSDP, ARP", lambda p: ( - (p.haslayer(UDP) or p.haslayer(TCP)) - and p.sport not in pyro5_ports - and p.dport not in pyro5_ports + ( + (p.haslayer(UDP) or p.haslayer(TCP)) + and p.sport not in pyro5_ports + and p.dport not in pyro5_ports + ) + and ( + not p.haslayer(ICMP) + or p.haslayer(ICMP) + and p[ICMP].type in [0, 8] + ) + and ( + p.haslayer(UDP) + and p[UDP].sport != 1900 + and p[UDP].dport != 1900 + ) + and (not p.haslayer(ARP)) ), ), ] @@ -247,9 +219,24 @@ async def start_node_manually(client, node, sleep_min: int, sleep_max: int): pcap_paths = await asyncio.gather(*pcap_capture_tasks) + is_batching_enabled = env.clients[0].get_features().batching is not None for container, pcap_path in zip(container_names, pcap_paths): - for filt in allow_pcap_filters: - filter_name = filt[0] - hs = generate_histogram_from_pcap(pcap_path, capture_duration, filt[1]) - title = f"{container}-filter({filter_name})" - print_histogram(title, hs, max_height=12) + distribution_hs = generate_packet_distribution_histogram( + pcap_path, capture_duration, allow_pcap_filters + ) + delay_hs = generate_packet_delay_histogram( + pcap_path, capture_duration, allow_pcap_filters + ) + + batch_str = "batch" if is_batching_enabled else "nobatch" + + print(f"*** {container}-{batch_str} ***") + + distribution_chart = render_chart(distribution_hs) + delay_chart = render_chart(delay_hs) + + print("Distribution chart below") + print(distribution_chart) + + print("Delay chart below") + print(delay_chart) diff --git a/nat-lab/tests/timeouts.py b/nat-lab/tests/timeouts.py index 3b693906b..1dfb344c0 100644 --- a/nat-lab/tests/timeouts.py +++ b/nat-lab/tests/timeouts.py @@ -12,4 +12,4 @@ TEST_NODE_STATE_FLICKERING_RELAY_TIMEOUT = 180 TEST_NODE_STATE_FLICKERING_DIRECT_TIMEOUT = 180 TEST_MESH_STATE_AFTER_DISCONNECTING_NODE_TIMEOUT = 300 -TEST_BATCHING_TIMEOUT = 600 +TEST_BATCHING_TIMEOUT = 1000 diff --git a/nat-lab/tests/utils/__init__.py b/nat-lab/tests/utils/__init__.py index e69de29bb..16de5221f 100644 --- a/nat-lab/tests/utils/__init__.py +++ b/nat-lab/tests/utils/__init__.py @@ -0,0 +1 @@ +from .traffic import * diff --git a/nat-lab/tests/utils/batching.py b/nat-lab/tests/utils/batching.py deleted file mode 100644 index 8c84e1c37..000000000 --- a/nat-lab/tests/utils/batching.py +++ /dev/null @@ -1,134 +0,0 @@ -import asyncio -import math -import os -import pytest -import subprocess -import tempfile -import typing -from scapy.all import PcapReader, Packet # type: ignore -from typing import Callable, List, Optional - - -def _generate_histogram( - data: list[int], buckets: int, bucket_size: int = 1 -) -> List[int]: - assert len(data) > 0 - max_val = max(data) - - if max_val >= buckets * bucket_size: - raise ValueError( - f"Histogram doesn't fit the data({max_val}). The max value is {max_val} but the histogram has {buckets} buckets with a width of" - f" {bucket_size}, the maximum value it can fit is {buckets * bucket_size}" - ) - - hs = [0] * buckets - - for v in data: - bucket_index = int(v / bucket_size) - hs[bucket_index] += 1 - - return hs - - -async def capture_traffic(container_name: str, duration_s: int) -> str: - cmd_rm = f"docker exec --privileged {container_name} rm /home/capture.pcap" - os.system(cmd_rm) - - iface = "any" - capture_path = "/home/capture.pcap" - - cmd = f"docker exec -d --privileged {container_name} tcpdump -i {iface} -U -w {capture_path}" - res = os.system(cmd) - if res != 0: - raise RuntimeError(f"Failed to launch tcpdump on {container_name}") - - await asyncio.sleep(duration_s) - - with tempfile.NamedTemporaryFile() as tmpfile: - local_path = f"{tmpfile.name}.pcap" - print(f"Copying pcap to {local_path}") - subprocess.run([ - "docker", - "cp", - container_name + ":" + "/home/capture.pcap", - local_path, - ]) - - cmd_rm = f"docker exec --privileged {container_name} pkill tcpdump" - os.system(cmd_rm) - - return local_path - - -# Render ASCII histogram drawing for visual inspection -def print_histogram(name: str, data: List[int], max_height=None): - output = [] - if not data: - output.append(f"No data provided for {name}") - return - - max_value = max(data) - - if max_height is None: - max_height = max_value - - scaled_data = [math.ceil((value / max_value) * max_height) for value in data] - for row in range(max_height, 0, -1): - line = "" - for value in scaled_data: - if value >= row: - line += "█" - else: - line += " " - line = "|" + line - output.append(line) - - output.append(f"+{'-' * (len(data))}") - output.append(f"0{' ' * (len(data)-1)}{len(data)}") - output.append(f"^-Histogram of {name}") - - print("\n".join(output)) - - -def generate_histogram_from_pcap( - pcap_path: str, - buckets: int, - allow_packet_filter: Optional[Callable[[Packet], bool]], -) -> typing.List[int]: - print("Looking for a pcap at", pcap_path) - - first_packet_time = None - timestamps = [] - - with PcapReader(pcap_path) as pcap_reader: - first_packet = True - for pkt in pcap_reader: - if first_packet: - first_packet_time = pkt.time - first_packet = False - - if allow_packet_filter and not allow_packet_filter(pkt): - continue - - timestamps.append(pkt.time - first_packet_time) - - # we either filtered out everything or didn't receive any traffic - if len(timestamps) == 0: - return [] - - return _generate_histogram(timestamps, buckets) - - -@pytest.mark.asyncio -async def test_histogram(): - data = [] - for _ in range(10): - data.append(2) - data.append(3) - - for _ in range(50): - data.append(4) - - data.append(9) - - assert _generate_histogram(data, 10, 1) == [0, 0, 10, 10, 50, 0, 0, 0, 0, 1] diff --git a/nat-lab/tests/utils/test.pcap b/nat-lab/tests/utils/test.pcap new file mode 100644 index 000000000..75ed54b04 Binary files /dev/null and b/nat-lab/tests/utils/test.pcap differ diff --git a/nat-lab/tests/utils/test_traffic.py b/nat-lab/tests/utils/test_traffic.py new file mode 100644 index 000000000..160c65134 --- /dev/null +++ b/nat-lab/tests/utils/test_traffic.py @@ -0,0 +1,49 @@ +import pytest +from scapy.layers.inet import TCP, ICMP # type: ignore +from utils import generate_histogram, generate_packet_delay_histogram + + +@pytest.mark.utils +async def test_histogram(): + data = [] + for _ in range(10): + data.append(2) + data.append(3) + + for _ in range(50): + data.append(4) + + data.append(9) + + assert generate_histogram(data, 10, 1) == [0, 0, 10, 10, 50, 0, 0, 0, 0, 1] + + +@pytest.mark.utils +@pytest.mark.parametrize( + "filter_name,filter_func,expected_hs", + [ + ( + "icmp", + lambda p: p.haslayer(ICMP), + # fmt: off + [8, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + # fmt: on + ), + ( + "tcp", + lambda p: p.haslayer(TCP), + # fmt: off + [ 41, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,0,], + # fmt: on + ), + ], +) +def test_stats_and_filters_delay(filter_name, filter_func, expected_hs): + # Test packet capture file made inside of a docker container that has some TCP, UDP, ARP, ICMP packets, few ARP packets and ping every 7seconds + # It is very small pcap which can be easily observed and assertions made. It is very small, in total of 66 packets + pcap_path = "./tests/utils/test.pcap" + + delay_hs = generate_packet_delay_histogram( + pcap_path, 30, [(filter_name, filter_func)] + ) + assert expected_hs == delay_hs diff --git a/nat-lab/tests/utils/traffic.py b/nat-lab/tests/utils/traffic.py new file mode 100644 index 000000000..1c73e1441 --- /dev/null +++ b/nat-lab/tests/utils/traffic.py @@ -0,0 +1,146 @@ +import asyncio +import math +import os +import subprocess +import tempfile +import typing +from scapy.all import PcapReader # type: ignore +from typing import List, Any + + +def generate_histogram(data: list, buckets: int, bucket_size: int = 1) -> List[int]: + """Generate histogram based on passed data. Each item increases the count in respective bucket of histogram""" + assert len(data) > 0 + max_val = max(data) + + if max_val >= buckets * bucket_size: + raise ValueError( + f"Histogram doesn't fit the data({max_val}). The max value is {max_val} but the histogram has {buckets} buckets with a width of" + f" {bucket_size}, the maximum value it can fit is {buckets * bucket_size}" + ) + + hs = [0] * buckets + + for v in data: + bucket_index = int(v / bucket_size) + hs[bucket_index] += 1 + + return hs + + +async def capture_traffic(container_name: str, duration_s: int) -> str: + """Capture traffic on the target container for a duration of time. Returned is the path of a *.pcap file""" + + cmd_rm = f"docker exec --privileged {container_name} rm /home/capture.pcap" + os.system(cmd_rm) + + iface = "any" + capture_path = "/home/capture.pcap" + + cmd = f"docker exec -d --privileged {container_name} tcpdump -i {iface} -U -w {capture_path}" + res = os.system(cmd) + if res != 0: + raise RuntimeError(f"Failed to launch tcpdump on {container_name}") + + await asyncio.sleep(duration_s) + + # Use temporary file so it would not collide, however don't delete it as it + # leaves ability for us to inspect it manually + with tempfile.NamedTemporaryFile(delete=False) as tmpfile: + local_path = tmpfile.name + print(f"Copying pcap to {local_path}") + subprocess.run([ + "docker", + "cp", + container_name + ":" + "/home/capture.pcap", + local_path, + ]) + + cmd_rm = f"docker exec --privileged {container_name} pkill tcpdump" + os.system(cmd_rm) + + return local_path + + +# Render ASCII chart into a string +def render_chart(data: List[int], max_height=10) -> str: + """Render ASCII chart into a string and return it""" + + if not data: + raise ValueError("No data provided to render") + + output = [] + + max_value = max(data) + scaled_data = [math.ceil((value / max_value) * max_height) for value in data] + for row in range(max_height, 0, -1): + line = "" + for value in scaled_data: + if value >= row: + line += "█" + else: + line += " " + line = "|" + line + output.append(line) + + output.append(f"+{'-' * (len(data))}") + output.append(f"0{' ' * (len(data)-1)}{len(data)}") + + return "\n".join(output) + + +def generate_packet_delay_histogram( + pcap_path: str, + buckets: int, + allow_packet_filters: Any, +) -> typing.List[int]: + """Generate histogram based on the relative packet(and packet before) timestamp differences. Good for observing bursts""" + + print("Looking for a pcap at", pcap_path) + + last_packet_time = None + timestamps = [] + + with PcapReader(pcap_path) as pcap_reader: + for pkt in pcap_reader: + if last_packet_time is None: + last_packet_time = pkt.time + + if all(f[1](pkt) for f in allow_packet_filters): + timestamps.append(pkt.time - last_packet_time) + last_packet_time = pkt.time + + if len(timestamps) == 0: + raise ValueError( + "No data for histogram generation. It was either fully filtered out or not present" + ) + + return generate_histogram(timestamps, buckets) + + +def generate_packet_distribution_histogram( + pcap_path: str, + buckets: int, + allow_packet_filters: Any, +) -> typing.List[int]: + """Generate histogram based on absolute packet timestamps. Good for observing trends and patterns""" + + print("Looking for a pcap at", pcap_path) + + first_packet_time = None + timestamps = [] + + with PcapReader(pcap_path) as pcap_reader: + for pkt in pcap_reader: + if first_packet_time is None: + first_packet_time = pkt.time + + if all(f[1](pkt) for f in allow_packet_filters): + timestamps.append(pkt.time - first_packet_time) + + if len(timestamps) == 0: + raise ValueError( + "No data for histogram generation. It was either fully filtered out or not present" + ) + + return generate_histogram(timestamps, buckets)