From 46feab970158f6b19f962ddc374eb46d8357e7a6 Mon Sep 17 00:00:00 2001 From: Matthias Ringwald Date: Tue, 11 Jun 2024 12:05:08 +0200 Subject: [PATCH] wid/tmap: add missing wids --- autopts/wid/tmap.py | 692 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 686 insertions(+), 6 deletions(-) diff --git a/autopts/wid/tmap.py b/autopts/wid/tmap.py index 8589d66112..abdd8917e8 100644 --- a/autopts/wid/tmap.py +++ b/autopts/wid/tmap.py @@ -15,16 +15,45 @@ import logging +import re +from argparse import Namespace + from autopts.ptsprojects.stack import get_stack from autopts.pybtp import btp +from autopts.pybtp.defs import AUDIO_METADATA_STREAMING_AUDIO_CONTEXTS from autopts.wid import generic_wid_hdl from autopts.pybtp.types import * -from autopts.pybtp.btp.btp import pts_addr_get, pts_addr_type_get +from autopts.pybtp.btp.btp import pts_addr_get, pts_addr_type_get, lt2_addr_get, lt2_addr_type_get from autopts.ptsprojects.testcase import MMI +from autopts.wid.bap import create_default_config, get_audio_locations_from_pac, BAS_CONFIG_SETTINGS +from autopts.wid.ccp import BT_TBS_GTBS_INDEX log = logging.debug +def trigger_discovery_if_needed(params): + # get peer addr + if params.test_case_name.endswith('LT2'): + addr = lt2_addr_get() + addr_type = lt2_addr_type_get() + else: + addr = pts_addr_get() + addr_type = pts_addr_type_get() + + stack = get_stack() + peer = stack.bap.get_peer(addr_type, addr) + if not peer.discovery_completed: + # CAP discover includes CSIP + btp.cap_discover(addr_type, addr) + stack.cap.wait_discovery_completed_ev(addr_type, addr, 30) + + btp.bap_discover(addr_type, addr) + stack.bap.wait_discovery_completed_ev(addr_type, addr, 30) + + btp.vcp_discover(addr_type, addr) + stack.vcp.wait_discovery_completed_ev(addr_type, addr, 10) + + def tmap_wid_hdl(wid, description, test_case_name): log(f'{tmap_wid_hdl.__name__}, {wid}, {description}, {test_case_name}') return generic_wid_hdl(wid, description, test_case_name, [__name__]) @@ -78,6 +107,196 @@ def hdl_wid_100(params: WIDParams): return True +def hdl_wid_104(_: WIDParams): + """Please send non connectable advertise with periodic info.""" + + # Periodic adv started within cap_broadcast_adv_start at hdl_wid_114. + + return True + + +def hdl_wid_114(params: WIDParams): + """Please advertise with Broadcast Audio Announcement (0x1852) service data""" + + # advertisement started in hdl_wid_506 + + return True + + +wid_311_settings = { + # lt_count, num sink ASEs, sink locations (0 - don't care), num source ASEs, QoS Set Name + 'TMAP/UMS/VRC/BV-01-I': (1, 2, 0, 0, '48_2_1'), + 'TMAP/UMS/VRC/BV-02-I': (2, 1, 0, 0, '48_2_1'), + 'TMAP/UMS/VRC/BV-02-I_LT2': (2, 1, 0, 0, '48_2_1'), + 'TMAP/UMS/VRC/BV-03-I': (1, 2, 0, 0, '48_2_1'), + 'TMAP/UMS/ASC/BV-01-I': (1, 1, 1, 0, '48_2_1'), + 'TMAP/UMS/ASC/BV-02-I': (1, 1, 2, 0, '48_2_1'), + 'TMAP/UMS/ASC/BV-03-I': (1, 1, 3, 0, '48_2_1'), +} + + +def hdl_wid_311(params: WIDParams): + """Please configure 2 SINK ASE with Config Setting: 48_2_1.""" + + # Based on cap/wid_400 + stack = get_stack() + if params.test_case_name.endswith('LT2'): + log('hdl_wid_311 started for LT2') + addr = lt2_addr_get() + addr_type = lt2_addr_type_get() + settings_name = params.test_case_name + # This WID for LT2 is synchronized to arrived as a second one. + # All CISes in a CIG shall have different CIS_ID, so let's keep + # the increasing order to assure this. + cis_id = stack.bap.ase_configs[-1].cis_id + 1 + else: + log('hdl_wid_311 started for LT1') + addr = pts_addr_get() + addr_type = pts_addr_type_get() + settings_name = params.test_case_name + cis_id = 0x00 + + lt_count, num_sink_ases, sink_locations, num_source_ases, qos_set_name = wid_311_settings[settings_name] + log('Looking for %u LTs, num Sink ASEs %u, Sink locations %u, num Source ASEs %u, QoS Config %s' % (lt_count, num_sink_ases, sink_locations, num_source_ases, qos_set_name)) + + metadata = b'' + default_config = create_default_config() + default_config.qos_set_name = qos_set_name + + default_config.codec_set_name = '_'.join(default_config.qos_set_name.split('_')[:-1]) + default_config.addr = addr + default_config.addr_type = addr_type + default_config.metadata_ltvs = metadata + + (default_config.sampling_freq, + default_config.frame_duration, + default_config.octets_per_frame) = CODEC_CONFIG_SETTINGS[default_config.codec_set_name] + + (default_config.sdu_interval, + default_config.framing, + default_config.max_sdu_size, + default_config.retransmission_number, + default_config.max_transport_latency) = QOS_CONFIG_SETTINGS[default_config.qos_set_name] + + sinks = [] + for i in range(0, num_sink_ases): + config = Namespace(**vars(default_config)) + config.audio_dir = AudioDir.SINK + config.max_sdu_size = default_config.max_sdu_size + + if sink_locations != 0: + config.audio_locations = sink_locations + log("Use Sink Locations %u from Test Specification", config.audio_locations) + else: + config.audio_locations = get_audio_locations_from_pac( + default_config.addr_type, default_config.addr, config.audio_dir) + log("Use Sink Locations %u from PACS Sink Record", config.audio_locations) + + if not config.audio_locations: + log('hdl_wid_311 exit, no suitable sink audio locations') + return False + + sinks.append(config) + + sources = [] + for i in range(0, num_source_ases): + config = Namespace(**vars(default_config)) + config.audio_dir = AudioDir.SOURCE + config.max_sdu_size = default_config.max_sdu_size + config.audio_locations = get_audio_locations_from_pac( + default_config.addr_type, default_config.addr, config.audio_dir) + + if not config.audio_locations: + log('hdl_wid_311 exit, no suitable source audio locations') + return False + + sources.append(config) + + ase_found_ev_cache = [] + ases = sinks + sources + for config in ases: + ev = stack.bap.wait_ase_found_ev(default_config.addr_type, + default_config.addr, + config.audio_dir, 30, remove=True) + if ev is None: + log('hdl_wid_311 exit, no suitable ase found') + return False + + ase_found_ev_cache.append(ev) + + _, _, audio_dir, ase_id = ev + config.ase_id = ase_id + config.codec_ltvs = create_lc3_ltvs_bytes(config.sampling_freq, + config.frame_duration, + config.audio_locations, + config.octets_per_frame, + config.frames_per_sdu) + + bidir_cises = list(zip(sinks, sources)) + bidir_cises_num = len(bidir_cises) + + for sink_config, source_config in bidir_cises: + sink_config.cis_id = cis_id + source_config.cis_id = cis_id + cis_id += 1 + + unidir_cises = [] + for sink_config in sinks[bidir_cises_num:]: + sink_config.cis_id = cis_id + unidir_cises.append((sink_config, None)) + cis_id += 1 + + for source_config in sources[bidir_cises_num:]: + source_config.cis_id = cis_id + unidir_cises.append((None, source_config)) + cis_id += 1 + + cig_id = 0x00 + for config in ases: + config.cig_id = cig_id + btp.cap_unicast_setup_ase(config, config.addr_type, config.addr) + stack.bap.ase_configs.append(config) + + if lt_count == 1 or (lt_count == 2 and params.test_case_name.endswith('LT2')) or \ + (lt_count == 3 and params.test_case_name.endswith('LT3')): + + # Zephyr CAP API starts all streams in group at once + btp.cap_unicast_audio_start(cig_id, defs.CAP_UNICAST_AUDIO_START_SET_TYPE_AD_HOC) + ev = stack.cap.wait_unicast_start_completed_ev(cig_id, 10) + if ev is None: + return False + + # We could wait for this, but Zephyr controller has issue with the second CIS, + # so PTS does not send Streaming notification. + for config in ases: + # Wait for the ASE states to be changed to streaming + ev = stack.ascs.wait_ascs_ase_state_changed_ev(config.addr_type, + config.addr, + config.ase_id, + ASCSState.STREAMING, + 20) + if ev is None: + log('hdl_wid_311 exit, not streaming') + return False + + return True + + +def hdl_wid_364(_: WIDParams): + """ + After processed audio stream data, please click OK. + """ + + return True + + +def hdl_wid_367(_: WIDParams): + """ + Lower tester is streaming audio data. + """ + return True + + def hdl_wid_376(_: WIDParams): """ Please confirm received streaming data. @@ -106,6 +325,303 @@ def hdl_wid_384(_: WIDParams): return True +def hdl_wid_500(_: WIDParams): + """Please click ok when the tester is ready to discover Generic Telephone Bearer Service""" + return True + + +def hdl_wid_501(params: WIDParams): + """Please discover Generic Telephone Bearer Service and click ok when the tester is ready to start CAP Unicast Audio Starting procedure""" + + # get peer addr + if params.test_case_name.endswith('LT2'): + addr = lt2_addr_get() + addr_type = lt2_addr_type_get() + else: + addr = pts_addr_get() + addr_type = pts_addr_type_get() + + btp.ccp_discover_tbs(addr_type, addr) + btp.ccp.ccp_await_discovered(30000) + + return True + + +def hdl_wid_502(params: WIDParams): + """ + Please order IUT to receive an incoming call, accepts and establishes the call with + confirmation from the Upper Tester + """ + + if params.test_case_name.endswith('LT2'): + return True + + btp.tbs_remote_incoming(0, 'tel:+19991111234', 'tel:+19991111235', + 'tel:+19991110011') + + return True + + +def hdl_wid_503(params: WIDParams): + """ + Please order IUT to hang up the call + """ + + if params.test_case_name.endswith('LT2'): + return True + + btp.tbs_terminate_call(0) + + return True + + +wid_504_settings = { + # test_case_name: (lt count, iut as audio source: streams + channels, sink locations (0 - don't care), iut as audio sink streams, metadata) + 'TMAP/CG/VRC/BV-01-I': (1, 1, 1, 0, 1, 1, struct.pack(' only connect here and trigger CAP, BAP, CSIP discovery later return True @@ -156,6 +782,18 @@ def hdl_wid_20103(_: WIDParams): return True + +def hdl_wid_20106(params: WIDParams): + """ + Please write to Client Characteristic Configuration Descriptor + of ASE Control Point characteristic to enable notification. + """ + + trigger_discovery_if_needed(params) + + return True + + def hdl_wid_20107(params: WIDParams): """ Please send Read Request to read X characteristic with handle = 0xXXXX. @@ -163,15 +801,48 @@ def hdl_wid_20107(params: WIDParams): logging.debug("description=%r", params.description) + if params.test_case_name.endswith('LT2'): + addr = lt2_addr_get() + addr_type = lt2_addr_type_get() + else: + addr = pts_addr_get() + addr_type = pts_addr_type_get() + MMI.reset() MMI.parse_description(params.description) handle = MMI.args[0] - btp.gattc_read(btp.pts_addr_type_get(), btp.pts_addr_get(), handle) + btp.gattc_read(addr_type, addr, handle) btp.gattc_read_rsp() return True +def hdl_wid_20110(params: WIDParams): + """ + Please send write request to handle 0x024A with following value. + Volume Control Point: + Op Code: [4 (0x04)] Set Absolute Volume + Change Counter: + Volume Setting: [255 (0xFF)] + """ + + if params.test_case_name.endswith('LT2'): + addr = lt2_addr_get() + addr_type = lt2_addr_type_get() + else: + addr = pts_addr_get() + addr_type = pts_addr_type_get() + + # Get volume from description + volume_pattern = r"Volume Setting: \[(\d+) \(0x[0-9A-Fa-f]+\)\]" + match = re.search(volume_pattern, params.description) + if match: + volume_setting = match.group(1) + btp.vcp_set_vol(volume_setting, addr_type, addr) + + return True + + def hdl_wid_20116(_: WIDParams): """ Please take action to discover the TMAP Role characteristic from the Telephony and Media Audio. @@ -188,3 +859,12 @@ def hdl_wid_20116(_: WIDParams): return False return True + +def hdl_wid_20206(_: WIDParams): + """ + Please verify that for each supported characteristic, attribute handle/UUID pair(s) is returned to the upper tester.TMAP Role: Attribute Handle = 0x0321 + """ + + # BTstack uses GATT Read Value by Type without first searching for TMAP Service and finding TMAP Characteristics + + return True