forked from john-38787364/antifier
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtrainer.py
125 lines (115 loc) · 4.76 KB
/
trainer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import time

import usb.core
# Start by assuming an imagic wheel unit, which reports a fixed set of force
# values. receive() replaces both tables if the trainer turns out to report
# values outside this set.
print("Assuming fixed resistance return value from trainer")

# Possible force values to be received from the device.
possfov = [
    1039, 1299, 1559, 1819, 2078, 2338, 2598,
    2858, 3118, 3378, 3767, 4027, 4287, 4677,
]

# Possible resistance values to be transmitted to the device.
reslist = [
    1900, 2030, 2150, 2300, 2400, 2550, 2700,
    2900, 3070, 3200, 3350, 3460, 3600, 3750,
]
def fromcomp(val, bits):
    """Decode ``val`` as a ``bits``-wide two's-complement integer.

    When the top (sign) bit of the ``bits``-wide field is set the result is
    ``val - 2**bits``; any other value is returned unchanged.
    """
    sign_bit_set = (val >> (bits - 1)) == 1
    if not sign_bit_set:
        return val
    # Same result as inverting the low ``bits`` bits, adding one and negating.
    return val - (1 << bits)
def send(dev_trainer, resistance_level, pedecho=0):
    """Write one 12-byte resistance command to the trainer.

    dev_trainer      -- object exposing ``write(endpoint, data, timeout)``
    resistance_level -- index into the module-level ``reslist`` table
    pedecho          -- value placed in the 7th byte of the frame (default 0)

    Nothing is sent when ``reslist`` is absent from the module globals
    (simulation). A failed write is printed, never raised.
    """
    if 'reslist' not in globals():  # simulation: no resistance table available
        return

    target = int(reslist[resistance_level])
    low_byte = target & 0xff           # 5th byte of the frame
    high_byte = (target >> 8) & 0xff   # 6th byte of the frame

    frame = [0x01, 0x08, 0x01, 0x00, low_byte, high_byte, pedecho, 0x00,
             0x02, 0x52, 0x10, 0x04]
    payload = "".join(map(chr, frame))

    try:
        dev_trainer.write(0x02, payload, 30)  # send data to device
    except Exception as e:
        print(" ".join(["TRAINER WRITE ERROR", str(e)]))
def receive(dev_trainer):
    """Read one status frame from the trainer and decode it.

    dev_trainer -- object exposing ``read(endpoint, size, timeout)``

    Returns ``(speed, pedecho, heart_rate, force_index, cadence)`` where
    ``speed`` is in kph and ``force_index`` is the position in ``possfov`` of
    the reported (or nearest) force value.

    Returns ``("Not Found", False, False, False, False)`` when the read fails
    or the frame is too short to decode. Read errors whose text contains
    "timeout error" are silent; any other read error is printed.

    Side effect: the first time a force value outside the fixed table is seen,
    the module-level ``possfov`` and ``reslist`` are replaced by the variable
    resistance tables (0, 1000, ... 13000).

    The same decoding is used for the 0x1932 and 0x1942 head units. Sample
    0x1942 frame (48 bytes):
      0b 17 00 00 08 01 00 00 06 00 80 f8 00 00 00 00
      59 02 dc 03 d0 07 d0 07 03 13 02 00 28 2b 00 00
      00 00 28 63 00 00 00 00 41 f3 00 00 00 00 02 aa
    """
    global possfov, reslist
    not_found = ("Not Found", False, False, False, False)

    try:
        data = dev_trainer.read(0x82, 64, 30)
    except Exception as e:
        # A timeout only means the trainer returned no data this cycle.
        if "timeout error" not in str(e):
            print(" ".join(["TRAINER READ ERROR", str(e)]))
        return not_found

    # The highest byte decoded below is data[44]. The previous guard
    # (len(data) > 40) let 41-44 byte frames through, which then raised
    # IndexError instead of being reported as "Not Found".
    if len(data) < 45:
        return not_found

    fs = int(data[33]) << 8 | int(data[32])
    speed = round(fs / 2.8054 / 100, 1)  # speed kph
    pedecho = data[42]
    heart_rate = int(data[12])
    cadence = int(data[44])
    force = fromcomp((data[39] << 8) | data[38], 16)  # signed 16-bit value

    # Autodetect the kind of resistance value returned. A force of 0 could
    # come from either kind of unit, so while the fixed table is still active
    # treat it as the minimum fixed force.
    if possfov[0] != 0 and force == 0:
        force = 1039

    if force not in possfov:
        # Not an imagic fixed value: switch tables once, then snap to nearest.
        if possfov[0] != 0:
            print("Found variable resistance return value from trainer")
            # possible resistance values to be transmitted to device
            reslist = [0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000,
                       9000, 10000, 11000, 12000, 13000]
            possfov = [0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000,
                       9000, 10000, 11000, 12000, 13000]
        # find the table value with minimum distance to the reported force
        force = min(possfov, key=lambda x: abs(x - force))

    force_index = possfov.index(force)
    return speed, pedecho, heart_rate, force_index, cadence
