-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnicla_sense_me_i2c.py
196 lines (166 loc) · 5.85 KB
/
nicla_sense_me_i2c.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import struct
import time
import board
import busio
# I2C commands.
# Naming from the point of view of the sensor.
CMD_BEGIN = 0x60
CMD_SEND_CALIB = 0x5B
CMD_SEND_ORIENTATION = 0x1A
CMD_SEND_ACCELEROMETER = 0x08
CMD_SEND_LINEAR_ACCELERATION = 0x28
CMD_SEND_GYROSCOPE = 0x14
CMD_SEND_MAGNETOMETER = 0x0E
CMD_SEND_ROTATION = 0x20
CMD_SEND_TEMPERATURE = 0x50
CMD_FETCH_CALIB = 0x5A
CMD_RECEIVE_CALIB = 0x5D
CMD_APPLY_CALIB = 0x5E
class NiclaSenseMe:
def __init__(self, i2c=None, address=0x28):
if i2c is None:
i2c = busio.I2C(board.SCL, board.SDA)
self.i2c = i2c
self.address = address
self.check_device()
def _writeto(self, data):
if isinstance(data, int):
data = bytes((data,))
while not self.i2c.try_lock():
time.sleep(0)
try:
self.i2c.writeto(self.address, data)
finally:
self.i2c.unlock()
return self
def _writeto_then_readfrom(self, data, read_format):
if isinstance(data, int):
data = bytes((data,))
buffer = bytearray(struct.calcsize(read_format))
while not self.i2c.try_lock():
time.sleep(0)
try:
self.i2c.writeto_then_readfrom(self.address, data, buffer)
finally:
self.i2c.unlock()
return struct.unpack_from(read_format, buffer)
def start(self):
self._writeto(CMD_BEGIN)
# Wait for the sensor to be ready, otherwise first data are zeros.
time.sleep(0.7)
return self
def orientation(self):
heading, roll, pitch = self._writeto_then_readfrom(
data=CMD_SEND_ORIENTATION, read_format="<hhh"
)
return (
heading / 91,
pitch / 91,
roll / 91,
) # In degrees. [0, 360], [-180, 180], [-90, 90]
def acceleration(self):
acc_x, acc_y, acc_z = self._writeto_then_readfrom(
data=CMD_SEND_ACCELEROMETER, read_format="<hhh"
)
acceleration_range = 8 # In earth g.
scale = acceleration_range / 32767
return acc_x * scale, acc_y * scale, acc_z * scale # In earth g.
def linear_acceleration(self):
lin_acc_x, lin_acc_y, lin_acc_z = self._writeto_then_readfrom(
data=CMD_SEND_LINEAR_ACCELERATION, read_format="<hhh"
)
acceleration_range = 8 # In earth g.
scale = acceleration_range / 32767
return lin_acc_x * scale, lin_acc_y * scale, lin_acc_z * scale # In earth g.
def gyroscope(self):
gyr_x, gyr_y, gyr_z = self._writeto_then_readfrom(
data=CMD_SEND_GYROSCOPE, read_format="<hhh"
)
gyroscope_range = 16.4
scale = gyroscope_range / 32767
return gyr_x * scale, gyr_y * scale, gyr_z * scale # In degrees/second.
def magnetometer(self):
mag_x, mag_y, mag_z = self._writeto_then_readfrom(
data=CMD_SEND_MAGNETOMETER, read_format="<hhh"
)
magnetometer_range = 16
scale = magnetometer_range / 32767
return mag_x * scale, mag_y * scale, mag_z * scale # In µT.
def quaternion(self):
rot_x, rot_y, rot_z, rot_w = self._writeto_then_readfrom(
data=CMD_SEND_ROTATION, read_format="<hhhh"
)
scale = 1 / 32767
return rot_x * scale, rot_y * scale, rot_z * scale, rot_w * scale
def temperature(self):
temp = self._writeto_then_readfrom(data=CMD_SEND_TEMPERATURE, read_format="<h")[
0
]
scale = 1 / 100
return temp * scale # In degree celsius.
def get_calib(self):
# Ask to prepare the calib on the device.
self._writeto(data=CMD_FETCH_CALIB)
# The sensor needs some time to prepare the calib.
time.sleep(0.5)
# Receive the calib by chunks of 16 bytes.
acc_calib = [
list(
self._writeto_then_readfrom(
data=CMD_SEND_CALIB, read_format=f"<{'B' * 16}"
)
)
for _ in range(32)
]
gyr_calib = [
list(
self._writeto_then_readfrom(
data=CMD_SEND_CALIB, read_format=f"<{'B' * 16}"
)
)
for _ in range(32)
]
mag_calib = [
list(
self._writeto_then_readfrom(
data=CMD_SEND_CALIB, read_format=f"<{'B' * 16}"
)
)
for _ in range(32)
]
acc_calib = sum(acc_calib, [])
gyr_calib = sum(gyr_calib, [])
mag_calib = sum(mag_calib, [])
assert len(acc_calib) == len(gyr_calib) == len(mag_calib) == 512
return {
"acc_calib": acc_calib,
"gyr_calib": gyr_calib,
"mag_calib": mag_calib,
}
def send_calib(self, acc_calib, gyr_calib, mag_calib):
# Need to be called before start() to be applied.
# See datasheet 13.3.3 BSX Algorithm Parameters.
assert len(acc_calib) == len(gyr_calib) == len(mag_calib) == 512
self._writeto(data=CMD_RECEIVE_CALIB)
for calib_data in (acc_calib, gyr_calib, mag_calib):
for i in range(32):
chunk = struct.pack(f"<{'B' * 16}", *calib_data[i * 16 : (i + 1) * 16])
self._writeto(data=chunk)
# Apply the calib.
self._writeto(data=CMD_APPLY_CALIB)
time.sleep(0.5)
return self
def check_device(self):
while not self.i2c.try_lock():
time.sleep(0)
try:
self.i2c.writeto(self.address, b"")
except OSError:
try:
result = bytearray(1)
self.i2c.readfrom_into(self.address, result)
except OSError:
raise ValueError(f"No I2C device at address {hex(self.address)}.")
finally:
self.i2c.unlock()
return self