-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbiased_tree.py
540 lines (496 loc) · 23.3 KB
/
biased_tree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# -*- coding: utf-8 -*-
"""
construct trees for biased sampling optimal task planning for multi-robots
"""
from random import uniform
from networkx.classes.digraph import DiGraph
from networkx.algorithms import dfs_labeled_edges
import math
import numpy as np
from collections import OrderedDict
import pyvisgraph as vg
from shapely.geometry import Point, LineString
from uniform_geometry import sample_uniform_geometry
from scipy.stats import truncnorm
class BiasedTree(object):
"""
biased tree for prefix and suffix parts
"""
def __init__(self, workspace, buchi, init_state, init_label, segment, para):
"""
initialization of the tree
:param workspace: workspace
:param buchi: buchi automaton
:param init_state: initial location of the robots
:param init_label: label generated by the initial location
:param segment: prefix or suffix part
:param para: parameters regarding biased-sampling method
"""
# parameters regarding workspace
self.workspace = workspace.workspace
self.dim = len(self.workspace)
self.regions = workspace.regions
self.obstacles = workspace.obs
self.robot = buchi.number_of_robots
# parameters regarding task
self.buchi = buchi
self.accept = self.buchi.buchi_graph.graph['accept']
self.init = init_state
# initlizing the tree
self.biased_tree = DiGraph(type='PBA', init=self.init)
self.biased_tree.add_node(self.init, cost=0, label=init_label)
# parameters regarding TL-RRT* algorithm
self.goals = set() #self.goals 用于存储目标节点。
self.step_size = para['step_size'] #定义了路径规划中的步长
self.segment = segment
self.lite = para['is_lite']
# size of the ball used in function near
uni_v = np.power(np.pi, self.robot*self.dim/2) / math.gamma(self.robot*self.dim/2+1) #计算单位球体的体积(uni_v)
#gamma 的定义涉及 uni_v 的倒数的根,用来调整邻域半径。
#gamma 会决定在近邻搜索时的采样密度,保证了采样的区域在不同维度和机器人数量下均匀分布。
self.gamma = np.ceil(4 * np.power(1/uni_v, 1./(self.dim*self.robot))) # unit workspace
# parameters regarding biased sampling
# group the nodes in the tree by the buchi state
self.group = dict()
self.add_group(self.init)
# print(self.buchi.buchi_graph.graph['accept'])
# select final buchi states
if self.segment == 'prefix':
self.b_final = self.buchi.buchi_graph.graph['accept'][0]
else:
self.b_final = self.buchi.buchi_graph.graph['accept']
self.min_dis = np.inf
self.q_min2final = []
self.not_q_min2final = []
self.update_min_dis2final_and_partition(self.init)
# probability of selecting q_p_closest
self.p_closest = para['p_closest']
# weight when selecting x_rand
self.y_rand = para['y_rand']
# threshold for collision avoidance
self.threshold = para['threshold']
# polygon obstacle for visibility-based method
#多边形障碍物处理
polys = []
#收集了障碍物的顶点坐标,构成可见性图的障碍物集合
for poly in self.obstacles.values():
polys.append([vg.Point(x[0], x[1]) for x in list(poly.exterior.coords)[:-1]])
self.g = vg.VisGraph() #构建了可见性图,用于检测路径中障碍物的影响
self.g.build(polys, status=False)
#限制机器人的位置,使其保持在工作空间的范围内,防止超出边界
def trunc(self, i, value):
"""
limit the robot in the range of workspace
:param i: robot i, starting from 0
:param value: value to be adjusted
:return: adjusted value
"""
if value < 0:
return 0
elif value > self.workspace[i]:
return self.workspace[i]
else:
return value
#带偏置的采样,生成一个偏向性的采样点
def biased_sample(self):
#定义了一个基于 Büchi 自动机的引导采样函数。返回值是采样点 x_rand 和在转换上最接近目标的节点 q_p_closest
"""
buchi guided biased sample
:return: sampled point x_rand, closest node q_p_closest in terms of transitions
"""
# sample nodes as q_p_closest from two partitioned sets
p_rand = np.random.uniform(0, 1, 1) #随机生成一个0到1之间的数
q_p_closest = None
if (p_rand <= self.p_closest and len(self.q_min2final) > 0) or not self.not_q_min2final:
q_p_closest = sample_uniform_geometry(self.q_min2final)
elif p_rand > self.p_closest or not self.q_min2final:
q_p_closest = sample_uniform_geometry(self.not_q_min2final)
# find the reachable sets of buchi state of q_p_closest
reachable_q_b_closest = []
for b_state in self.buchi.buchi_graph.succ[q_p_closest[1]]:
if self.check_transition_b_helper(self.biased_tree.nodes[q_p_closest]['label'],
self.buchi.buchi_graph.edges[(q_p_closest[1], b_state)]['truth']):
reachable_q_b_closest.append(b_state)
# if reachable_q_b_closest is empty
if not reachable_q_b_closest:
return [], []
# collect the buchi states in the reachable set of q_p_closest with minimum distance to the final state
b_min_from_q_b_closest = self.get_min2final_from_subset(reachable_q_b_closest)
# collect the buchi states in the reachable set b_min_from_q_b_closest whose successors is 1 step less from
# the final state than the it is
reachable_decr = dict()
m_q_b_closest = []
for b_state in b_min_from_q_b_closest:
candidate = []
for succ in self.buchi.buchi_graph.succ[b_state]:
if self.buchi.min_length[(b_state, self.b_final)] - 1 == self.buchi.min_length[(succ, self.b_final)] \
or succ in self.buchi.buchi_graph.graph['accept']:
candidate.append(succ)
if candidate:
reachable_decr[b_state] = candidate
m_q_b_closest.append(b_state)
# if empty
if not m_q_b_closest:
return [], []
# sample q_b_min and q_b_decr
q_b_min = sample_uniform_geometry(m_q_b_closest)
q_b_decr = sample_uniform_geometry(reachable_decr[q_b_min])
# get the guarding symbol
truth = self.buchi.buchi_graph.edges[(q_b_min, q_b_decr)]['truth']
x_rand = list(q_p_closest[0])
return self.buchi_guided_sample_by_truthvalue(truth, x_rand, q_p_closest,
self.biased_tree.nodes[q_p_closest]['label'])
def buchi_guided_sample_by_truthvalue(self, truth, x_rand, q_p_closest, x_label):
"""
sample a point moving towards the region corresponding to the guarding symbol
:param truth: guarding symbol that enables the transition
:param q_p_closest: the node q_p_closest
:param x_rand: point to be sampled
:param x_label: label of position of q_p_closest
:return: sampled point x_rand, q_p_closest
"""
if truth == '1':
return q_p_closest[0], q_p_closest
else:
for key in truth:
# move towards the target position
if truth[key] and key not in x_label:
pair = key.split('_') # region-robot pair
robot_index = int(pair[1]) - 1
orig_x_rand = x_rand[robot_index] # save for further recover
while True:
x_rand[robot_index] = orig_x_rand # recover
if np.random.uniform(0, 1, 1) <= self.y_rand:
target = self.get_target(orig_x_rand, pair[0])
x_rand[robot_index] = self.gaussian_guided_towards_target(orig_x_rand, target)
else:
x_rand_i = [uniform(0, self.workspace[i]) for i in range(self.dim)]
x_rand[robot_index] = tuple(x_rand_i)
# sampled point lies within obstacles
if 'o' in self.get_label(x_rand[robot_index]): continue
# collision avoidance
if self.collision_avoidance(x_rand, robot_index): break
return tuple(x_rand), q_p_closest
def add_group(self, q_p):
"""
add q_p to the group within which all states have the same buchi state
:param q_p: a product state
"""
try:
self.group[q_p[1]].append(q_p)
except KeyError:
self.group[q_p[1]] = [q_p]
def get_min2final_from_subset(self, subset):
"""
collect the buchi state from the subset of nodes with minimum distance to the final state
:param subset: set of nodes
:return: list of buchi states with minimum distance to the final state
"""
l_min = np.inf
b_min = set()
for b_state in subset:
if self.buchi.min_length[(b_state, self.b_final)] < l_min:
l_min = self.buchi.min_length[(b_state, self.b_final)]
b_min = set([b_state])
elif self.buchi.min_length[(b_state, self.b_final)] == l_min:
b_min.add(b_state)
return b_min
def update_min_dis2final_and_partition(self, q_p_new):
"""
check whether q_p_new has the buchi component with minimum distance to the final state
if so, update the set b_min which collects the buchi states with minimum distance to the final state
and the set q_min2final which collects nodes in the tree with buchi states in b_min
:param q_p_new: new product state
"""
# smaller than the current nodes with minimum distance
if self.buchi.min_length[(q_p_new[1], self.b_final)] < self.min_dis:
self.min_dis = self.buchi.min_length[(q_p_new[1], self.b_final)]
self.not_q_min2final = self.not_q_min2final + self.q_min2final
self.q_min2final = [q_p_new]
# equivalent to
elif self.buchi.min_length[(q_p_new[1], self.b_final)] == self.min_dis:
self.q_min2final = self.q_min2final + [q_p_new]
# larger than
else:
self.not_q_min2final = self.not_q_min2final + [q_p_new]
def get_target(self, init, target):
"""
find the second vertex in the shortest path from initial point to the target region
:param init: initial point
:param target: target labeled region
:return: the second vertex
"""
tg = self.regions[target].centroid.coords[0]
shortest = self.g.shortest_path(vg.Point(init[0], init[1]), vg.Point(tg[0], tg[1]))
return shortest[1].x, shortest[1].y
def get_truncated_normal(self, mean=0, sd=1, low=0, upp=10):
"""
truncated normal distribution
:param mean: mean value
:param sd: standard deviation
:param low: lower bound of the random variable
:param upp: upper bound of the random variable
:return: value of the random variable
"""
return truncnorm((low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd)
def gaussian_guided_towards_target(self, x, target):
"""
calculate new point x_rand guided by the target
distance and angle follow the gaussian distribution
:param x: initial point
:param target: target point
:return: new point x_rand
"""
d = self.get_truncated_normal(0, 1/3*self.workspace[0], 0, np.inf).rvs()
# d = self.get_truncated_normal(np.linalg.norm(np.subtract(x, target)), 1/3/3, 0, np.inf)
angle = np.random.normal(0, np.pi/12/3/3, 1) + np.arctan2(target[1] - x[1], target[0] - x[0])
x_rand = np.add(x, np.append(d * np.cos(angle), d * np.sin(angle)))
x_rand = [self.trunc(i, x_rand_i) for i, x_rand_i in enumerate(x_rand)]
return tuple(x_rand)
def collision_avoidance(self, x, robot_index):
"""
check whether robots with smaller index than robot_index collide with the robot of index robot_index
:param x: position of robots
:param robot_index: index of the specific robot
:return: true if collision free
"""
for i in range(len(x)):
if i != robot_index and np.fabs(x[i][0] - x[robot_index][0]) <= self.threshold and \
np.fabs(x[i][1] - x[robot_index][1]) <= self.threshold:
return False
return True
def nearest(self, x_rand):
"""
find the nearest class of vertices in the tree
:param: x_rand randomly sampled point form: single point ()
:return: nearest class of vertices form: single point ()
"""
min_dis = math.inf
q_p_nearest = []
for node in self.biased_tree.nodes:
x = self.mulp2single(node[0])
dis = np.linalg.norm(np.subtract(x_rand, x))
if dis < min_dis:
q_p_nearest = [node]
min_dis = dis
elif dis == min_dis:
q_p_nearest.append(node)
return q_p_nearest
def steer(self, x_rand, x_nearest):
"""
steer
:param: x_rand randomly sampled point form: single point ()
:param: x_nearest nearest point in the tree form: single point ()
:return: new point single point ()
"""
if np.linalg.norm(np.subtract(x_rand, x_nearest)) <= self.step_size:
return x_rand
else:
return tuple(map(tuple, np.asarray(x_nearest) + self.step_size * (np.subtract(x_rand, x_nearest))/
np.linalg.norm(np.subtract(x_rand, x_nearest))))
def extend(self, q_new, near_nodes, label, obs_check):
"""
add the new sate q_new to the tree
:param: q_new: new state
:param: near_nodes: near state
:param: obs_check: check the line connecting two points are inside the freespace
:return: the tree after extension
"""
added = False
cost = np.inf
q_min = ()
# loop over all nodes in near_nodes
for node in near_nodes:
if q_new != node and obs_check[(q_new[0], node[0])] and \
self.check_transition_b(node[1], self.biased_tree.nodes[node]['label'], q_new[1]):
c = self.biased_tree.nodes[node]['cost'] \
+ np.linalg.norm(np.subtract(self.mulp2single(q_new[0]), self.mulp2single(node[0])))
if c < cost:
added = True
q_min = node
cost = c
if added:
self.biased_tree.add_node(q_new, cost=cost, label=label)
self.biased_tree.add_edge(q_min, q_new)
self.add_group(q_new)
self.update_min_dis2final_and_partition(q_new)
if self.segment == 'prefix' and q_new[1] in self.accept:
q_n = list(list(self.biased_tree.pred[q_new].keys())[0])
cost = self.biased_tree.nodes[tuple(q_n)]['cost']
label = self.biased_tree.nodes[tuple(q_n)]['label']
q_n[1] = q_new[1]
q_n = tuple(q_n)
if q_n != q_min:
self.biased_tree.add_node(q_n, cost=cost, label=label)
self.biased_tree.add_edge(q_min, q_n)
self.add_group(q_n)
self.update_min_dis2final_and_partition(q_n)
self.goals.add(q_n)
# if self.segment == 'suffix' and \
# self.obstacle_check([self.init], q_new[0], label)[(q_new[0], self.init[0])] \
# and self.check_transition_b(q_new[1], label, self.init[1]):
# self.goals.add(q_new)
elif self.segment == 'suffix' and self.init[1] == q_new[1]:
self.goals.add(q_new)
return added
def rewire(self, q_new, near_nodes, obs_check):
"""
:param: q_new: new state
:param: near_nodes: states returned near
:param: obs_check: check whether obstacle-free
:return: the tree after rewiring
"""
for node in near_nodes:
if obs_check[(q_new[0], node[0])] \
and self.check_transition_b(q_new[1], self.biased_tree.nodes[q_new]['label'], node[1]):
c = self.biased_tree.nodes[q_new]['cost'] \
+ np.linalg.norm(np.subtract(self.mulp2single(q_new[0]), self.mulp2single(node[0])))
delta_c = self.biased_tree.nodes[node]['cost'] - c
# update the cost of node in the subtree rooted at the rewired node
if delta_c > 0:
self.biased_tree.remove_edge(list(self.biased_tree.pred[node].keys())[0], node)
self.biased_tree.add_edge(q_new, node)
edges = dfs_labeled_edges(self.biased_tree, source=node)
for _, v, d in edges:
if d == 'forward':
self.biased_tree.nodes[v]['cost'] = self.biased_tree.nodes[v]['cost'] - delta_c
def near(self, x_new):
"""
find the states in the near ball
:param x_new: new point form: single point
:return: p_near: near state, form: tuple (mulp, buchi)
"""
near_nodes = []
radius = min(self.gamma * np.power(np.log(self.biased_tree.number_of_nodes()+1)/self.biased_tree.number_of_nodes(),
1./(self.dim*self.robot)), self.step_size)
for node in self.biased_tree.nodes:
if np.linalg.norm(np.subtract(x_new, self.mulp2single(node[0]))) <= radius:
near_nodes.append(node)
return near_nodes
def obstacle_check(self, near_node, x_new, label):
"""
check whether line from x_near to x_new is obstacle-free
:param near_node: nodes returned by near function
:param x_new: new position component
:param label: label of x_new
:return: a dictionary indicating whether the line connecting two points are obstacle-free
"""
obs_check = {}
checked = set()
for node in near_node:
# whether the position component of nodes has been checked
if node[0] in checked:
continue
checked.add(node[0])
obs_check[(x_new, node[0])] = True
flag = True # indicate whether break and jump to outer loop
for r in range(self.robot):
# the line connecting two points crosses an obstacle
for (obs, boundary) in iter(self.obstacles.items()):
if LineString([Point(node[0][r]), Point(x_new[r])]).intersects(boundary):
obs_check[(x_new, node[0])] = False
flag = False
break
# no need to check further
if not flag:
break
for (region, boundary) in iter(self.regions.items()):
if LineString([Point(node[0][r]), Point(x_new[r])]).intersects(boundary) \
and region + '_' + str(r + 1) != label[r] \
and region + '_' + str(r + 1) != self.biased_tree.nodes[node]['label'][r]:
obs_check[(x_new, node[0])] = False
flag = False
break
# no need to check further
if not flag:
break
return obs_check
def get_label(self, x):
"""
generating the label of position component
:param x: position
:return: label
"""
point = Point(x)
# whether x lies within obstacle
for (obs, boundary) in iter(self.obstacles.items()):
if point.within(boundary):
return obs
# whether x lies within regions
for (region, boundary) in iter(self.regions.items()):
if point.within(boundary):
return region
# x lies within unlabeled region
return ''
def check_transition_b(self, q_b, x_label, q_b_new):
"""
check whether q_b -- x_label ---> q_b_new
:param q_b: buchi state
:param x_label: label of x
:param q_b_new: buchi state
:return True if satisfied
"""
b_state_succ = self.buchi.buchi_graph.succ[q_b]
# q_b_new is not the successor of b_state
if q_b_new not in b_state_succ:
return False
# check whether label of x enables the transition
truth = self.buchi.buchi_graph.edges[(q_b, q_b_new)]['truth']
if self.check_transition_b_helper(x_label, truth):
return True
return False
def check_transition_b_helper(self, x_label, truth):
"""
check whether transition enabled with current generated label
:param x_label: label of the current position
:param truth: symbol enabling the transition
:return: true or false
"""
if truth == '1':
return True
# all true propositions should be satisdied
true_label = [true_label for true_label in truth.keys() if truth[true_label]]
for label in true_label:
if label not in x_label: return False
# all fasle propositions should not be satisfied
false_label = [false_label for false_label in truth.keys() if not truth[false_label]]
for label in false_label:
if label in x_label: return False
return True
def find_path(self, goals):
"""
find the path backwards
:param goals: found all goal states
:return: the path leading to the goal state and the corresponding cost
"""
paths = OrderedDict()
for i in range(len(goals)):
goals = list(goals)
goal = goals[i]
path = [goal]
s = goal
while s != self.init:
s = list(self.biased_tree.pred[s].keys())[0]
path.insert(0, s)
if self.segment == 'prefix':
paths[i] = [self.biased_tree.nodes[goal]['cost'], path]
elif self.segment == 'suffix':
path.append(self.init)
paths[i] = [self.biased_tree.nodes[goal]['cost'] + np.linalg.norm(np.subtract(self.mulp2single(goal[0]),
self.mulp2single(self.init[0]))), path]
return paths
def mulp2single(self, point):
"""
convert a point, which in the form of a tuple of tuple ((),(),(),...) to point in the form of a flat tuple
:param point: point((position of robot 1), (position of robot2), (), ...)
:return: point (position of robot1, position of robot2, ...)
"""
return tuple([p for r in point for p in r])
def single2mulp(self, point):
"""
convert a point in the form of flat tuple to point in the form of a tuple of tuple ((),(),(),...)
:param point: point (position of robot1, position of robot2, ...)
:return: point((position of robot 1), (position of robot2), (), ...)
"""
mp = [point[i*self.dim:(i+1)*self.dim] for i in range(self.robot)]
return tuple(mp)