Skip to content

Commit

Permalink
Switch to astrobee_ops plan commands for stereo survey
Browse files Browse the repository at this point in the history
  • Loading branch information
trey0 committed Feb 20, 2024
1 parent 5b4e68b commit daf4ce6
Showing 1 changed file with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ def get_ops_plan_path():
return None


def get_ops_path() -> pathlib.Path:
return pathlib.Path(get_ops_plan_path()).parent.parent.resolve()


# This class starts a new process and lets you monitor the input and output
# Mostly used for actions where user inteference might be required
class ProcessExecutor:
Expand Down Expand Up @@ -600,13 +604,13 @@ def move(self, from_name: str, to_name: str) -> int:
to_name,
)
if exposure_value != 0:
cexec = CommandExecutor(cmd_exec_ns)
cexec = CommandExecutor(self.cmd_exec_ns)
exit_code = first_non_zero(exit_code, cexec.change_exposure(exposure_value))

# Change map if needed
map_name = map_change(self.config_static, from_name, to_name)
if map_name != "":
cexec = CommandExecutor(cmd_exec_ns)
cexec = CommandExecutor(self.cmd_exec_ns)
exit_code = first_non_zero(exit_code, cexec.change_map(map_name))

return exit_code
Expand Down Expand Up @@ -750,21 +754,45 @@ def survey_manager_executor(args, run, config_static, process_executor, quick: b
command_executor.plan_status_needed = True
command_executor.plan_name = fplan_path.stem

cmd = [
"rosrun",
"executive",
"plan_pub",
str(fplan_path),
"-remote",
]
cmd.extend(ns)
use_astrobee_ops = True
if use_astrobee_ops:
# Use astrobee_ops tool which provides for user interaction
ops_path = get_ops_path()
cmd_path = ops_path / "dock_scripts" / "hsc" / "cmd"
old_env = os.environ.copy()
try:
os.environ["TOPIC_PREFIX"] = cmd_exec_ns

exit_code = first_non_zero(
exit_code, process_executor.send_command_recursive(cmd)
)
cmd = [str(cmd_path), "-c", "plan", "-load", str(fplan_path)]
exit_code = first_non_zero(
exit_code, process_executor.send_command_recursive(cmd)
)

cmd = [str(cmd_path), "-c", "plan", "-run"]
exit_code = first_non_zero(
exit_code, process_executor.send_command_recursive(cmd)
)
finally:
os.environ.clear()
os.environ.update(old_env)
else:
# Use FSW tools
cmd = [
"rosrun",
"executive",
"plan_pub",
str(fplan_path),
"-remote",
]
cmd.extend(ns)

exit_code = first_non_zero(
exit_code, process_executor.send_command_recursive(cmd)
)

if exit_code == 0:
exit_code = first_non_zero(exit_code, command_executor.wait_plan())

if exit_code == 0:
exit_code = first_non_zero(exit_code, command_executor.wait_plan())
exit_code = first_non_zero(exit_code, command_executor.stop_recording())

# Ensure robot ends in preferred attitude for its bay.
Expand Down

0 comments on commit daf4ce6

Please sign in to comment.