forked from Am6er/nanopro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
204 lines (193 loc) · 7.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import shproto.dispatcher
import shproto.alert
import time
import threading
import re
import argparse
import os
from datetime import datetime, timezone, timedelta
spec_dir = os.environ["HOME"] + "/nanopro/"
# spec_file = spec_dir + "spectrum.csv"
shproto.dispatcher.start_timestamp = datetime.now(timezone.utc)
def helptxt():
print("""
Some non-hazardous commands for text mode:
-inf
Prints debug information and variables
-sta
Starts collecting impulses for histogram
-sto
Stops collecting impulses for histogram
-rst
Resets collecting
-nos <number>
Sets number adc value for peak detection (default value - 30).
Lower number (for ex 12) - lowest energies peaks collected to histogram.
Other common commands:
spec_sta
Start saving spectra to file
spec_sto
Stop saving spectra to file
alert_sta
Alert mode. Start writing individual spectra if cps > cps * ratio
alert_sto
Alert mode stop.
stat
Show statistics while spectra gathering
rst
send "-rst", "-cal", "-inf"
spd <number>
Set port speed to <number>
quit or exit
Exits terminal
Type serial number to use this device.
""")
if __name__ == '__main__':
helptxt()
parser = argparse.ArgumentParser(
prog='ProgramName',
description='What the program does',
epilog='Text at the bottom of help')
parser.add_argument('file', default='spectrum.csv')
parser.add_argument('-d', '--device', default='')
parser.add_argument('-c', '--csv', action='store_true')
parser.add_argument('-i', '--interpec_csv', action='store_true')
parser.add_argument('-x', '--xml', action='store_true')
parser.add_argument('-a', '--autostart', action='store_true')
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
if args.device != '':
shproto.port.getportbyserialnumber(args.device)
if re.search("^[/\.].*", args.file):
spec_file = args.file
else:
spec_file = spec_dir + args.file
if not re.search("\.csv$", spec_file, flags=re.IGNORECASE):
spec_file += ".csv"
if args.csv:
shproto.dispatcher.csv_out = 1
else:
shproto.dispatcher.csv_out = 0
if args.xml:
shproto.dispatcher.xml_out = 1
else:
shproto.dispatcher.xml_out = 0
if args.interpec_csv:
shproto.dispatcher.csv_out = 1
shproto.dispatcher.interspec_csv = 1
else:
shproto.dispatcher.interspec_csv = 0
if args.autostart:
autostart = 1
else:
autostart = 0
if args.verbose:
shproto.dispatcher.verbose = 1
else:
shproto.dispatcher.verbose = 0
print("Found devices: {}".format(shproto.port.getallportsastext()))
dispatcher = threading.Thread(target=shproto.dispatcher.start)
dispatcher.start()
time.sleep(1)
spec = threading.Thread(target=shproto.dispatcher.process_01, args=(spec_file,))
shproto.dispatcher.spec_stopflag = 1
alert = threading.Thread(target=shproto.alert.alertmode, args=(spec_dir, 1.5,))
shproto.alert.alert_stop = 1
command = ""
auto_command = ""
if autostart:
auto_command = "spec_sta"
while True:
if auto_command != "":
command = auto_command
auto_command = ""
else:
command = input(">> ")
if command == "exit" or command == "quit":
shproto.dispatcher.stop()
exit(0)
else:
if command == "help":
helptxt()
continue
if command == "rst":
shproto.dispatcher.process_03("-rst")
time.sleep(2)
with shproto.dispatcher.hide_next_responce_lock:
shproto.dispatcher.hide_next_responce = True
shproto.dispatcher.process_03("-cal")
time.sleep(2)
shproto.dispatcher.process_03("-inf")
continue
if command == "spec_sta":
with shproto.dispatcher.hide_next_responce_lock:
shproto.dispatcher.hide_next_responce = True
shproto.dispatcher.process_03("-cal")
time.sleep(2)
shproto.dispatcher.process_03("-inf")
time.sleep(1)
shproto.dispatcher.process_03("-sta")
if shproto.dispatcher.spec_stopflag == 0:
print("Collecting thread allready running")
continue
spec.start()
continue
if command == "spec_sto":
shproto.dispatcher.spec_stop()
spec = threading.Thread(target=shproto.dispatcher.process_01, args=(spec_file,))
continue
if command == "alert_sta":
if shproto.alert.alert_stop == 0:
print("Alert thread allready running")
continue
alert.start()
continue
if command == "alert_sto":
shproto.alert.stop()
alert = threading.Thread(target=shproto.alert.alertmode, args=(spec_dir, 1.5,))
continue
m = re.search("^(spd|speed)\s+(\S+)", command)
if m is not None and len(m.groups()) == 2:
shproto.port.port_speed = m.group(2)
print("port speed set to {}... reconnect".format(shproto.port.port_speed))
shproto.dispatcher.stop()
time.sleep(1)
with shproto.dispatcher.stopflag_lock:
shproto.dispatcher.stopflag = 0
dispatcher = threading.Thread(target=shproto.dispatcher.start)
dispatcher.start()
time.sleep(1)
continue
if command in shproto.port.getallportssn() or re.match("^/", command):
print("Connect to device: {}".format(shproto.port.getportbyserialnumber(command)))
shproto.dispatcher.stop()
time.sleep(1)
with shproto.dispatcher.stopflag_lock:
shproto.dispatcher.stopflag = 0
dispatcher = threading.Thread(target=shproto.dispatcher.start)
dispatcher.start()
time.sleep(1)
continue
if command == "stat":
if shproto.dispatcher.total_pkts == 0:
percent = 0
else:
percent = round(100 * shproto.dispatcher.dropped / shproto.dispatcher.total_pkts, 2)
print(
"Histograms 0x01: {}, Commands 0x03: {}, Commands 0x04: {}, Total packets: {},"
" Dropped packets: {} ({})%"
.format(
shproto.dispatcher.pkts01,
shproto.dispatcher.pkts03,
shproto.dispatcher.pkts04,
shproto.dispatcher.total_pkts,
shproto.dispatcher.dropped,
percent
)
)
print("Total time: {}, cps: {}, cpu_load: {}, lost_imp: {}".format(shproto.dispatcher.total_time,
shproto.dispatcher.cps,
shproto.dispatcher.cpu_load,
shproto.dispatcher.lost_impulses))
else:
shproto.dispatcher.process_03(command)