Skip to content

Commit

Permalink
Add GMAP
Browse files Browse the repository at this point in the history
  • Loading branch information
mringwal committed Oct 21, 2024
1 parent 1e6a737 commit 9e65ec4
Show file tree
Hide file tree
Showing 11 changed files with 1,306 additions and 0 deletions.
1 change: 1 addition & 0 deletions autopts/ptsprojects/stack/layers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
from .gtbs import *
from .tmap import *
from .ots import *
from .gmap import *
# GENERATOR append 1
34 changes: 34 additions & 0 deletions autopts/ptsprojects/stack/layers/gmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# auto-pts - The Bluetooth PTS Automation Framework
#
# Copyright (c) 2024, BlueKitchen GmbH.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
#
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#

from autopts.ptsprojects.stack.common import wait_for_queue_event
from autopts.pybtp import defs


class GMAP:
def __init__(self):
self.event_queues = {
defs.GMAP_EV_DISCOVERY_COMPLETED: [],
}

def event_received(self, event_type, event_data):
self.event_queues[event_type].append(event_data)

def wait_discovery_completed_ev(self, addr_type, addr, timeout, remove=True):
return wait_for_queue_event(
self.event_queues[defs.TMAP_EV_DISCOVERY_COMPLETED],
lambda _addr_type, _addr, *_:
(addr_type, addr) == (_addr_type, _addr),
timeout, remove)
8 changes: 8 additions & 0 deletions autopts/ptsprojects/stack/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"TBS": 1 << defs.BTP_SERVICE_ID_TBS,
"TMAP": 1 << defs.BTP_SERVICE_ID_TMAP,
"OTS": 1 << defs.BTP_SERVICE_ID_OTS,
"GMAP": 1 << defs.BTP_SERVICE_ID_GMAP,
# GENERATOR append 1
}

Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(self):
self.gtbs = None
self.tmap = None
self.ots = None
self.gmap = None
# GENERATOR append 2

def is_svc_supported(self, svc):
Expand Down Expand Up @@ -186,6 +188,9 @@ def tmap_init(self):
def ots_init(self):
self.ots = OTS()

def gmap_init(self):
self.gmap = GMAP()

# GENERATOR append 3

def cleanup(self):
Expand Down Expand Up @@ -267,6 +272,9 @@ def cleanup(self):
if self.ots:
self.ots_init()

if self.gmap:
self.gmap_init()

# GENERATOR append 4


Expand Down
1 change: 1 addition & 0 deletions autopts/pybtp/btp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@
from autopts.pybtp.btp.tbs import *
from autopts.pybtp.btp.tmap import *
from autopts.pybtp.btp.ots import *
from autopts.pybtp.btp.gmap import *
# GENERATOR append 1
11 changes: 11 additions & 0 deletions autopts/pybtp/btp/btp.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ def read_supp_svcs():
defs.BTP_INDEX_NONE, defs.BTP_SERVICE_ID_TMAP),
"ots_reg": (defs.BTP_SERVICE_ID_CORE, defs.CORE_REGISTER_SERVICE,
defs.BTP_INDEX_NONE, defs.BTP_SERVICE_ID_OTS),
"gmap_reg": (defs.BTP_SERVICE_ID_CORE, defs.CORE_REGISTER_SERVICE,
defs.BTP_INDEX_NONE, defs.BTP_SERVICE_ID_GMAP),
# GENERATOR append 4
"read_supp_cmds": (defs.BTP_SERVICE_ID_CORE,
defs.CORE_READ_SUPPORTED_COMMANDS,
Expand Down Expand Up @@ -704,6 +706,13 @@ def core_reg_svc_ots():
iutctl.btp_socket.send_wait_rsp(*CORE['ots_reg'])


def core_reg_svc_gmap():
logging.debug("%s", core_reg_svc_gmap.__name__)

iutctl = get_iut()
iutctl.btp_socket.send_wait_rsp(*CORE['gmap_reg'])


# GENERATOR append 1

def core_reg_svc_rsp_succ():
Expand Down Expand Up @@ -805,6 +814,7 @@ def init(get_iut_method):
from .tbs import TBS_EV
from .tmap import TMAP_EV
from .ots import OTS_EV
from .gmap import GMAP_EV
# GENERATOR append 2

from autopts.pybtp.iutctl_common import set_event_handler
Expand Down Expand Up @@ -844,6 +854,7 @@ def event_handler(hdr, data):
defs.BTP_SERVICE_ID_TBS: (TBS_EV, stack.tbs),
defs.BTP_SERVICE_ID_TMAP: (TMAP_EV, stack.tmap),
defs.BTP_SERVICE_ID_OTS: (OTS_EV, stack.ots),
defs.BTP_SERVICE_ID_GMAP: (GMAP_EV, stack.gmap),
# GENERATOR append 3
}

