-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDataSet.py
96 lines (72 loc) · 3.29 KB
/
DataSet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Supplies data for the AT model."""
import pandas as pd
#from sklearn.decomposition import PCA
import glob
import random
class DataSet:
    """Supplies data for the AT model.

    On construction, one feature file matching the glob pattern is picked at
    random, a window of at most ``maxMoves`` rows is read from it and from its
    companion ``.price.at`` file, and the rows are partitioned into a training
    slice (the first ``maxMoves`` rows) and a test slice (whatever follows).

    NOTE(review): only ``maxMoves`` rows are read, so the test slice is empty
    whenever the file supplies that many rows, and ``self.split`` is stored
    but not used by this class. Confirm whether a larger read was intended.
    """

    # Inclusive upper bound on the random number of leading rows skipped
    # before the window is read, so runs do not always start at row zero.
    MAX_START_OFFSET = 60

    def __init__(self, maxMoves, useNormalGain=True):
        """Initialize the data set.

        Args:
            maxMoves: Number of rows to read; also the size of the training
                slice.
            useNormalGain: When true, ``score`` is the raw gain ratio;
                otherwise it is the gain minus one.
        """
        self.useNormalGain = useNormalGain
        self.split = 0.8
        dfFeatures = self.collectData(maxMoves)
        self.splitData(dfFeatures, maxMoves)

    @staticmethod
    def _pricePath(path):
        """Return the price-file path that accompanies feature file *path*."""
        suffix = '.at'
        if path.endswith(suffix):
            # Rewrite only the trailing extension; a blanket str.replace would
            # also mangle any '.at' found in a directory or base name.
            return path[:-len(suffix)] + '.price.at'
        # Paths without the expected extension keep the previous derivation.
        return path.replace('.at', '.price.at')

    def collectData(self, maxMoves, pattern='data/AT/XTotal*.at'):
        """Collect data.

        Picks a random feature file matching *pattern*, then reads up to
        *maxMoves* rows from it and from its price file, starting at the same
        random row offset. The price rows are stored in ``self.dfY`` together
        with the derived ``gain`` and ``score`` columns.

        Args:
            maxMoves: Maximum number of rows to read from each file.
            pattern: Glob pattern selecting the candidate files.

        Returns:
            The feature rows as a DataFrame.

        Raises:
            ValueError: If no feature file matches *pattern*.
        """
        # Keep files that do not contain '.price.'.
        featurePaths = [p for p in glob.glob(pattern) if '.price.' not in p]
        if not featurePaths:
            raise ValueError(f'No feature files match pattern {pattern!r}')

        path = random.choice(featurePaths)
        pathY = self._pricePath(path)

        # Create a random starting point so that we don't always start at zero.
        startIndex = random.randint(0, self.MAX_START_OFFSET)

        dfFeatures = pd.read_csv(
            path, header=None, skiprows=startIndex, nrows=maxMoves)
        self.dfY = pd.read_csv(
            pathY,
            header=None,
            skiprows=startIndex,
            nrows=maxMoves,
            names=['historic_close', 'future_close'],
        )

        self.dfY['gain'] = self.dfY['future_close'] / self.dfY['historic_close']
        if self.useNormalGain:
            self.dfY['score'] = self.dfY['gain']
        else:
            # Subtracting 1 from 'gain' to make losses negative.
            self.dfY['score'] = self.dfY['gain'] - 1
        return dfFeatures

    def splitData(self, dfFeatures, maxMoves):
        """Establish the data.

        The first *maxMoves* rows of *dfFeatures* become the training
        features and the remaining rows the test features. When a slice does
        not have the expected length, the mismatch is printed and the method
        waits for <Enter> before continuing.

        Args:
            dfFeatures: Feature rows to partition.
            maxMoves: Number of rows assigned to the training slice.
        """
        self.dataSize = len(dfFeatures.index)
        self.trainSize = maxMoves
        self.testSize = self.dataSize - self.trainSize

        self.train_features = dfFeatures[:self.trainSize]
        if len(self.train_features.index) != self.trainSize:
            print(f'{len(self.train_features)} != {self.trainSize}')
            input('train_features size mismatch with self.trainSize. Press <Enter> to continue')

        self.test_features = dfFeatures[self.trainSize:]
        if len(self.test_features) != self.testSize:
            print(f'{len(self.test_features)} != {self.testSize}')
            input('test_features size mismatch with self.testSize. Press <Enter> to continue')

    def getFeatures(self, train):
        """Get feature data: the training slice if *train*, else the test slice."""
        if train:
            return self.train_features
        return self.test_features

    def getPrices(self, train):
        """Get price data: the training rows if *train*, else the test rows."""
        if train:
            return self.dfY.head(self.trainSize)
        # Slice by position from the end of the training rows, mirroring how
        # test_features is cut, so prices and features stay row-aligned.
        return self.dfY.iloc[self.trainSize:]

    def getSize(self, train):
        """Get size of the data: the training row count if *train*, else the test row count."""
        if train:
            return self.trainSize
        return self.testSize