-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcentralised_test.py
executable file
·126 lines (92 loc) · 3.08 KB
/
centralised_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
import logging
import time
import numpy as np
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
import rospy
import rosbag
from geometry_msgs.msg import Point
URI = 'radio://0/100/2M/E7E7E7E703'
# Only output errors from the logging framework
logging.basicConfig(level=logging.ERROR)
# Position Logging
def position_callback(timestamp, data, logconf):
x = data['kalman.stateX']
y = data['kalman.stateY']
z = data['kalman.stateZ']
msg.x = x
msg.y = y
msg.z = z
rospy.loginfo(msg)
pub.publish(msg)
bag.write('CF1_position', msg)
#print('pos: ({}, {}, {})'.format(x, y, z))
def start_position_printing(scf):
log_conf = LogConfig(name='Position', period_in_ms=100)
log_conf.add_variable('kalman.stateX', 'float')
log_conf.add_variable('kalman.stateY', 'float')
log_conf.add_variable('kalman.stateZ', 'float')
scf.cf.log.add_config(log_conf)
log_conf.data_received_cb.add_callback(position_callback)
log_conf.start()
# Main Centralised algorithm
def centr_avoid(scf):
cf = scf.cf
# Set Initial Values
cf.param.set_value('kalman.initialX', '0.6')
time.sleep(0.1)
cf.param.set_value('kalman.initialY', '0.0')
time.sleep(1)
# Reset the Kalman Filter (Important!)
cf.param.set_value('kalman.resetEstimation', '1')
time.sleep(0.1)
cf.param.set_value('kalman.resetEstimation', '0')
time.sleep(2)
# Read the inputs
# MPC application size
nsim = 100
ctrl_applied = inputReader(nsim,'testinputs1.txt')
print(ctrl_applied[99, 0])
print(ctrl_applied[99, 1])
# Send the velocity commands
# commander.send_hover_setpoint(vx,vy,yaw_rate,z)
# start up
for y in range(20):
cf.commander.send_hover_setpoint(0, 0, 0, 0.15)
time.sleep(0.1)
for y in range(10):
cf.commander.send_hover_setpoint(0, 0, 0, 0.2)
time.sleep(0.1)
#
for y in range(nsim):
cf.commander.send_hover_setpoint(ctrl_applied[y, 0], ctrl_applied[y, 1], 0, 0.2)
time.sleep(0.1)
for y in range(10):
cf.commander.send_hover_setpoint(0, 0, 0, 0.1)
time.sleep(0.1)
#
bag.close()
cf.commander.send_stop_setpoint()
def inputReader(nsim,doc):
input_matrix = np.zeros((nsim, 2))
f = open("/home/antonis/admm_collsion_avoidance/"+doc, "r")
for i, line in enumerate(f):
a = line.split(",")
input_matrix[i, :] = [a[0], a[1]]
f.close()
return input_matrix
if __name__ == '__main__':
#ROS variables
pub = rospy.Publisher('CF1_position', Point, queue_size=10)
rospy.init_node('centr', anonymous=True)
msg = Point()
bag = rosbag.Bag('RosBags/cf1.bag', 'w')
# Initialize the low-level drivers (don't list the debug drivers)
cflib.crtp.init_drivers(enable_debug_driver=False)
with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf:
start_position_printing(scf)
centr_avoid(scf)