-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
263 lines (210 loc) · 8.77 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# Fahrenheit Black Quantitative Trading System
import requests
import pandas as pd
import boto3
from typing import List, Dict, Series
class DataRetrieval:
def __init__(self, api_key):
self.api_key = api_key
def fetch_realtime_data(self, symbol):
url = f'https://api.twelvedata.com/time_series?symbol={symbol}&interval=1d&apikey={self.api_key}'
response = requests.get(url)
if response.status_code == 200:
data = response.json()
df = pd.DataFrame(data['values'])
df['datetime'] = pd.to_datetime(df['datetime'])
df.set_index('datetime', inplace=True)
return df
else:
raise Exception(f"Error fetching real-time data. Status code: {response.status_code}")
def fetch_historical_data(self, symbol, start_date, end_date):
url = f'https://api.twelvedata.com/time_series?symbol={symbol}&interval=1d&start_date={start_date}&end_date={end_date}&apikey={self.api_key}'
response = requests.get(url)
if response.status_code == 200:
data = response.json()
df = pd.DataFrame(data['values'])
df['datetime'] = pd.to_datetime(df['datetime'])
df.set_index('datetime', inplace=True)
return df
else:
raise Exception(f"Error fetching historical data. Status code: {response.status_code}")
# Example Usage
api_key = 'YOUR_API_KEY_HERE' # Replace with your Twelve Data API key
data_retrieval = DataRetrieval(api_key)
# Fetch real-time data for TSLA
realtime_data = data_retrieval.fetch_realtime_data('TSLA')
print("Real-time Data:")
print(realtime_data.head())
# Fetch historical data for TSLA (e.g., from 2022-01-01 to 2022-10-20)
start_date = '2022-01-01'
end_date = '2022-10-20'
historical_data = data_retrieval.fetch_historical_data('TSLA', start_date, end_date)
print("\nHistorical Data:")
print(historical_data.head())
class DataStorage:
"""
DataStorage handles the storage and retrieval of data in CSV format.
It provides functions to save data as a CSV file and upload to an S3 bucket.
"""
def __init__(self, data_path):
self.data_path = data_path
def save_to_csv(self, data, filename):
"""
Saves DataFrame data to a CSV file.
"""
data.to_csv(f"{self.data_path}/{filename}.csv", index=False)
def load_from_csv(self, filename, bucket_name):
"""
Loads data from a CSV file into a DataFrame and stores into an S3 bucket.
"""
df = pd.read_csv(f"{self.data_path}/{filename}.csv")
s3 = boto3.client('s3')
s3.upload_file(f"{self.data_path}/{filename}.csv", bucket_name, f"{filename}.csv")
# Example Usage
data_storage = DataStorage(data_path="/path/to/data")
# Assuming you have fetched historical data using DataRetrieval
historical_data = pd.DataFrame({
'Date': ['2022-01-01', '2022-01-02', '2022-01-03'],
'Open': [700.0, 710.0, 720.0],
'Close': [705.0, 715.0, 725.0]
})
# Save data as CSV and upload to S3 bucket
data_storage.save_to_csv(historical_data, "TSLA_Historical_Data")
data_storage.load_from_csv("TSLA_Historical_Data", "FarBlack")
class DataProcessing:
"""
DataProcessing performs data cleaning and transformation operations.
It provides functions to clean and transform data.
"""
def __init__(self, data):
self.data = data
def clean_data(self):
"""
Cleans and preprocesses the loaded data.
"""
# Assuming 'Date' is the column name for date and 'Close' is the column for closing price
self.data['Date'] = pd.to_datetime(self.data['Date'])
self.data.dropna(inplace=True) # Remove rows with missing values
self.data.reset_index(drop=True, inplace=True)
def transform_data(self):
"""
Applies necessary transformations or calculations to the data.
"""
# Example transformation: Calculate daily returns
self.data['Daily_Return'] = self.data['Close'].pct_change() * 100 # Multiply by 100 for percentage
# Example Usage
data_processing = DataProcessing(data=pd.read_csv('TSLA_Historical_Data.csv'))
# Clean and preprocess the data
data_processing.clean_data()
# Apply necessary transformations
data_processing.transform_data()
# Print the modified DataFrame
print(data_processing.data.head())
# 2. Trading Strategies Module
class StrategyInterface:
"""
StrategyInterface defines the interface for trading strategies.
It provides functions to analyze data and generate trading signals.
"""
def __init__(self, data):
self.data = data
self.signals = None
def analyze(self, short_window=20, long_window=50):
"""
Analyzes data and generates trading signals based on moving average crossover.
Returns a Series containing trading signals (1 for buy, -1 for sell, 0 for hold).
"""
short_ma = self.data['Close'].rolling(window=short_window).mean()
long_ma = self.data['Close'].rolling(window=long_window).mean()
# Generate signals
self.signals = pd.Series(0, index=self.data.index)
self.signals[short_ma > long_ma] = 1 # Buy signal
self.signals[short_ma < long_ma] = -1 # Sell signal
return self.signals
def execute(self):
"""
Executes trading signals and generates orders.
Returns a list of Order objects.
"""
orders = []
# Assuming 'signals' is a Series of trading signals
for date, signal in self.signals.iteritems():
if signal == 1: # Buy signal
orders.append(Order(date, 'BUY', 'TSLA'))
elif signal == -1: # Sell signal
orders.append(Order(date, 'SELL', 'TSLA'))
return orders
class Order:
"""
Represents a trading order.
"""
def __init__(self, date, action, symbol):
self.date = date
self.action = action # 'BUY' or 'SELL'
self.symbol = symbol
def __str__(self):
return f"Date: {self.date}, Action: {self.action}, Symbol: {self.symbol}"
# Example Usage
# Assuming you have historical data loaded in 'historical_data'
strategy_interface = StrategyInterface(historical_data)
# Analyze data and generate trading signals
signals = strategy_interface.analyze()
# Execute trading signals and generate orders
orders = strategy_interface.execute()
# Print the signals and orders
print("Trading Signals:")
print(signals.head())
print("\nOrders:")
for order in orders:
print(order)
class StrategyExecutor:
"""
StrategyExecutor is responsible for running and scaling trading strategies.
It provides functions to run and scale strategies.
"""
def __init__(self, strategy):
self.strategy = strategy
def run_strategy(self):
"""
Runs a specific strategy.
"""
self.strategy.analyze() # Generate trading signals
orders = self.strategy.execute() # Execute signals and generate orders
# Print orders (you can replace this with actual execution logic)
for order in orders:
print(order)
def scale_strategy(self, factor):
"""
Scales a strategy based on performance.
"""
# Placeholder for scaling logic (e.g., adjust parameters, allocate capital, etc.)
pass
# Example Usage
# Assuming you have instantiated StrategyInterface with historical data
strategy_interface = StrategyInterface(historical_data)
# Instantiate StrategyExecutor with the StrategyInterface
strategy_executor = StrategyExecutor(strategy_interface)
# Run the strategy
strategy_executor.run_strategy()
# 10. Continuous Monitoring
def continuous_monitoring():
"""
Continuously monitors the current state of orders, positions, and account details.
"""
while True:
open_orders = BrokerIntegration.query_open_orders()
current_positions = BrokerIntegration.query_positions()
account_details = BrokerIntegration.query_account_details()
# Use this information for continuous monitoring
print("Open Orders:", open_orders)
print("Current Positions:", current_positions)
print("Account Details:", account_details)
# Sleep for a specified interval (e.g., 60 seconds) before checking again
time.sleep(60)
# Example Usage
data_retrieval = DataRetrieval(provider="Interactive Brokers")
data_storage = DataStorage(data_path="/path/to/data")
data_processing = DataProcessing(data=pd.DataFrame())
# ... (Instantiate other modules as needed)
# Continuous Monitoring (runs indefinitely until manually stopped)
continuous_monitoring()