forked from CURG-archive/trajectory_planner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstaubli_manager.py
336 lines (268 loc) · 13.4 KB
/
staubli_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import roslib
roslib.load_manifest("staubliTX60")
import rospy
from ft_manager import *
import staubliTX60
from staubliTX60.srv import *
import staubliTX60.msg
import geometry_msgs.msg
import tf
import tf.transformations
import tf_conversions.posemath as pm
from numpy import *
import actionlib
import copy as cp
import actionlib_msgs.msg
_delay_num = 5
_hits = _delay_num
_threshold = -14
def point_list_to_msg_list( dof_traj_list ):
"""@brief - Structure a list of dof trajectory points as a ros message along with appropriate
staubli controller parameters
@param dof_traj_list - list of dof trajectory points
Returns list of ros joint trajectory messages.
"""
p = staubliTX60.msg.StaubliMovementParams(jointVelocity = 0.4, jointAcc = 0.04, jointDec= 0.04,
endEffectorMaxTranslationVel = 9999.0,
endEffectorMaxRotationalVel = 9999.0,
movementType = 1,
distBlendPrev = 0.01,
distBlendNext = 0.01)
joint_point_msg_list = []
for point in dof_traj_list:
pf = [float(p2) for p2 in point]
joint_point_msg_list.append(staubliTX60.msg.JointTrajectoryPoint(jointValues = cp.deepcopy(pf), params = cp.deepcopy(p)))
joint_point_msg_list[-1].params.distBlendPrev = 0
joint_point_msg_list[-1].params.distBlendNext = 0
joint_point_msg_list[-1].params.movementType = 0
return joint_point_msg_list
def run_staubli_on_joint_goal(j, blocking = False):
"""@brief - Move staubli to a specified joint position
@param j - Desired joint position
@param blocking - Whether to block on completion of action before returning.
Returns whether motion was successful if blocking or successfully enqueued if not blocking
"""
client = actionlib.SimpleActionClient('setJoints', staubliTX60.msg.SetJointsAction)
if not client.wait_for_server():
print "SetJointTrajectory action server could not be found "
return False, []
action_goal = staubliTX60.msg.SetJointsGoal(j = j)
client.send_goal(action_goal)
if blocking:
client.wait_for_result()
return client.get_state() == actionlib_msgs.msg.GoalStatus.SUCCEEDED or client.get_state() == actionlib_msgs.msg.GoalStatus.PENDING or client.get_state() == actionlib_msgs.msg.GoalStatus.ACTIVE, client, j
#def fix_traj_list(dof_traj_list):
# return [[j%(2*pi) for j in traj_point] for traj_point in dof_traj_list]]
def run_staubli_on_trajectory_point_message( dof_traj_list , blocking = False ):
"""@brief - take a list of trajectory point messages and run the staubli
@param traj_message_list - set of trajectory point messages
@param blocking - should we run the robot to finish trajectory
Returns whether motion was successful if blocking or successfully enqueued if not blocking.
"""
traj_message_list = point_list_to_msg_list( dof_traj_list )
client = actionlib.SimpleActionClient('setJointTrajectory', staubliTX60.msg.SetJointTrajectoryAction)
if not client.wait_for_server():
print "SetJointTrajectory action server could not be found "
return False, client
action_goal = staubliTX60.msg.SetJointTrajectoryGoal(jointTrajectory = traj_message_list)
client.send_goal(action_goal)
if blocking:
client.wait_for_result()
return client.get_state() == actionlib_msgs.msg.GoalStatus.SUCCEEDED or client.get_state() == actionlib_msgs.msg.GoalStatus.PENDING or client.get_state() == actionlib_msgs.msg.GoalStatus.ACTIVE, client
def get_staubli_joints():
"""@brief - Read the staubli's joints.
Returns joint vector on success, empty list on failure.
"""
rospy.wait_for_service('getJoints')
try:
get_joints = rospy.ServiceProxy( 'getJoints', GetJoints )
resp1 = get_joints()
return resp1.j
except rospy.ServiceException, e:
print "Service call failed: %s"%e
return []
def get_staubli_cartesian_as_pose_msg():
"""@brief - Read the staubli end effector's cartesian position as a pose message
Returns pose message on success or empty message on failure
"""
rospy.wait_for_service('getCartesian')
try:
get_cartesian = rospy.ServiceProxy( 'getCartesian', GetCartesian )
resp1 = get_cartesian()
# make srv x y z rx ry rz euler angle representation into pose message
pos = geometry_msgs.msg.Point( x = resp1.x, y = resp1.y, z = resp1.z )
q = tf.transformations.quaternion_from_euler( resp1.rx , resp1.ry, resp1.rz ,'rxyz' )
quat = geometry_msgs.msg.Quaternion( x = q[0], y = q[1], z = q[2], w = q[3] )
return geometry_msgs.msg.Pose( position = pos, orientation = quat )
except rospy.ServiceException, e:
print "Service call failed: %s"%e
return []
def get_staubli_cartesian_as_tran():
"""@brief - Read the staubli end effector's cartesian position as a 4x4 numpy matrix
Returns 4x4 numpy message on success, empty message on failure
"""
current_pose = get_staubli_cartesian_as_pose_msg()
return pm.toMatrix(pm.fromMsg(current_pose))
def send_cartesian_goal(end_effector_tran, blocking = False, movement_params = []):
"""@brief - Send a cartesian goal to the robot. Return action client
@param end_effector_tran - Desired end effector position
@param blocking - Whether to block on completion of action
@param movement_params - parameters for staubli arm motion. See staubli
documentation for details
Returns action client and desired end effector transform.
"""
if movement_params == []:
p = staubliTX60.msg.StaubliMovementParams(jointVelocity = 0.4,
jointAcc = 0.04,
jointDec= 0.04,
endEffectorMaxTranslationVel = 9999.0,
endEffectorMaxRotationalVel = 9999.0,
movementType = 1,
distBlendPrev = 0.01,
distBlendNext = 0.01)
else:
p = staubliTX60.msg.StaubliMovementParams(jointVelocity = movement_params[0],
jointAcc = movement_params[1],
jointDec= movement_params[2],
endEffectorMaxTranslationVel = 9999.0,
endEffectorMaxRotationalVel = 9999.0,
movementType = 1,
distBlendPrev = 0.01,
distBlendNext = 0.01)
client = actionlib.SimpleActionClient('setCartesian', staubliTX60.msg.SetCartesianAction)
if not client.wait_for_server():
print "SetCartesian action server could not be found "
return False, []
action_goal = staubliTX60.msg.SetCartesianGoal()
action_goal.x,action_goal.y,action_goal.z = end_effector_tran[0:3,3]
action_goal.rx,action_goal.ry,action_goal.rz = tf.transformations.euler_from_matrix( end_effector_tran, 'rxyz' )
action_goal.lineCtrl = 1
action_goal.params = p
client.send_goal(action_goal)
if blocking:
client.wait_for_result()
return client, end_effector_tran
def cancel_arm_motion():
"""@brief - Cancel any active movement actions and stop arm.
Returns true if service call is successful, false otherwise.
"""
reset_motion = rospy.ServiceProxy('/cancelMotion', ResetMotion)
try:
reset_motion()
print "arm motion cancelled"
return 1
rospy.logwarn("motion cancelled")
except Exception as e:
rospy.logwarn("cancelMotion failed %s"%e)
return 0
def lift_arm(dist, blocking = False, movement_params = []):
"""@brief - Move arm in Z direction of world coordinate system.
@param dist - Distance to move in meters.
@param blocking - Whether to block on completion of action
@param movement_params - parameters for staubli arm motion. See staubli
documentation for details
Returns whether motion was successful if blocking or successfully enqueued if not blocking
"""
current_arm_tran = get_staubli_cartesian_as_tran()
offset_tran = eye(4)
offset_tran[2,3] = dist
ee_tran = dot(offset_tran, current_arm_tran)
client, end_effector_tran = send_cartesian_goal(ee_tran, blocking, movement_params)
return client.get_state() == actionlib_msgs.msg.GoalStatus.SUCCEEDED or client.get_state() == actionlib_msgs.msg.GoalStatus.PENDING or client.get_state() == actionlib_msgs.msg.GoalStatus.ACTIVE
def move_forward(dist, blocking = False, movementParams = []):
"""@brief - Move arm in Z direction of hand coordinate system.
@param dist - Distance to move in meters.
@param blocking - Whether to block on completion of action
@param movement_params - parameters for staubli arm motion. See staubli
documentation for details
Returns whether motion was successful if blocking or successfully enqueued if not blocking
"""
current_arm_tran = get_staubli_cartesian_as_tran()
offset_tran = eye(4)
offset_tran[2,3] = dist
ee_tran = dot(current_arm_tran, offset_tran)
client, end_effector_tran = send_cartesian_goal(ee_tran, blocking, movementParams)
return client.get_state() == actionlib_msgs.msg.GoalStatus.SUCCEEDED or client.get_state() == actionlib_msgs.msg.GoalStatus.PENDING or client.get_state() == actionlib_msgs.msg.GoalStatus.ACTIVE
def move_forward_to_contact2(dist, blocking = False, movementParams = []):
"""@brief - DEPRECATED: Move arm in Z direction of hand coordinate system by a particular distance or until contact is made.
@param dist - Distance to move in meters.
@param blocking - Whether to block on completion of action
@param movement_params - parameters for staubli arm motion. See staubli
documentation for details
Returns whether motion resulted in contacted.
"""
tare_FT()
# SetFTCallback(setFTData, FTThresholdCallback)
move_forward(dist, blocking, movementParams)
#sleep(10)
return unset_ft_callback(global_data)
def SetFTCallback(global_data, callback_func):
"""@brief - DEPRECATED: enable force-torque sensor client
@param global_data - data structure for storing force-torque subscriber.
@param callback_func - processes force-torque data
"""
if not global_data.ft_sub == []:
sub = global_data.ft_sub
global_data.ft_sub = []
sub.unregister()
del sub
global_data.ft_sub = rospy.Subscriber("/ForceTorque/Readings", geometry_msgs.msg.Wrench, callback_func)
def UnsetFTCallback(global_data):
"""@brief - DEPRECATED: disable force-torque sensor client.
@param global_data - data structure for storing force-torque subscriber.
"""
if not global_data.ft_sub == []:
sub = global_data.ft_sub
global_data.ft_sub = []
sub.unregister()
del sub
return True
return False
def tare_force_torque(blocking = True):
"""@brief - reset baseline for force-torque sensor
@param blocking - block on return of service call
Returns true of service call succeeds, false if service call fails.
"""
global global_data
if blocking:
try:
tare = rospy.ServiceProxy('/ForceTorque/TareService', EmptySrv)
resp1 = tare()
return resp1
except:
rospy.logwarn('Service Call to tare service failed')
return 0
else:
if global_data.ft_tare_pub == []:
global_data.ft_tare_pub = rospy.Publisher("/ForceTorque/Tare", EmptyMsg)
global_data.ft_tare_pub.publish()
sleep(4)
return 1
def move_forward_to_contact(dist, blocking = True, wait_time = 10):
"""@brief - Move arm in Z direction of hand coordinate system by a particular distance or until contact is made.
@param dist - Distance to move in meters.
@param blocking - Whether to block on completion of action
@param wait_time - Time in seconds to wait for completion
Returns whether motion resulted in contacted.
"""
tare_FT()
ftw = FTWatcher(cancel_arm_motion)
ftw.set_FT_callback()
move_forward(dist, blocking)
if not blocking:
sleep(wait_time)
return ftw.unset_FT_callback()
def lift_arm_to_contact(dist, blocking=True, wait_time = 10):
"""@brief - Move arm in Z direction of world coordinate system by a particular distance or until contact is made.
@param dist - Distance to move in meters.
@param blocking - Whether to block on completion of action
@param wait_time - Time in seconds to wait for completion
Returns whether motion resulted in contacted.
"""
tare_FT()
ftw = FTWatcher(cancel_arm_motion)
ftw.set_FT_callback()
lift_arm(dist, blocking)
if not blocking:
sleep(wait_time)
return ftw.unset_FT_callback()