diff --git a/arm_robots/launch/med.launch b/arm_robots/launch/med.launch
new file mode 100644
index 0000000..fdebabc
--- /dev/null
+++ b/arm_robots/launch/med.launch
@@ -0,0 +1,58 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/arm_robots/launch/trajectory_follower.launch b/arm_robots/launch/trajectory_follower.launch
index dc91017..4958f05 100644
--- a/arm_robots/launch/trajectory_follower.launch
+++ b/arm_robots/launch/trajectory_follower.launch
@@ -3,7 +3,18 @@
+
+
+
+
+
+
+
+
+
diff --git a/arm_robots/scripts/med_motion.py b/arm_robots/scripts/med_motion.py
old mode 100644
new mode 100755
index 6abe525..2f7c2af
--- a/arm_robots/scripts/med_motion.py
+++ b/arm_robots/scripts/med_motion.py
@@ -2,70 +2,50 @@
import rospy
from arm_robots.med import Med
import numpy as np
-import pdb
-import moveit_commander
-from arc_utilities.conversions import convert_to_pose_msg
+from victor_hardware_interface_msgs.msg import ControlMode
+
def main():
rospy.init_node('med_motion')
- med = Med(manual_execute=True)
- med.connect()
- # med.set_grasping_force(40.0)
- # med.open_gripper()
+ # Are we in sim? Currently WSG50 gripper not supported in Gazebo.
+ sim = rospy.get_param('~sim', default=True)
- med.plan_to_joint_config(med.arm_group, [0,1.0,0,1.0,0,1.0,0])
- med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
+ med = Med(display_goals=False)
+ med.connect()
+ med.set_control_mode(ControlMode.JOINT_POSITION, vel=0.1)
- exit()
+ if not sim:
+ med.set_grasping_force(40.0)
+ med.open_gripper()
- # Add table plane.
- # scene = moveit_commander.PlanningSceneInterface(ns="victor")
- # scene.add_plane('table_plane', convert_to_pose_msg([0,0,0,0,0,0]))
+ # Example: Planning to joint configuration.
+ med.plan_to_joint_config(med.arm_group, [0, 1.0, 0, -1.0, 0, 1.0, 0])
- # Pick and place "demo"
- med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
+ # Example: Planning to pose.
+ med.plan_to_pose(med.arm_group, med.wrist, [0.6, 0.12, 0.45, 0.0, np.pi, 0.0], frame_id='med_base')
- while True:
- _, result, _ = med.plan_to_pose(med.arm_group, med.wrist, [0.6, 0.12, 0.45, 0.0, np.pi, 0.0], frame_id='med_base')
- if result.error_code < 0:
- replan = input('Replan? [y/n]')
- if replan != 'y':
- exit()
- else:
- break
+ # Example: Cartesian planning.
+ med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, 0.12, 0.35])
- med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
- exit()
+ # Example: Grasp object of approx. width 15mm
+ if not sim:
+ med.grasp(width=15.0)
- med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, -0.12, 0.35])
- med.grasp(15.0)
- med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, -0.12, 0.45])
med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, 0.12, 0.45])
- med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, 0.12, 0.365])
- med.release()
- med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, 0.12, 0.45])
- med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
-
- # med.grasp(width=10.0)
- # Plan to joint config.
- # med.plan_to_joint_config(med.arm_group, [0,1.0,0,1.0,0,1.0,0])
- # med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
+ med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, -0.12, 0.45])
+ med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, -0.12, 0.365])
- # Plan to position (x,y,z) relative to robot base.
- # med.plan_to_position(med.arm_group, med.wrist, [0.6, 0.0, 0.6])
- # med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
+ # Example: Release grasp.
+ if not sim:
+ med.release()
- # # Plan to pose relative to robot base.
+ # Example: Getting the robot state.
+ print("Current pose at the wrist:")
+ print(med.get_link_pose(med.wrist))
- # med.release()
- # med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
+ med.plan_to_joint_config(med.arm_group, [0, 0, 0, 0, 0, 0, 0])
- # # Get and print current pose!
- # print(med.get_link_pose(med.arm_group, med.wrist))
- # med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, 0.1, 0.35])
- # med.plan_to_position_cartesian(med.arm_group, med.wrist, target_position=[0.6, -0.1, 0.35])
- # med.plan_to_joint_config(med.arm_group, [0,0,0,0,0,0,0])
if __name__ == "__main__":
main()
diff --git a/arm_robots/src/arm_robots/base_robot.py b/arm_robots/src/arm_robots/base_robot.py
index d5a0769..c837d3e 100644
--- a/arm_robots/src/arm_robots/base_robot.py
+++ b/arm_robots/src/arm_robots/base_robot.py
@@ -10,7 +10,8 @@
from trajectory_msgs.msg import JointTrajectoryPoint
-class DualArmRobot:
+class BaseRobot:
+
def __init__(self, robot_namespace: str = ''):
"""
This class is designed around the needs of the trajectory_follower.TrajectoryFollower
@@ -24,10 +25,6 @@ def __init__(self, robot_namespace: str = ''):
joint_states_topic = ns_join(self.robot_namespace, 'joint_states')
self._joint_state_listener = Listener(joint_states_topic, JointState)
- # NOTE: derived classes must set these values
- self.right_gripper_command_pub = None
- self.left_gripper_command_pub = None
-
self.tf_wrapper = TF2Wrapper()
try:
@@ -78,29 +75,3 @@ def check_inputs(self, group_name: str, ee_link_name: str):
if group_name not in groups:
rospy.logwarn_throttle(1, f"Group [{group_name}] does not exist. Existing groups are:")
rospy.logwarn_throttle(1, groups)
-
- def get_right_gripper_links(self):
- return self.robot_commander.get_link_names("right_gripper")
-
- def get_left_gripper_links(self):
- return self.robot_commander.get_link_names("left_gripper")
-
- def open_left_gripper(self):
- self.left_gripper_command_pub.publish(self.get_open_gripper_msg())
-
- def open_right_gripper(self):
- self.right_gripper_command_pub.publish(self.get_open_gripper_msg())
-
- def close_left_gripper(self):
- # TODO: implementing blocking grasping
- self.left_gripper_command_pub.publish(self.get_close_gripper_msg())
-
- def close_right_gripper(self):
- self.right_gripper_command_pub.publish(self.get_close_gripper_msg())
-
- def get_close_gripper_msg(self):
- # FIXME: this is a bad abstraction, not all grippers work like this
- raise NotImplementedError()
-
- def get_open_gripper_msg(self):
- raise NotImplementedError()
diff --git a/arm_robots/src/arm_robots/config/med_config.py b/arm_robots/src/arm_robots/config/med_config.py
new file mode 100644
index 0000000..a1d2a21
--- /dev/null
+++ b/arm_robots/src/arm_robots/config/med_config.py
@@ -0,0 +1,19 @@
+import numpy as np
+
+# Joint Names
+
+ARM_JOINT_NAMES = (
+ "med_kuka_joint_1",
+ "med_kuka_joint_2",
+ "med_kuka_joint_3",
+ "med_kuka_joint_4",
+ "med_kuka_joint_5",
+ "med_kuka_joint_6",
+ "med_kuka_joint_7",
+)
+
+# From Med manual 4.3.2 and copied to kuka_iiwa_interface joint_limits.yaml
+KUKA_MED_MAX_JOINT_VELOCITIES_DEG_S = (85.0, 85.0, 100.0, 75.0, 130.0, 135.0, 135.0)
+KUKA_MED_MAX_JOINT_VELOCITIES = list(s * np.pi / 180 for s in KUKA_MED_MAX_JOINT_VELOCITIES_DEG_S)
+KUKA_MED_MAX_JOINT_ACCEL_DEG_S2 = (50.0, 50.0, 100.0, 100.0, 1000.0, 650.0, 1500.0)
+KUKA_MED_MAX_JOINT_ACCEL = list(s * np.pi / 180 for s in KUKA_MED_MAX_JOINT_ACCEL_DEG_S2)
diff --git a/arm_robots/src/arm_robots/get_robot.py b/arm_robots/src/arm_robots/get_robot.py
index be9c98f..bfa0b1e 100644
--- a/arm_robots/src/arm_robots/get_robot.py
+++ b/arm_robots/src/arm_robots/get_robot.py
@@ -4,17 +4,18 @@
from arm_robots.hdt_michigan import Val, BaseVal
from arm_robots.robot import MoveitEnabledRobot
from arm_robots.victor import BaseVictor, Victor
+from arm_robots.med import BaseMed, Med
def get_moveit_robot(robot_name: Optional[str] = None, **kwargs):
- return get_robot(Val, Victor, robot_name, **kwargs)
+ return get_robot(Val, Victor, Med, robot_name, **kwargs)
def get_base_robot(robot_name: Optional[str] = None, **kwargs):
- return get_robot(BaseVal, BaseVictor, robot_name, **kwargs)
+ return get_robot(BaseVal, BaseVictor, BaseMed, robot_name, **kwargs)
-def get_robot(val_type: Type, victor_type: Type, robot_name: Optional[str] = None, **kwargs) -> MoveitEnabledRobot:
+def get_robot(val_type: Type, victor_type: Type, med_type: Type, robot_name: Optional[str] = None, **kwargs) -> MoveitEnabledRobot:
"""
Get the right robot. It considers first the robot_name argument,
then checks the ros parameter server,
@@ -41,5 +42,7 @@ def get_robot(val_type: Type, victor_type: Type, robot_name: Optional[str] = Non
return victor_type(robot_name, **kwargs)
elif robot_name in ['val', 'hdt_michigan']:
return val_type('hdt_michigan', **kwargs)
+ elif robot_name == 'med':
+ return med_type(robot_name, **kwargs)
else:
raise NotImplementedError(f"robot with name {robot_name} not implemented")
diff --git a/arm_robots/src/arm_robots/hdt_michigan.py b/arm_robots/src/arm_robots/hdt_michigan.py
index 254596e..dfaeec2 100644
--- a/arm_robots/src/arm_robots/hdt_michigan.py
+++ b/arm_robots/src/arm_robots/hdt_michigan.py
@@ -9,16 +9,16 @@
from colorama import Fore
import rospy
-from arm_robots.base_robot import DualArmRobot
+from arm_robots.base_robot import BaseRobot
from arm_robots.robot import MoveitEnabledRobot
from sensor_msgs.msg import JointState
from trajectory_msgs.msg import JointTrajectoryPoint
-class BaseVal(DualArmRobot):
+class BaseVal(BaseRobot):
def __init__(self, robot_namespace: str):
- DualArmRobot.__init__(self, robot_namespace=robot_namespace)
+ BaseRobot.__init__(self, robot_namespace=robot_namespace)
self.first_valid_command = False
self.should_disconnect = False
self.min_velocity = 0.05
@@ -107,6 +107,11 @@ def send_joint_command(self, joint_names: List[str], trajectory_point: JointTraj
error_msg = ""
return failed, error_msg
+ def get_right_gripper_links(self):
+ return self.robot_commander.get_link_names("right_gripper")
+
+ def get_left_gripper_links(self):
+ return self.robot_commander.get_link_names("left_gripper")
left_arm_joints = [
'joint_1',
@@ -177,12 +182,19 @@ def close_left_gripper(self):
def close_right_gripper(self):
return self.set_right_gripper(self.gripper_closed_position)
+ def get_both_arm_joints(self):
+ return self.get_left_arm_joints() + self.get_right_arm_joints()
+
def get_right_arm_joints(self):
return right_arm_joints
def get_left_arm_joints(self):
return left_arm_joints
+ def get_gripper_positions(self):
+ # NOTE: this function requires that gazebo be playing
+ return self.get_link_pose(self.left_tool_name).position, self.get_link_pose(self.right_tool_name).position
+
def connect(self, **kwargs):
super().connect(**kwargs)
rospy.loginfo(Fore.GREEN + "Val ready!")
@@ -196,3 +208,9 @@ def is_gripper_closed(self, gripper: str):
raise NotImplementedError(f"invalid gripper {gripper}")
current_joint_values = move_group.get_current_joint_values()
return np.allclose(current_joint_values, self.gripper_closed_position, atol=0.01)
+
+ def is_left_gripper_closed(self):
+ return self.is_gripper_closed('left')
+
+ def is_right_gripper_closed(self):
+ return self.is_gripper_closed('right')
diff --git a/arm_robots/src/arm_robots/med.py b/arm_robots/src/arm_robots/med.py
new file mode 100644
index 0000000..f7d2a5b
--- /dev/null
+++ b/arm_robots/src/arm_robots/med.py
@@ -0,0 +1,220 @@
+#! /usr/bin/env python
+import collections
+from typing import List, Dict, Tuple, Sequence
+
+import numpy as np
+from colorama import Fore
+from scipy import signal
+
+import moveit_commander
+import rospy
+from arc_utilities.conversions import convert_to_pose_msg, normalize_quaternion
+from arc_utilities.listener import Listener
+from arm_robots.base_robot import BaseRobot
+from arm_robots.config.med_config import ARM_JOINT_NAMES
+from arm_robots.robot import MoveitEnabledRobot
+from moveit_msgs.msg import DisplayRobotState
+from trajectory_msgs.msg import JointTrajectoryPoint
+from victor_hardware_interface.victor_utils import get_control_mode_params, list_to_jvq, jvq_to_list
+from victor_hardware_interface_msgs.msg import ControlMode, MotionStatus, MotionCommand
+from victor_hardware_interface_msgs.srv import SetControlMode, GetControlMode, GetControlModeRequest, \
+ GetControlModeResponse, SetControlModeResponse
+from wsg_50_common.srv import Move, Conf
+
+
+# TODO: Since we have only one set of arms, this really just makes sure everythings in the right order. Could probably simplify but I'll keep it for now.
+def delegate_to_arms(positions: List, joint_names: Sequence[str]) -> Tuple[Dict[str, List], bool, str]:
+ """
+ Given a list (e.g. of positions) and a corresponding list of joint names,
+ assign and order by victor's joint groups.
+
+ Args:
+ positions: values to delegate
+ joint_names: list of joint_names
+
+ Returns:
+ object: (map from joint_names to values, abort?, abort_msg)
+ """
+ assert len(positions) == len(joint_names), "positions and joint_names must be same length"
+
+ # TODO: Why can't joint_names be a combination of arm and gripper joints?
+ ok = set(joint_names) in [set(ARM_JOINT_NAMES)]
+
+ if not ok:
+ blank_positions = {n: None for n in ['arm']}
+ return blank_positions, True, f"Invalid joint_names {joint_names}"
+
+ joint_position_of = dict(zip(joint_names, positions))
+
+ def fill_using(joint_ordering: Sequence[str]):
+ if not all(j in joint_position_of for j in joint_ordering):
+ return None
+ return [joint_position_of[name] for name in joint_ordering]
+
+ positions_by_interface = {
+ 'arm': fill_using(ARM_JOINT_NAMES),
+ }
+ # set equality ignores order
+
+ return positions_by_interface, False, ""
+
+
+class BaseMed(BaseRobot):
+
+ def __init__(self, robot_namespace: str):
+ BaseRobot.__init__(self, robot_namespace=robot_namespace)
+
+ self.arm_command_pub = rospy.Publisher(self.ns("motion_command"), MotionCommand, queue_size=10)
+ self.set_control_mode_srv = rospy.ServiceProxy(self.ns("set_control_mode_service"), SetControlMode)
+ self.get_control_mode_srv = rospy.ServiceProxy(self.ns("get_control_mode_service"), GetControlMode)
+ self.arm_status_listener = Listener(self.ns("motion_status"), MotionStatus)
+ self.waypoint_state_pub = rospy.Publisher(self.ns("waypoint_robot_state"), DisplayRobotState, queue_size=10)
+
+ def send_joint_command(self, joint_names: Sequence[str], trajectory_point: JointTrajectoryPoint) -> Tuple[
+ bool, str]:
+ # TODO: in med's impedance mode, we want to modify the setpoint so that there is a limit
+ # on the force we will apply
+ positions, abort, msg = delegate_to_arms(trajectory_point.positions, joint_names)
+ if abort:
+ return True, msg
+
+ velocities, _, _ = delegate_to_arms([0.0] * len(ARM_JOINT_NAMES), ARM_JOINT_NAMES)
+ if len(trajectory_point.velocities) != 0:
+ velocities, abort, msg = delegate_to_arms(trajectory_point.velocities, joint_names)
+ if abort:
+ return True, msg
+
+ # This is a bit silly.
+ positions = positions['arm']
+ velocities = velocities['arm']
+
+ control_mode = self.get_control_mode()
+
+ def trunc(values, decs=0):
+ return np.trunc(values * 10 ** decs) / (10 ** decs)
+
+ velocities = trunc(np.array(velocities), 3) # Kuka does not like sending small but non-zero velocity commands
+ # FIXME: what if we allow the BaseRobot class to use moveit, but just don't have it require that
+ # any actions are running?
+ # NOTE: why are these values not checked by the lower-level code? the Java code knows what the joint limits
+ # are so why does it not enforce them?
+
+ # TODO: use enforce bounds? https://github.com/ros-planning/moveit/pull/2356
+ limit_enforced_positions = []
+ for i, joint_name in enumerate(ARM_JOINT_NAMES):
+ joint: moveit_commander.RobotCommander.Joint = self.robot_commander.get_joint(joint_name)
+ limit_enforced_position = np.clip(positions[i], joint.min_bound() + 1e-2, joint.max_bound() - 1e-2)
+ limit_enforced_positions.append(limit_enforced_position)
+
+ # TODO: enforce velocity limits
+ cmd = MotionCommand(joint_position=list_to_jvq(limit_enforced_positions),
+ joint_velocity=list_to_jvq(velocities),
+ control_mode=control_mode)
+ cmd.header.stamp = rospy.Time.now()
+ self.arm_command_pub.publish(cmd)
+
+ return False, ""
+
+ def get_control_mode(self):
+ control_mode_res: GetControlModeResponse = self.get_control_mode_srv(GetControlModeRequest())
+ return control_mode_res.active_control_mode.control_mode
+
+ def set_control_mode(self, control_mode: ControlMode, vel, **kwargs):
+ new_control_mode = get_control_mode_params(control_mode, vel=vel, **kwargs)
+ res: SetControlModeResponse = self.set_control_mode_srv(new_control_mode)
+
+ if not res.success:
+ rospy.logerr("Failed to switch arm to control mode: " + str(control_mode))
+ rospy.logerr(res.message)
+ return res
+
+ def get_arm_status(self):
+ return self.arm_status_listener.get()
+
+ def send_cartesian_command(self, pose_stamped):
+ pose_stamped = convert_to_pose_msg(pose_stamped)
+ pose_stamped.pose.orientation = normalize_quaternion(pose_stamped.pose.orientation)
+
+ left_arm_command = MotionCommand()
+ left_arm_command.header.frame_id = 'med_base'
+ left_arm_command.cartesian_pose = pose_stamped.pose
+ left_arm_command.control_mode = self.get_control_mode()
+ while self.arm_command_pub.get_num_connections() < 1:
+ rospy.sleep(0.01)
+
+ self.arm_command_pub.publish(left_arm_command)
+
+ # TODO: send_delta_cartesian_command
+
+ def get_joint_positions_map(self) -> Dict[str, float]:
+ arm_joint_vals = jvq_to_list(self.arm_status_listener.get().measured_joint_position)
+ return {n: val for n, val in zip(ARM_JOINT_NAMES, arm_joint_vals)}
+
+ def get_joint_positions(self, joint_names: Sequence[str] = ARM_JOINT_NAMES):
+ """
+ :args joint_names an optional list of names if you want to have a specific order or a subset
+ """
+ position_of_joint = self.get_joint_positions_map()
+
+ print(position_of_joint)
+ return [position_of_joint[name] for name in joint_names]
+
+
+class Med(MoveitEnabledRobot, BaseMed):
+
+ def __init__(self, robot_namespace: str = 'med', force_trigger: float = -0.0, **kwargs):
+ MoveitEnabledRobot.__init__(self,
+ robot_namespace=robot_namespace,
+ arms_controller_name='arm_trajectory_controller',
+ force_trigger=force_trigger,
+ **kwargs)
+ BaseMed.__init__(self, robot_namespace=robot_namespace)
+ self.arm_group = 'kuka_arm'
+ self.wrist = 'med_kuka_link_ee'
+
+ def get_arm_joints(self):
+ return ARM_JOINT_NAMES
+
+ def set_control_mode(self, control_mode: ControlMode, vel=0.1, **kwargs):
+ super().set_control_mode(control_mode, vel, **kwargs)
+ self._max_velocity_scale_factor = vel
+
+ def set_grasping_force(self, force):
+ if type(force) != float or force > 80.0 or force <= 0.0:
+ print("Bad grasp force value provided! Not setting grasping force.")
+ return None
+
+ rospy.wait_for_service('wsg_50_driver/set_force')
+ try:
+ force_proxy = rospy.ServiceProxy('wsg_50_driver/set_force', Conf)
+ force_resp = force_proxy(force)
+ return force_resp.error
+ except rospy.ServiceException as e:
+ print("Service call failed: %s" % e)
+
+ def grasp(self, width, speed=50.0):
+ rospy.wait_for_service('wsg_50_driver/grasp')
+ try:
+ close_proxy = rospy.ServiceProxy('wsg_50_driver/grasp', Move)
+ grasp_resp = close_proxy(width, speed)
+ return grasp_resp.error
+ except rospy.ServiceException as e:
+ print("Service call failed: %s" % e)
+
+ def open_gripper(self):
+ rospy.wait_for_service('wsg_50_driver/move')
+ try:
+ move_proxy = rospy.ServiceProxy('wsg_50_driver/move', Move)
+ move_resp = move_proxy(100.0, 50.0)
+ return move_resp.error
+ except rospy.ServiceException as e:
+ print("Service call failed: %s" % e)
+
+ def release(self, width=110.0, speed=50.0):
+ rospy.wait_for_service('wsg_50_driver/release')
+ try:
+ release_proxy = rospy.ServiceProxy('wsg_50_driver/release', Move)
+ release_resp = release_proxy(width, speed)
+ return release_resp.error
+ except rospy.ServiceException as e:
+ print("Service call failed: %s" % e)
diff --git a/arm_robots/src/arm_robots/robot.py b/arm_robots/src/arm_robots/robot.py
index 35a454c..a9828f2 100644
--- a/arm_robots/src/arm_robots/robot.py
+++ b/arm_robots/src/arm_robots/robot.py
@@ -12,7 +12,7 @@
from actionlib import SimpleActionClient
from arc_utilities.conversions import convert_to_pose_msg
from arc_utilities.ros_helpers import try_to_connect
-from arm_robots.base_robot import DualArmRobot
+from arm_robots.base_robot import BaseRobot
from arm_robots.robot_utils import make_follow_joint_trajectory_goal, PlanningResult, PlanningAndExecutionResult, \
ExecutionResult, is_empty_trajectory, merge_joint_state_and_scene_msg
from control_msgs.msg import FollowJointTrajectoryAction, FollowJointTrajectoryFeedback, FollowJointTrajectoryResult, \
@@ -44,7 +44,7 @@ def on_error(message):
urdf_parser_py.xml_reflection.core.on_error = on_error
-class MoveitEnabledRobot(DualArmRobot):
+class MoveitEnabledRobot(BaseRobot):
def __init__(self,
robot_namespace: str,
@@ -69,10 +69,6 @@ def __init__(self,
self.display_goal_position_pub = rospy.Publisher('goal_position', Marker, queue_size=10)
self.display_robot_state_pubs = {}
- # Override these in base classes!
- self.left_tool_name = None
- self.right_tool_name = None
-
self.arms_client = None
self.jacobian_follower = pyjacobian_follower.JacobianFollower(robot_namespace=self.robot_namespace,
translation_step_size=0.005,
@@ -340,7 +336,8 @@ def follow_jacobian_to_position_from_scene_and_state(self,
rospy.logerr(err_msg)
raise ValueError(err_msg)
- preferred_tool_orientations = self.merge_tool_orientations_with_defaults(tool_names, preferred_tool_orientations)
+ preferred_tool_orientations = self.merge_tool_orientations_with_defaults(tool_names,
+ preferred_tool_orientations)
if len(preferred_tool_orientations) == 0:
return ExecutionResult(trajectory=None,
execution_result=None,
@@ -383,7 +380,8 @@ def follow_jacobian_to_position(self,
rospy.logerr(err_msg)
raise ValueError(err_msg)
- preferred_tool_orientations = self.merge_tool_orientations_with_defaults(tool_names, preferred_tool_orientations)
+ preferred_tool_orientations = self.merge_tool_orientations_with_defaults(tool_names,
+ preferred_tool_orientations)
if len(preferred_tool_orientations) == 0:
return ExecutionResult(trajectory=None,
execution_result=None,
@@ -409,9 +407,6 @@ def follow_jacobian_to_position(self,
stop_condition=stop_condition)
return PlanningAndExecutionResult(planning_result, execution_result)
- def get_both_arm_joints(self):
- return self.get_left_arm_joints() + self.get_right_arm_joints()
-
def get_joint_names(self, group_name: Optional[str] = None):
# NOTE: Getting a list of joint names should not require anything besides a lookup on the ros parameter server.
# However, MoveIt does not have this implemented that way, it requires connecting to the move group node.
@@ -428,34 +423,9 @@ def get_joint_names(self, group_name: Optional[str] = None):
def get_num_joints(self, group_name: Optional[str] = None):
return len(self.get_joint_names(group_name))
- def get_right_arm_joints(self):
- raise NotImplementedError()
-
- def get_left_arm_joints(self):
- raise NotImplementedError()
-
- def get_right_gripper_joints(self):
- raise NotImplementedError()
-
- def get_left_gripper_joints(self):
- raise NotImplementedError()
-
def send_joint_command(self, joint_names: List[str], trajectory_point: JointTrajectoryPoint) -> Tuple[bool, str]:
raise NotImplementedError()
- def get_gripper_positions(self):
- # NOTE: this function requires that gazebo be playing
- return self.get_link_pose(self.left_tool_name).position, self.get_link_pose(self.right_tool_name).position
-
- def is_gripper_closed(self, gripper: str):
- raise NotImplementedError()
-
- def is_left_gripper_closed(self):
- return self.is_gripper_closed('left')
-
- def is_right_gripper_closed(self):
- return self.is_gripper_closed('right')
-
def display_robot_state(self, robot_state: RobotState, label: str, color: Optional = None):
"""
diff --git a/arm_robots/src/arm_robots/trajectory_follower.py b/arm_robots/src/arm_robots/trajectory_follower.py
index 195759e..abac34a 100755
--- a/arm_robots/src/arm_robots/trajectory_follower.py
+++ b/arm_robots/src/arm_robots/trajectory_follower.py
@@ -8,13 +8,13 @@
from rosgraph.names import ns_join
from trajectory_msgs.msg import JointTrajectoryPoint
-from arm_robots.base_robot import DualArmRobot
+from arm_robots.base_robot import BaseRobot
from arm_robots.robot_utils import get_ordered_tolerance_list, interpolate_joint_trajectory_points, is_waypoint_reached, \
waypoint_error
class TrajectoryFollower:
- def __init__(self, robot: DualArmRobot, controller_name: str):
+ def __init__(self, robot: BaseRobot, controller_name: str):
self.robot = robot
self.action_name = ns_join(robot.robot_namespace, ns_join(controller_name, "follow_joint_trajectory"))
self.server = actionlib.SimpleActionServer(self.action_name, FollowJointTrajectoryAction,
diff --git a/arm_robots/src/arm_robots/victor.py b/arm_robots/src/arm_robots/victor.py
index ef5b223..fd68092 100644
--- a/arm_robots/src/arm_robots/victor.py
+++ b/arm_robots/src/arm_robots/victor.py
@@ -19,7 +19,7 @@
from arc_utilities.conversions import convert_to_pose_msg, normalize_quaternion, convert_to_positions
from arc_utilities.listener import Listener
-from arm_robots.base_robot import DualArmRobot
+from arm_robots.base_robot import BaseRobot
from arm_robots.config.victor_config import NAMED_POSITIONS, default_robotiq_command, \
KUKA_MIN_PATH_JOINT_POSITION_TOLERANCE, KUKA_FULL_SPEED_PATH_JOINT_POSITION_TOLERANCE, LEFT_GRIPPER_JOINT_NAMES, \
RIGHT_GRIPPER_JOINT_NAMES, LEFT_ARM_JOINT_NAMES, RIGHT_ARM_JOINT_NAMES, BOTH_ARM_JOINT_NAMES, ALL_JOINT_NAMES, \
@@ -75,10 +75,10 @@ def fill_using(joint_ordering: Sequence[str]):
return positions_by_interface, False, ""
-class BaseVictor(DualArmRobot):
+class BaseVictor(BaseRobot):
def __init__(self, robot_namespace: str):
- DualArmRobot.__init__(self, robot_namespace=robot_namespace)
+ BaseRobot.__init__(self, robot_namespace=robot_namespace)
self.left_arm_command_pub = rospy.Publisher(self.ns("left_arm/motion_command"), MotionCommand, queue_size=10)
self.right_arm_command_pub = rospy.Publisher(self.ns("right_arm/motion_command"), MotionCommand, queue_size=10)
@@ -162,6 +162,25 @@ def trunc(values, decs=0):
cmd.header.stamp = rospy.Time.now()
command_pub.publish(cmd)
+ def get_right_gripper_links(self):
+ return self.robot_commander.get_link_names("right_gripper")
+
+ def get_left_gripper_links(self):
+ return self.robot_commander.get_link_names("left_gripper")
+
+ def open_left_gripper(self):
+ self.left_gripper_command_pub.publish(self.get_open_gripper_msg())
+
+ def open_right_gripper(self):
+ self.right_gripper_command_pub.publish(self.get_open_gripper_msg())
+
+ def close_left_gripper(self):
+ # TODO: implementing blocking grasping
+ self.left_gripper_command_pub.publish(self.get_close_gripper_msg())
+
+ def close_right_gripper(self):
+ self.right_gripper_command_pub.publish(self.get_close_gripper_msg())
+
def get_gripper_statuses(self):
return {'left': self.get_left_gripper_status(),
'right': self.get_right_gripper_status()}
@@ -416,12 +435,19 @@ def speak(self, message: str):
rospy.loginfo(f"Victor says: {message}")
self.polly_pub.publish(String(data=message))
+ def get_both_arm_joints(self):
+ return self.get_left_arm_joints() + self.get_right_arm_joints()
+
def get_right_arm_joints(self):
return RIGHT_ARM_JOINT_NAMES
def get_left_arm_joints(self):
return LEFT_ARM_JOINT_NAMES
+ def get_gripper_positions(self):
+ # NOTE: this function requires that gazebo be playing
+ return self.get_link_pose(self.left_tool_name).position, self.get_link_pose(self.right_tool_name).position
+
def follow_joint_trajectory_feedback_cb(self,
client: SimpleActionClient,
goal: FollowJointTrajectoryGoal,