def get_trainer():
    """Find the trainer head unit on USB and return it configured for use.

    Searches vendor 0x3561 for product ids 0x1932 (iflow), 0x1942 (fortius)
    and 0xe6be (uninitialised fortius), in that order. An uninitialised unit
    has its firmware loaded with the external ``fxload-libusb.exe`` tool and
    is then looked up again as a 0x1942.

    Returns the device object after ``set_configuration()`` has been called,
    or ``False`` when no trainer is found or firmware initialisation fails.

    Side effect: sets the module-level ``trainer_type`` to the detected
    product id (0 when nothing was found).
    """
    global trainer_type
    trainer_type = 0
    dev = None

    # iflow, fortius, uninitialised fortius
    for product_id in (0x1932, 0x1942, 0xe6be):
        dev = usb.core.find(idVendor=0x3561, idProduct=product_id)  # find trainer USB device
        if dev is not None:
            trainer_type = product_id
            break

    if trainer_type == 0:
        return False

    if trainer_type == 0xe6be:  # uninitialised 1942
        print("Found uninitialised 1942 head unit")
        try:
            os.system("fxload-libusb.exe -I FortiusSWPID1942Renum.hex -t fx -vv")  # load firmware
            print("Initialising head unit, please wait 5 seconds")
            time.sleep(5)
            dev = usb.core.find(idVendor=0x3561, idProduct=0x1942)
        except Exception as e:
            # Previously a bare ``except:`` hid the cause (and trapped Ctrl-C);
            # report what actually went wrong.
            print(" ".join(["Unable to initialise trainer", str(e)]))
            return False
        if dev is None:
            print("Unable to load firmware")
            return False
        print("1942 head unit initialised")
        trainer_type = 0x1942

    print(hex(trainer_type))
    dev.set_configuration()
    return dev
def initialise_trainer(dev):
    """Send the 4-byte initialisation frame (2, 0, 0, 0) to endpoint 0x02.

    The trainer will not read cadence until this frame has been sent.
    """
    init_frame = "\x02\x00\x00\x00"
    dev.write(0x02, init_frame)
def parse_factors(filename):
    """Parse a factors file into a ``{key: [first, second]}`` dictionary.

    Each useful line has the form ``key:first,second``; all three fields are
    converted with ``float``. Everything after a ``#`` is a comment. Lines
    that do not have exactly one ``:`` and exactly one ``,`` after it are
    skipped.

    Raises ValueError when a field on an otherwise well-formed line is not a
    number, and the usual I/O error when the file cannot be opened.
    """
    factors = {}
    # ``with`` guarantees the handle is closed; it used to be left open.
    with open(filename, 'r') as handle:
        for raw_line in handle:
            content = raw_line.split("#")[0]  # get rid of comments
            fields = content.split(":")
            if len(fields) != 2:
                continue
            vals = fields[1].split(",")
            if len(vals) != 2:
                continue
            factors[float(fields[0])] = [float(vals[0]), float(vals[1])]
    return factors