Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #16 InfluxDB feed, multiple issues. #74

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 70 additions & 52 deletions backtrader/feeds/influxfeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from __future__ import absolute_import, division, print_function, unicode_literals

import backtrader as bt
import backtrader.feed as feed
Expand All @@ -28,72 +27,89 @@

TIMEFRAMES = dict(
(
(bt.TimeFrame.Seconds, 's'),
(bt.TimeFrame.Minutes, 'm'),
(bt.TimeFrame.Days, 'd'),
(bt.TimeFrame.Weeks, 'w'),
(bt.TimeFrame.Months, 'm'),
(bt.TimeFrame.Years, 'y'),
(bt.TimeFrame.Seconds, "s"),
(bt.TimeFrame.Minutes, "m"),
(bt.TimeFrame.Days, "d"),
(bt.TimeFrame.Weeks, "w"),
(bt.TimeFrame.Months, "m"),
(bt.TimeFrame.Years, "y"),
)
)


class InfluxDB(feed.DataBase):
frompackages = (
('influxdb', [('InfluxDBClient', 'idbclient')]),
('influxdb.exceptions', 'InfluxDBClientError')
("influxdb", [("InfluxDBClient", "idbclient")]),
("influxdb.exceptions", "InfluxDBClientError"),
)

params = (
('host', '127.0.0.1'),
('port', '8086'),
('username', None),
('password', None),
('database', None),
('timeframe', bt.TimeFrame.Days),
('startdate', None),
('high', 'high_p'),
('low', 'low_p'),
('open', 'open_p'),
('close', 'close_p'),
('volume', 'volume'),
('ointerest', 'oi'),
("host", "localhost"),
("port", 8086),
("username", None),
("password", None),
("database", None),
("timeframe", bt.TimeFrame.Days),
("high", "high"),
("low", "low"),
("open", "open"),
("close", "close"),
("volume", "volume"),
("openinterest", "oi"),
)

def start(self):
super(InfluxDB, self).start()
try:
self.ndb = idbclient(self.p.host, self.p.port, self.p.username,
self.p.password, self.p.database)
self.ndb = idbclient(
self.p.host,
self.p.port,
self.p.username,
self.p.password,
self.p.database,
)
except InfluxDBClientError as err:
print('Failed to establish connection to InfluxDB: %s' % err)
print("Failed to establish connection to InfluxDB: %s" % err)

tf = '{multiple}{timeframe}'.format(
tf = "{multiple}{timeframe}".format(
multiple=(self.p.compression if self.p.compression else 1),
timeframe=TIMEFRAMES.get(self.p.timeframe, 'd'))
timeframe=TIMEFRAMES.get(self.p.timeframe, "d"),
)

if not self.p.startdate:
st = '<= now()'
if self.p.fromdate and self.p.todate:
tcond = "time >= '{fromdate}' AND time <= '{todate}'".format(
fromdate=self.p.fromdate, todate=self.p.todate
)
elif self.p.fromdate:
tcond = "time >= '{fromdate}'".format(fromdate=self.p.fromdate)
elif self.p.todate:
tcond = "time <= '{todate}'".format(todate=self.p.todate)
else:
st = '>= \'%s\'' % self.p.startdate

# The query could already consider parameters like fromdate and todate
# to have the database skip them and not the internal code
qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
'FROM "{dataname}" '
'WHERE time {begin} '
'GROUP BY time({timeframe}) fill(none)').format(
open_f=self.p.open, high_f=self.p.high,
low_f=self.p.low, close_f=self.p.close,
vol_f=self.p.volume, oi_f=self.p.ointerest,
timeframe=tf, begin=st, dataname=self.p.dataname)
tcond = "time <= now()"

qstr = (
'SELECT FIRST("{open_f}") AS "open", MAX("{high_f}") as "high", MIN("{low_f}") as "low", '
'LAST("{close_f}") AS "close", SUM("{vol_f}") as "volume", SUM("{oi_f}") as "openinterest" '
'FROM "{dataname}" '
"WHERE {tcond} "
"GROUP BY time({timeframe}) fill(none) "
"ORDER BY time ASC".format(
open_f=self.p.open,
high_f=self.p.high,
low_f=self.p.low,
close_f=self.p.close,
vol_f=self.p.volume,
oi_f=self.p.openinterest,
dataname=self.p.dataname,
tcond=tcond,
timeframe=tf,
)
)
try:
dbars = list(self.ndb.query(qstr).get_points())
except InfluxDBClientError as err:
print('InfluxDB query failed: %s' % err)
print("InfluxDB query failed: %s" % err)
dbars = []

self.biter = iter(dbars)

Expand All @@ -103,13 +119,15 @@ def _load(self):
except StopIteration:
return False

self.l.datetime[0] = date2num(dt.datetime.strptime(bar['time'],
'%Y-%m-%dT%H:%M:%SZ'))
self.l.datetime[0] = date2num(
dt.datetime.strptime(bar["time"], "%Y-%m-%dT%H:%M:%SZ")
)

self.l.open[0] = bar['open']
self.l.high[0] = bar['high']
self.l.low[0] = bar['low']
self.l.close[0] = bar['close']
self.l.volume[0] = bar['volume']
self.l.open[0] = bar["open"]
self.l.high[0] = bar["high"]
self.l.low[0] = bar["low"]
self.l.close[0] = bar["close"]
self.l.volume[0] = bar["volume"] if bar["volume"] else 0.0
self.l.openinterest[0] = bar["openinterest"] if bar["openinterest"] else 0.0

return True