Skip to content

Commit

Permalink
wid/hap.py: use CAP Unicast client to configure Streaming state
Browse files Browse the repository at this point in the history
  • Loading branch information
mringwal committed Sep 17, 2024
1 parent 0c23c12 commit cdbd80c
Showing 1 changed file with 66 additions and 28 deletions.
94 changes: 66 additions & 28 deletions autopts/wid/hap.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,34 @@
#

import logging
from argparse import Namespace

from autopts.pybtp import btp
from autopts.ptsprojects.stack import get_stack
from autopts.ptsprojects.testcase import MMI
from autopts.pybtp.defs import AUDIO_METADATA_STREAMING_AUDIO_CONTEXTS
from autopts.pybtp.types import *
from autopts.pybtp.btp.btp import pts_addr_get, pts_addr_type_get
from autopts.wid import generic_wid_hdl

log = logging.debug

def create_default_config():
return Namespace(addr=pts_addr_get(),
addr_type=pts_addr_type_get(),
vid=0x0000,
cid=0x0000,
coding_format=0x06,
frames_per_sdu=0x01,
audio_locations=0x01,
cig_id=0x00,
cis_id=0x00,
presentation_delay=40000,
qos_config=None,
codec_set_name=None,
codec_ltvs=None,
metadata_ltvs=None,
mono=None)

def hap_wid_hdl(wid, description, test_case_name):
log(f'{hap_wid_hdl.__name__}, {wid}, {description}, {test_case_name}')
Expand Down Expand Up @@ -86,13 +104,30 @@ def hdl_wid_482(_: WIDParams):
addr_type = pts_addr_type_get()
stack = get_stack()

# TODO: Rewrite this test to use CAP API instead

audio_dir = AudioDir.SINK
coding_format = 0x06
audio_location_list = [defs.PACS_AUDIO_LOCATION_FRONT_LEFT, defs.PACS_AUDIO_LOCATION_FRONT_RIGHT]
ase_ids = []

default_config = create_default_config()
default_config.codec_set_name = '24_1'
default_config.qos_set_name = '24_1_1'
default_config.metadata_ltvs = struct.pack('<BBH', 3, AUDIO_METADATA_STREAMING_AUDIO_CONTEXTS, 0x0200)

(default_config.sampling_freq,
default_config.frame_duration,
default_config.octets_per_frame) = CODEC_CONFIG_SETTINGS[default_config.codec_set_name]

(default_config.sdu_interval,
default_config.framing,
default_config.max_sdu_size,
default_config.retransmission_number,
default_config.max_transport_latency) = QOS_CONFIG_SETTINGS[default_config.qos_set_name]

ases= []
cig_id = 0x00
cis_id = 0x00

for audio_location in audio_location_list:
# Find ID of the ASE
ev = stack.bap.wait_ase_found_ev(addr_type, addr, audio_dir, 30, remove=True)
Expand All @@ -101,42 +136,45 @@ def hdl_wid_482(_: WIDParams):

_, _, _, ase_id = ev

logging.debug(f"About to configure ASE_ID: {ase_id} Location: {audio_location}")
logging.debug(f"Using ASE_ID: {ase_id} for Location: {audio_location}")

# 24_1
sampling_freq = SAMPLING_FREQ_STR_TO_CODE['24']
frame_duration = FRAME_DURATION_STR_TO_CODE['7.5']
octets_per_frame = 45
config = Namespace(**vars(default_config))
config.audio_dir = audio_dir
audio_locations = audio_location
frames_per_sdu = 0x01

# Perform Codec Config operation
codec_ltvs_bytes = create_lc3_ltvs_bytes(sampling_freq, frame_duration,
audio_locations, octets_per_frame,
frames_per_sdu)
btp.ascs_config_codec(ase_id, coding_format, 0x0000, 0x0000, codec_ltvs_bytes)
stack.ascs.wait_ascs_operation_complete_ev(addr_type, addr, ase_id, 30)
config.ase_id = ase_id
config.cig_id = cig_id
config.cis_id = cis_id
config.codec_ltvs = create_lc3_ltvs_bytes(config.sampling_freq, config.frame_duration,
audio_locations, config.octets_per_frame,
config.frames_per_sdu)

ase_ids.append(ase_id)
btp.cap_unicast_setup_ase(config, config.addr_type, config.addr)
stack.bap.ase_configs.append(config)

cig_id = 0x00
cis_id = 0x00
for ase_id in ase_ids:
btp.ascs_add_ase_to_cis(ase_id, cis_id, cig_id)
cis_id += 1
ases.append(config)

# Perform Config QOS operation
presentation_delay = 40000
qos_config = QOS_CONFIG_SETTINGS['24_1_1']
btp.ascs_config_qos(ase_ids[0], cig_id, 0x00, *qos_config, presentation_delay)
cis_id += 1

for ase_id in ase_ids:
stack.ascs.wait_ascs_operation_complete_ev(addr_type, addr, ase_id, 30)
btp.cap_unicast_audio_start(cig_id, defs.CAP_UNICAST_AUDIO_START_SET_TYPE_AD_HOC)
ev = stack.cap.wait_unicast_start_completed_ev(cig_id, 10)
if ev is None:
return False

for ase_id in ase_ids:
# Enable streams
btp.ascs_enable(ase_id)
stack.ascs.wait_ascs_operation_complete_ev(addr_type, addr, ase_id, 30)
# We could wait for this, but Zephyr controller has issue with the second CIS,
# so PTS does not send Streaming notification.
for config in ases:
# Wait for the ASE states to be changed to streaming
ev = stack.ascs.wait_ascs_ase_state_changed_ev(config.addr_type,
config.addr,
config.ase_id,
ASCSState.STREAMING,
20)
if ev is None:
log('hdl_wid_482 exit, not streaming')
return False

return True

Expand Down

0 comments on commit cdbd80c

Please sign in to comment.