Expand Down
95 changes: 95 additions & 0 deletions autopts/pybtp/btp/gmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#
# auto-pts - The Bluetooth PTS Automation Framework
#
# Copyright (c) 2024, BlueKitchen GmbH.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
#
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#

import binascii
import logging
import struct

from autopts.pybtp import defs
from autopts.pybtp.btp.btp import CONTROLLER_INDEX, get_iut_method as get_iut, \
btp_hdr_check, pts_addr_get, pts_addr_type_get
from autopts.pybtp.types import BTPError, addr2btp_ba

log = logging.debug


def address_to_ba(bd_addr_type=None, bd_addr=None):
data = bytearray()
bd_addr_ba = addr2btp_ba(pts_addr_get(bd_addr))
bd_addr_type_ba = chr(pts_addr_type_get(bd_addr_type)).encode('utf-8')
data.extend(bd_addr_type_ba)
data.extend(bd_addr_ba)
return data


GMAP = {
'read_supported_cmds': (defs.BTP_SERVICE_ID_GMAP,
defs.GMAP_READ_SUPPORTED_COMMANDS,
CONTROLLER_INDEX),
'discover': (defs.BTP_SERVICE_ID_GMAP,
defs.GMAP_DISCOVER,
CONTROLLER_INDEX),
}


def gmap_command_rsp_succ(timeout=20.0):
logging.debug("%s", gmap_command_rsp_succ.__name__)

iutctl = get_iut()

tuple_hdr, tuple_data = iutctl.btp_socket.read(timeout)
logging.debug("received %r %r", tuple_hdr, tuple_data)

btp_hdr_check(tuple_hdr, defs.BTP_SERVICE_ID_GMAP)

return tuple_data


def gmap_discover(bd_addr_type=None, bd_addr=None):
logging.debug(f"{gmap_discover.__name__}")

data = address_to_ba(bd_addr_type, bd_addr)

iutctl = get_iut()
iutctl.btp_socket.send(*GMAP['discover'], data=data)

gmap_command_rsp_succ()

def gmap_ev_discovery_completed(tmap, data, data_len):
logging.debug('%s %r', gmap_ev_discovery_completed.__name__, data)

fmt = '<B6sBBBBBB'
if len(data) < struct.calcsize(fmt):
raise BTPError('Invalid data length')

addr_type, addr, status, gmap_role, ugg_features, ugt_features, bgs_features, bgr_features = struct.unpack_from(fmt, data)

addr = binascii.hexlify(addr[::-1]).lower().decode('utf-8')

logging.debug(f'GMAP Discovery completed: addr {addr} addr_type '
f'{addr_type} status {status}'
f'role {gmap_role}'
f'ugg_features {ugg_features}'
f'ugt_features {ugt_features}'
f'bgs_features {bgs_features}'
f'bgr_features {bgr_features}'
)

tmap.event_received(defs.TMAP_EV_DISCOVERY_COMPLETED, (addr_type, addr, status, gmap_role, ugg_features, ugt_features, bgs_features, bgr_features))


GMAP_EV = {
defs.GMAP_EV_DISCOVERY_COMPLETED: gmap_ev_discovery_completed,
}
5 changes: 5 additions & 0 deletions autopts/pybtp/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def BIT(bit):
BTP_SERVICE_ID_TBS = 27
BTP_SERVICE_ID_TMAP = 28
BTP_SERVICE_ID_OTS = 29
BTP_SERVICE_ID_GMAP = 31
# GENERATOR append 1

BTP_STATUS_SUCCESS = 0x00
Expand Down Expand Up @@ -884,4 +885,8 @@ def BIT(bit):
OTS_READ_SUPPORTED_COMMANDS = 0x01
OTS_REGISTER_OBJECT = 0x02

GMAP_READ_SUPPORTED_COMMANDS = 0x01
GMAP_DISCOVER = 0x02
GMAP_EV_DISCOVERY_COMPLETED = 0x80

# GENERATOR append 2
1 change: 1 addition & 0 deletions autopts/wid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@
from .gtbs import gtbs_wid_hdl
from .tmap import tmap_wid_hdl
from .ots import ots_wid_hdl
from .gmap import gmap_wid_hdl
# GENERATOR append 1
Loading

0 comments on commit 9e65ec4

Please sign in to comment.