Skip to content

Commit

Permalink
[otci] add diag radio related commands support (openthread#11112)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglongxia authored Jan 6, 2025
1 parent 10a0a65 commit 69b91f9
Showing 1 changed file with 84 additions and 2 deletions.
86 changes: 84 additions & 2 deletions tools/otci/otci/otci.py
Original file line number Diff line number Diff line change
Expand Up @@ -2550,9 +2550,29 @@ def diag_cw_stop(self):
"""Stop transmitting continuous carrier wave."""
self.execute_command('diag cw stop')

def diag_frame(self, frame: str):
def diag_frame(self,
frame: str,
max_csma_backoffs: Optional[int] = None,
csma_ca_enabled: Optional[bool] = None,
rx_channel_after_tx_done: Optional[int] = None,
tx_delay: Optional[int] = None,
tx_power: Optional[int] = None,
max_frame_retries: Optional[int] = None,
is_security_processed: Optional[bool] = None,
is_header_updated: Optional[bool] = None):
"""Set the frame (hex encoded) to be used by `diag send` and `diag repeat`."""
self.execute_command(f'diag frame {frame}')
command = f'diag frame '
command += self.__get_optional_int_argument('-b', max_csma_backoffs)
command += self.__get_optional_int_argument('-d', tx_delay)
command += self.__get_optional_int_argument('-C', rx_channel_after_tx_done)
command += self.__get_optional_int_argument('-p', tx_power)
command += self.__get_optional_int_argument('-r', max_frame_retries)
command += self.__get_optional_bool_argument('-c', csma_ca_enabled)
command += self.__get_optional_bool_argument('-s', is_security_processed)
command += self.__get_optional_bool_argument('-u', is_header_updated)
command += f'{frame}'

self.execute_command(command)

def diag_stream_start(self):
"""Start transmitting a stream of characters."""
Expand Down Expand Up @@ -2586,10 +2606,66 @@ def diag_radio_sleep(self):
"""Enter radio sleep mode."""
self.execute_command('diag radio sleep')

def diag_radio_enable(self):
"""Enable the radio."""
self.execute_command('diag radio enable')

def diag_radio_disable(self):
"""Disable the radio."""
self.execute_command('diag radio disable')

def diag_radio_receive(self):
"""Set radio to receive mode."""
self.execute_command('diag radio receive')

def diag_radio_receive_number(self, number: int):
"""Set radio to receive mode and receive specified number of packets."""
#
# The `diag radio receive <number> [lpr]` command example:
#
# > diag radio receive 5 lpr
# 0, rssi:-49, lqi:119, len:10, psdu:000102030405060771e
# 1, rssi:-51, lqi:112, len:10, psdu:000102030405060771e
# 2, rssi:-42, lqi:120, len:10, psdu:000102030405060771e
# 3, rssi:-54, lqi:111, len:10, psdu:000102030405060771e
# 4, rssi:-56, lqi:108, len:10, psdu:000102030405060771e
# Done
#

output = self.execute_command(f'diag radio receive {number} lpr')

if len(output) != number:
raise UnexpectedCommandOutput(output)

result = []

for line in output:
data = line.split(',')

if len(data) != 5:
raise UnexpectedCommandOutput(data)

result.append({
'rssi': int(data[1].split(":")[1]),
'lqi': int(data[2].split(":")[1]),
'len': int(data[3].split(":")[1]),
'psdu': data[4].split(":")[1],
})

return result

def diag_enable_radio_receive_filter(self):
"""Enable the radio receive filter."""
self.execute_command('diag radio receive filter enable')

def diag_disable_radio_receive_filter(self):
"""Disable the radio receive filter."""
self.execute_command('diag radio receive filter disable')

def diag_set_radio_receive_filter_dest_mac_address(self, dest_mac_address: str):
"""Set the destination mac address of the radio receive filter."""
self.execute_command(f'diag radio receive filter {dest_mac_address}')

def diag_get_radio_state(self) -> str:
"""Get the state of the radio."""
return self.__parse_str(self.execute_command('diag radio state'))
Expand Down Expand Up @@ -3036,6 +3112,12 @@ def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str:

return ''.join('%02x' % b for b in txt_bin)

def __get_optional_int_argument(self, arg_name: str, arg_value: Optional[int] = None):
return arg_name + f' {arg_value} ' if arg_value is not None else ''

def __get_optional_bool_argument(self, arg_name: str, arg_value: Optional[bool] = None):
return arg_name + ' ' if arg_value is not None and arg_value else ''


def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator)
Expand Down

0 comments on commit 69b91f9

Please sign in to comment.