-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsigrokWrapper.py
400 lines (310 loc) · 14 KB
/
sigrokWrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
"""Wrap sigrok python API
* sigrok-py hangs Python session if sr.Context created more than
once --> use class leven persistent state 'context'
* sigrok-py complains if device open/close state not managet
correctly && make it easier to make several actions on device -->
add support for python with statement (__enter__, __exit__ -methods)
* key-names must be converted to sigrok internal representation -->
hide this mapping
* wrap error in exceptions to document error context (and to help
user to solve the problem)
* add support for some Python constructs:
* manage device open/close state to support Python with -statement
* drivers/device documentation string
* provide utilites for parsing datacquisition callback (WORK IN PROGRESS)
* expose Sigrok Python API (i.e. no need to use import sigrok.core as sr)
"""
import sigrok.core as sr
import sys
import time
from datetime import datetime
import logging
class Device:
"""Responsibilities:
1) Manage device 'open' state
2) Map keyName to internal key-id in getter&setter
3) Get and set functions
4) __str__ support for the device
"""
# ------------------------------------------------------------------
# Constructor
def __init__( self, device):
self.device = device # sigrok device
self.isOpen = False # we are managing device state, initially 'Closed'
# Warning 'key.name' returned cannot be used for a keyName
# use sigrok-cli to find key names. For exampe, for the demo device
#
# sigrok-cli --driver=demo --show
def config_keys( self, channel_group=None ):
if not channel_group is None:
try:
configObject = self.device.channel_groups[channel_group]
except KeyError as err:
raise KeyError(f"Invalid channel_group '{channel_group}'. Expect one of {[ k for k,v in self.device.channel_groups.items()]}") from err
else:
configObject = self.device
config_key_names = [ key.name for key in configObject.config_keys() ]
return config_key_names
def channel_groups( self ):
channel_group_names = [ gr for gr in self.device.channel_groups ]
return channel_group_names
def channels( self ):
channel_names = [ ch.name for ch in self.device.channels ]
return channel_names
# Device getters and setters
def get(self, keyName, channel_group=None):
"""
:channel_group: access 'keyName' on channel_group if given
"""
logging.info( f"get: keyName={keyName}, channel_group={channel_group}")
if self.isOpen:
if not channel_group is None:
try:
channel_group_obj = self.device.channel_groups[channel_group]
except KeyError as err:
raise KeyError(f"Invalid channel_group '{channel_group}'. Expect one of {[ k for k,v in self.device.channel_groups.items()]}") from err
return self.getConfigObject( keyName=keyName, configObject=channel_group_obj)
else:
return self.getConfigObject( keyName=keyName, configObject=self.device)
else:
print( f"Device {self} not open", file=sys.stderr)
return None
def set(self, keyName, value, channel_group=None):
"""Set 'keyName' to 'value' on device or on 'channel_group'
:channel_group: access 'keyName' on channel_group if given
"""
logging.info( f"set: keyName={keyName}, value={value}, channel_group={channel_group}")
if self.isOpen:
if not channel_group is None:
try:
channel_group_obj = self.device.channel_groups[channel_group]
except KeyError as err:
raise KeyError(f"Invalid channel_group '{channel_group}'. Expect one of {[ k for k,v in self.device.channel_groups.items()]}") from err
return self.setConfigObject( keyName=keyName, value=value, configObject=channel_group_obj)
else:
return self.setConfigObject( keyName=keyName, value=value, configObject=self.device)
else:
print( f"Device {self} not open - nothin done", file=sys.stderr)
return None
def getConfigObject(self, keyName, configObject):
"""
:configObject: sigrok.Configurable (i.e. Device, Channel_Group etc)
"""
try:
key = self.keyName2key(keyName)
except ValueError as err:
valid_key_names = [ key.name for key in configObject.config_keys() ]
raise ValueError( f"Invalid key '{keyName}'. Expect one of: {valid_key_names}") from err
try:
value = configObject.config_get(key)
except ValueError as err:
valid_key_names = [ key.name for key in configObject.config_keys() ]
raise ValueError( f"Error reading '{keyName}'. Expect one of: {valid_key_names}") from err
return value
def setConfigObject(self, keyName, value, configObject):
try:
key = self.keyName2key(keyName)
except ValueError as err:
valid_key_names = [ key.name for key in configObject.config_keys() ]
raise ValueError( f"Invalid key '{keyName}'. Expect one of: {valid_key_names}") from err
try:
# print( f"settign {value}[{type(value)}]")
ret = configObject.config_set(key,value)
except Exception as err:
capabilities = configObject.config_capabilities(key)
print( f"{keyName}[{key}], capabilities={capabilities}, listable: {sr.Capability.LIST in capabilities:}")
try:
# Not possible to peek for value
if sr.Capability.LIST in capabilities:
valid_values = f" Valid values={ configObject.config_list(key) }"
# valid_values = f" Valid values={ [str(v)+type(v) for v in configObject.config_list(key)]}"
else:
valid_values = ""
except:
valid_values = "exception in valid values"
raise Exception( f"Error in setting '{keyName}' to value {value}[{type(value)}].{valid_values}") from err
return ret
@staticmethod
def keyName2key(keyName):
"""Map 'keyName' to key sigrok identifier"""
key = sr.ConfigKey.get_by_identifier(keyName)
return key
# ------------------------------------------------------------------
# Open close
def open(self):
if not self.isOpen:
self.isOpen = True
self.device.open()
return self
def close(self):
if self.isOpen:
self.isOpen = False
self.device.close()
# Device information string
def __str__(self):
return f"{self.device.vendor}, model :{self.device.model}, version: {self.device.version} - {len(self.device.channels)} channels: {', '.join([c.name for c in self.device.channels])}"
class SigrokDriver:
"""Wrap sigrok python API
- find device for driver string
- manage device open/close state = support Python with statement
- manage context (class attribute)
- manage session (class attribute)
- session interface (open, start, run, close, clenaup)
- utilities to parse session run callback
Attributes:
* 'driver': sigrok.Driver object
"""
# Persisent, shared context - one instance to avoid python REPL
# (Read-Eval-Print Loop) from freezing
context = None
# One context managed
session = None
# Contstructore
def __init__( self, driver="rdtech-dps"):
"""Create 'sigrok.context', locate 'driver' (default rdtechDps')
from this context, find device from from
:driver: sigrok driver string used in sigrok-cli
e.g. 'rdtech-dps:conn=/dev/ttyUSB0'
"""
# Create only one 'context' instance in one python session
if SigrokDriver.context is None:
SigrokDriver.context = sr.Context.create()
#
driver_spec = driver.split(':')
driver_name = driver_spec[0]
# locate driver bundle into libsigrok
if driver_name not in self.context.drivers:
raise KeyError( f"Unknown driver name '{driver_name}' in '{driver}'. Supported hardware drivers: {','.join(self.context.drivers.keys())}")
self.driver = self.context.drivers[driver_name]
driver_options = {}
for pair in driver_spec[1:]:
name, value = pair.split('=')
# key = self.driver.ConfigKey.get_by_identifier(name)
# driver_options[name] = key.parse_string(value)
driver_options[name] = value
# attach to sigrok.Device wrapped within Device class
self.device = self.findDevice(self.driver, driver_options)
def findDevice(self, driver, driver_options):
"""Locate first (=the one and only?) device found using
driver_options. Stderr message if not found.
:driver: Driver for the device we are looking for
:driver_options: Hash map for drivers options used to locate the
device
:return: Device wrapper for the first sigrok device scanned,
None if not found
"""
scanned = driver.scan(**driver_options)
if len(scanned) > 0:
# return first device found
return Device(scanned[0])
else:
raise ValueError( f"Could not find any device for driver '{self.driver.name}' with options '{ ','.join([k+'='+v for k,v in driver_options.items()])}'")
# ------------------------------------------------------------------
# support with statement
def open(self):
if self.device is not None:
self.device.open()
return self.device
def close(self):
if self.device is not None:
self.device.close()
def __enter__(self):
"""Called when entering with -statement.
:return: device (which is opened)
"""
return( self.open())
def __exit__( self, *args):
"""Called when exiting with -statement. Close 'device' (if it open)
:return: device (which is opened)
"""
self.close()
# ------------------------------------------------------------------
# support with statement
@staticmethod
def session_create():
if SigrokDriver.session is None:
logging.info( "session created")
SigrokDriver.session = SigrokDriver.context.create_session()
return SigrokDriver.session
@staticmethod
def session_get():
if SigrokDriver.session is None:
raise ValueError( "session_get: Session missing - should have called 'session_create'")
return SigrokDriver.session
@staticmethod
def session_add_device( device ):
session = SigrokDriver.session_get()
session.add_device(device.device)
logging.info( f"added device {device.device}[{type(device.device)}] to session ")
@staticmethod
def session_start( fRun, fStop=lambda device,frame: print( "Stopped" ) ):
session = SigrokDriver.session_get()
logging.debug( f"session_start: called: is_running={session.is_running()}")
if session.is_running():
raise ValueError( f"Session was already running - not started")
# def datafeed_in(device, packet):
# logging.info( f"datafeed_in: packet type {packet.type} ")
# logging.debug( f"datafeed_in: payload methods {dir(packet.payload)} ")
# # print( f"device:{device.name}")
# session.append( device, packet)
# session.begin_save(outputFile)
session.add_datafeed_callback(fRun)
logging.debug( f"session_start: before start is_running={session.is_running()}" )
session.start()
logging.debug( f"session_start: after start is_running={session.is_running()}" )
@staticmethod
def session_run():
""":return: False is already running, else True
"""
session = SigrokDriver.session_get()
logging.debug( f"session_run: called session is_running {session.is_running()}" )
# if session.is_running():
# logging.info( f"session_run: session already running {session.is_running()} - nothing done" )
# return False
logging.info( f"session_run: before session.run, is_running {session.is_running()}" )
session.run()
logging.info( f"session_run: after session.run, is_running {session.is_running()}" )
return True
@staticmethod
def session_stop():
session = SigrokDriver.session_get()
session.stop()
# SigrokDriver.session = None
@staticmethod
def session_close():
SigrokDriver.session = None
# ------------------------------------------------------------------
# utilities to process callback data
@classmethod
def isAnalogPacket(cls, packet):
return packet.type == sr.PacketType.ANALOG
@classmethod
def isLogicPacket(cls, packet):
return packet.type == sr.PacketType.LOGIC
@classmethod
def packetChannels(cls, packet):
return [ch.name for ch in packet.payload._channels() ]
@classmethod
def packetChannels(cls, packet):
return [ch.name for ch in packet.payload._channels() ]
@classmethod
def parsePacketData(cls, packet, data):
logging.info( f"parsePacketData: packet.type={packet.type}")
if cls.isAnalogPacket(packet):
for i, channel in enumerate(cls.packetChannels(packet)):
# previosly unseen channel?
if channel not in data: data[channel] = []
logging.info( f"parsePacketData: channel {channel} {len(packet.payload.data[i])}")
data[channel].extend(packet.payload.data[i])
# data[channel].append(packet.payload.data[i][-1])
elif cls.isLogicPacket(packet):
channel = "logic"
# if channel not in data: data[channel] = []
# logging.info( f"parsePacketData: channel {channel} {len(packet.payload.data)}")
# data[channel].extend(packet.payload.data)
# TODO: collect also logic data (sepately?)
return data
# ------------------------------------------------------------------
# print out
def __str__(self):
return str(self.device)