Skip to content

Commit

Permalink
Use default latency implementation for latency test
Browse files Browse the repository at this point in the history
Since bitcraze/crazyflie-lib-python#492 there is a default implementation in the lib for latency. Retrieve the latency from here instead of using the custom implementation.

This means we stop discerning between large and small packet latency.
  • Loading branch information
gemenerik committed Jan 8, 2025
1 parent 51430b9 commit fa8a996
Showing 1 changed file with 35 additions and 44 deletions.
79 changes: 35 additions & 44 deletions tests/QA/test_radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import time
import struct

import numpy as np

import cflib.crtp
from cflib.crtp.crtpstack import CRTPPacket
from cflib.crtp.crtpstack import CRTPPort
from conftest import ValidatedSyncCrazyflie
from cflib.utils.callbacks import Syncer

import conftest
import logging
Expand All @@ -29,12 +29,8 @@
@pytest.mark.sanity
class TestRadio:
def test_latency_small_packets(self, dev: conftest.BCDevice):
requirement = conftest.get_requirement('radio.latencysmall')
assert(latency(dev.link_uri, requirement['packet_size']) < requirement['limit_high_ms'])

def test_latency_big_packets(self, dev: conftest.BCDevice):
requirement = conftest.get_requirement('radio.latencybig')
assert(latency(dev.link_uri, requirement['packet_size']) < requirement['limit_high_ms'])
requirement = conftest.get_requirement('radio.latency')
assert(latency(dev.link_uri) < requirement['limit_high_ms'])

@pytest.mark.requirements("syslink_flowctrl")
def test_bandwidth_small_packets(self, dev: conftest.BCDevice):
Expand All @@ -46,57 +42,52 @@ def test_bandwidth_big_packets(self, dev: conftest.BCDevice):
requirement = conftest.get_requirement('radio.bwbig')
assert(bandwidth(dev.link_uri, requirement['packet_size']) > requirement['limit_low'])


@pytest.mark.requirements("syslink_flowctrl")
def test_reliability(self, dev: conftest.BCDevice):
requirement = conftest.get_requirement('radio.reliability')
# The bandwidth function will assert if there is any packet loss
bandwidth(dev.link_uri, 4, requirement['limit_low'])


def build_data(i, packet_size):
assert(packet_size % 4 == 0)
repeats = packet_size // 4
return struct.pack('<' + 'I' * repeats, *[i] * repeats)
def latency(uri, timeout=10):
"""
Retrieve the latency to a Crazyflie.
Args:
uri (str): The URI of the Crazyflie.
timeout (float): Maximum time to wait for latency updates.
def latency(uri, packet_size=4, count=500):
link = cflib.crtp.get_link_driver(uri)
Returns:
float: The latency value received.
try:
pk = CRTPPacket()
pk.set_header(CRTPPort.LINKCTRL, 0) # Echo channel
Raises:
TimeoutError: If the timeout is reached during latency retrieval.
"""
with ValidatedSyncCrazyflie(uri) as scf:
syncer = Syncer()

latencies = []
for i in range(count):
pk.data = build_data(i, packet_size)
def on_latency_update(latency):
syncer.success_cb(latency)

start_time = time.time()
if not link.send_packet(pk):
link.close()
raise Exception("send_packet() timeout!")
while True:
pk_ack = link.receive_packet(2)
if pk_ack is None:
link.close()
raise Exception("Receive packet timeout!")
if pk_ack.port == CRTPPort.LINKCTRL and pk_ack.channel == 0:
break
end_time = time.time()
# Add the callback
scf.cf.link_statistics.latency_updated.add_callback(on_latency_update)

# make sure we actually received the expected value
i_recv, = struct.unpack('<I', pk_ack.data[0:4])
assert(i == i_recv)
latencies.append((end_time - start_time) * 1000)
except Exception as e:
link.close()
raise e
try:
# Wait for latency update
success = syncer._event.wait(timeout)
if not success:
raise TimeoutError("Timed out waiting for a latency update.")
latency = syncer.success_args[0]
logger.info('latency: {}'.format(latency))
return latency
finally:
scf.cf.link_statistics.latency_updated.remove_callback(on_latency_update)

link.close()
result = np.min(latencies)
logger.info('latency: {}'.format(result))

return result
def build_data(i, packet_size):
assert(packet_size % 4 == 0)
repeats = packet_size // 4
return struct.pack('<' + 'I' * repeats, *[i] * repeats)


def bandwidth(uri, packet_size=4, count=500):
Expand Down

0 comments on commit fa8a996

Please sign in to comment.