Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

twister: support segger real-time-transfer (rtt) for serial_pty via west #81837

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions scripts/pylib/twister/twisterlib/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,18 @@ def parse_arguments(
logger.error("west-flash requires device-testing to be enabled")
sys.exit(1)

if options.device_serial_pty and options.device_serial_pty == "rtt":
if options.west_flash is None:
logger.error("--device-serial-pty rtt requires --west-flash")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--device-serial-pty accept a script name, can be anything, here you assume that the script name (is this a script?) is always going to be rtt, or asking that if someone want to use this, they have to call their script rtt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, in this case rtt is a keyword, and twister simply re-uses west's rtt integration.

Normally, the argument for device-serial-pty is a script through, so that hasn't changed.

sys.exit(1)

# add the following options
cfriedt marked this conversation as resolved.
Show resolved Hide resolved
options.extra_args += ['CONFIG_USE_SEGGER_RTT=y',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a fan of injecting HW related kconfigs into twister like this, this should be done on the platform/hardware map/ or test level, not in twister. Using a snippet for example is an option.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've found actually that this works better with a snippet specified in testcase.yml.

I think this can be removed.

'CONFIG_RTT_CONSOLE=y', 'CONFIG_CONSOLE=y',
# This option is needed to ensure the uart console is not selected
# when CONFIG_RTT_CONSOLE is enabled due to #81798
'CONFIG_UART_CONSOLE=n']

if not options.testsuite_root:
# if we specify a test scenario which is part of a suite directly, do
# not set testsuite root to default, just point to the test directory
Expand Down Expand Up @@ -939,10 +951,6 @@ def parse_arguments(
logger.error("--device-flash-with-test does not apply when --flash-before is used")
sys.exit(1)

if options.flash_before and options.device_serial_pty:
logger.error("--device-serial-pty cannot be used when --flash-before is set (for now)")
sys.exit(1)

if options.shuffle_tests and options.subset is None:
logger.error("--shuffle-tests requires --subset")
sys.exit(1)
Expand Down
103 changes: 72 additions & 31 deletions scripts/pylib/twister/twisterlib/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import select
import shlex
import signal
import stat
import subprocess
import sys
import threading
Expand Down Expand Up @@ -706,6 +707,23 @@ def _get_serial_device(self, serial_pty, hardware_serial):

return serial_device, ser_pty_process

def _create_serial_pty_script(self, runner):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so you are creating the pty script on the fly, if I understand this correctly, it is an abuse of the --device-serial-pty option, we probably need something else to cover this case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"abuse" is probably a strong term.

I prefer "reuse" - we're simply reusing west's rtt integration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    device.add_argument("--device-serial-pty",
                        help="""Script for controlling pseudoterminal.
                        Twister believes that it interacts with a terminal
                        when it actually interacts with the script.

                        E.g "twister --device-testing
                        --device-serial-pty <script>

Not using a script here is wrong usage. If you want to use a built-in rtt support you are adding, you need to find another way, maybe yet another option that deals with keywords

serial_pty = self.build_dir + '/rtt.sh'

rtt_cmd = f'west -qqqqq rtt -d {self.build_dir} --skip-rebuild --rtt-quiet'
if runner:
rtt_cmd += f' -r {runner}'

with open(serial_pty, 'w') as f:
f.write(f'#!/bin/sh\n{rtt_cmd}\n')

st = os.stat(serial_pty)
os.chmod(serial_pty, st.st_mode | stat.S_IEXEC)

logger.debug(f'RTT command is "{rtt_cmd}"')

return serial_pty

def handle(self, harness):
runner = None
hardware = self.get_hardware()
Expand All @@ -717,9 +735,14 @@ def handle(self, harness):
runner = hardware.runner or self.options.west_runner
serial_pty = hardware.serial_pty

serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
if serial_pty == 'rtt':
serial_pty = self._create_serial_pty_script(runner)
logger.debug(f'Created RTT script {serial_pty}')

logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
if not hardware.flash_before:
serial_device, ser_pty_process = self._get_serial_device(
serial_pty, hardware.serial)
logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

command = self._create_command(runner, hardware)

Expand All @@ -738,28 +761,27 @@ def handle(self, harness):
if hardware.flash_with_test:
flash_timeout += self.get_test_timeout()

serial_port = None
if hardware.flash_before is False:
serial_port = serial_device

try:
ser = self._create_serial_connection(
hardware,
serial_port,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException:
return
halt_monitor_evt = None
t = None
if not hardware.flash_before:
try:
ser = self._create_serial_connection(
hardware,
serial_device,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException:
return

halt_monitor_evt = threading.Event()
halt_monitor_evt = threading.Event()

t = threading.Thread(target=self.monitor_serial, daemon=True,
args=(ser, halt_monitor_evt, harness))
start_time = time.time()
t.start()
t = threading.Thread(target=self.monitor_serial, daemon=True,
args=(ser, halt_monitor_evt, harness))
start_time = time.time()
t.start()

d_log = f"{self.instance.build_dir}/device.log"
logger.debug(f'Flash command: {command}', )
Expand All @@ -778,7 +800,8 @@ def handle(self, harness):
flash_error = True
with open(d_log, "w") as dlog_fp:
dlog_fp.write(stderr.decode())
halt_monitor_evt.set()
if halt_monitor_evt:
halt_monitor_evt.set()
except subprocess.TimeoutExpired:
logger.warning("Flash operation timed out.")
self.terminate(proc)
Expand All @@ -791,7 +814,8 @@ def handle(self, harness):
dlog_fp.write(stderr.decode())

except subprocess.CalledProcessError:
halt_monitor_evt.set()
if halt_monitor_evt:
halt_monitor_evt.set()
self.instance.status = TwisterStatus.ERROR
self.instance.reason = "Device issue (Flash error)"
flash_error = True
Expand All @@ -802,28 +826,45 @@ def handle(self, harness):
timeout = script_param.get("post_flash_timeout", timeout)
self.run_custom_script(post_flash_script, timeout)

# Connect to device after flashing it
if hardware.flash_before:
serial_device, ser_pty_process = self._get_serial_device(
serial_pty, hardware.serial)
logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

try:
logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud")
ser.port = serial_device
ser.open()
ser = self._create_serial_connection(
hardware,
serial_device,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException as e:
self._handle_serial_exception(e, hardware, serial_pty, ser_pty_process)
return

halt_monitor_evt = threading.Event()

t = threading.Thread(target=self.monitor_serial, daemon=True,
args=(ser, halt_monitor_evt, harness))
start_time = time.time()
t.start()

if not flash_error:
# Always wait at most the test timeout here after flashing.
t.join(self.get_test_timeout())
if t:
t.join(self.get_test_timeout())
else:
# When the flash error is due exceptions,
# twister tell the monitor serial thread
# to close the serial. But it is necessary
# for this thread being run first and close
# have the change to close the serial.
t.join(0.1)
if t:
t.join(0.1)

if t.is_alive():
if t and t.is_alive():
logger.debug(
f"Timed out while monitoring serial output on {self.instance.platform.name}"
)
Expand Down
2 changes: 1 addition & 1 deletion scripts/pylib/twister/twisterlib/hardwaremap.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def discover(self):
True,
flash_timeout=self.options.device_flash_timeout,
flash_with_test=self.options.device_flash_with_test,
flash_before=False,
flash_before=self.options.flash_before
)

# the fixtures given by twister command explicitly should be assigned to each DUT
Expand Down
16 changes: 15 additions & 1 deletion scripts/west_commands/runners/jlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def __init__(self, cfg, device, dev_id=None,
gdb_host='',
gdb_port=DEFAULT_JLINK_GDB_PORT,
rtt_port=DEFAULT_JLINK_RTT_PORT,
rtt_quiet=False,
tui=False, tool_opt=None):
super().__init__(cfg)
self.file = cfg.file
Expand All @@ -112,6 +113,7 @@ def __init__(self, cfg, device, dev_id=None,
self.tui_arg = ['-tui'] if tui else []
self.loader = loader
self.rtt_port = rtt_port
self.rtt_quiet = rtt_quiet

self.tool_opt = []
if tool_opt is not None:
Expand Down Expand Up @@ -174,6 +176,8 @@ def do_add_parser(cls, parser):
help='RTT client, default is JLinkRTTClient')
parser.add_argument('--rtt-port', default=DEFAULT_JLINK_RTT_PORT,
help=f'jlink rtt port, defaults to {DEFAULT_JLINK_RTT_PORT}')
parser.add_argument('--rtt-quiet', action='store_true',
help='only output rtt to stdout, not that of subprocesses')

parser.set_defaults(reset=False)

Expand All @@ -192,9 +196,13 @@ def do_create(cls, cfg, args):
gdb_host=args.gdb_host,
gdb_port=args.gdb_port,
rtt_port=args.rtt_port,
rtt_quiet=args.rtt_quiet,
tui=args.tui, tool_opt=args.tool_opt)

def print_gdbserver_message(self):
if self.rtt_quiet:
return

if not self.thread_info_enabled:
thread_msg = '; no thread info available'
elif self.supports_thread_info:
Expand All @@ -205,6 +213,9 @@ def print_gdbserver_message(self):
f'{self.gdb_port}{thread_msg}')

def print_rttserver_message(self):
if self.rtt_quiet:
return

self.logger.info(f'J-Link RTT server running on port {self.rtt_port}')

@property
Expand Down Expand Up @@ -317,10 +328,13 @@ def do_run(self, command, **kwargs):
self.print_gdbserver_message()
self.check_call(server_cmd)
elif command == 'rtt':
rtt_quiet_kwargs = {'stdout': subprocess.DEVNULL,
'stderr': subprocess.DEVNULL} if self.rtt_quiet else {}

self.print_gdbserver_message()
self.print_rttserver_message()
server_cmd += ['-nohalt']
server_proc = self.popen_ignore_int(server_cmd)
server_proc = self.popen_ignore_int(server_cmd, **rtt_quiet_kwargs)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# wait for the port to be open
Expand Down
22 changes: 18 additions & 4 deletions scripts/west_commands/runners/openocd.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def __init__(self, cfg, pre_init=None, reset_halt_cmd=DEFAULT_OPENOCD_RESET_HALT
gdb_client_port=DEFAULT_OPENOCD_GDB_PORT,
gdb_init=None, no_load=False,
target_handle=DEFAULT_OPENOCD_TARGET_HANDLE,
rtt_port=DEFAULT_OPENOCD_RTT_PORT):
rtt_port=DEFAULT_OPENOCD_RTT_PORT,
rtt_quiet=False):
super().__init__(cfg)

if not path.exists(cfg.board_dir):
Expand Down Expand Up @@ -113,6 +114,7 @@ def __init__(self, cfg, pre_init=None, reset_halt_cmd=DEFAULT_OPENOCD_RESET_HALT
self.load_arg = [] if no_load else ['-ex', 'load']
self.target_handle = target_handle
self.rtt_port = rtt_port
self.rtt_quiet = rtt_quiet

@classmethod
def name(cls):
Expand Down Expand Up @@ -184,6 +186,8 @@ def do_add_parser(cls, parser):
''')
parser.add_argument('--rtt-port', default=DEFAULT_OPENOCD_RTT_PORT,
help='openocd rtt port, defaults to 5555')
parser.add_argument('--rtt-quiet', action='store_true',
help='only output rtt to stdout, not that of subprocesses')


@classmethod
Expand All @@ -200,9 +204,12 @@ def do_create(cls, cfg, args):
telnet_port=args.telnet_port, gdb_port=args.gdb_port,
gdb_client_port=args.gdb_client_port, gdb_init=args.gdb_init,
no_load=args.no_load, target_handle=args.target_handle,
rtt_port=args.rtt_port)
rtt_port=args.rtt_port, rtt_quiet=args.rtt_quiet)

def print_gdbserver_message(self):
if self.rtt_quiet:
return

if not self.thread_info_enabled:
thread_msg = '; no thread info available'
elif self.supports_thread_info():
Expand All @@ -213,6 +220,9 @@ def print_gdbserver_message(self):
f'{self.gdb_port}{thread_msg}')

def print_rttserver_message(self):
if self.rtt_quiet:
return

self.logger.info(f'OpenOCD RTT server running on port {self.rtt_port}')

def read_version(self):
Expand Down Expand Up @@ -403,11 +413,15 @@ def do_attach_debug_rtt(self, command, **kwargs):
server_proc.terminate()
server_proc.wait()
elif command == 'rtt':
rtt_quiet_kwargs = {'stdout': subprocess.DEVNULL,
'stderr': subprocess.DEVNULL} if self.rtt_quiet else {}

self.print_rttserver_message()
server_proc = self.popen_ignore_int(server_cmd)

server_proc = self.popen_ignore_int(server_cmd, **rtt_quiet_kwargs)
try:
# run the binary with gdb, set up the rtt server (runs to completion)
subprocess.run(gdb_cmd)
subprocess.run(gdb_cmd, **rtt_quiet_kwargs)
# run the rtt client in the foreground
self.run_telnet_client('localhost', self.rtt_port)
finally:
Expand Down
Loading