Skip to content
This repository has been archived by the owner on Jul 17, 2021. It is now read-only.

Commit

Permalink
Merge pull request #5 from stockmlbot/memory-leak-fix
Browse files Browse the repository at this point in the history
Fix memory leak at Orderbook emitter
  • Loading branch information
valamidev authored Mar 16, 2020
2 parents 21270d6 + f94aa8c commit 9a93b1d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 45 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"description": "",
"main": "./build/index.js",
"scripts": {
"start": "node --experimental-worker ./build/index.js",
"debug": "tsc && node --experimental-worker --inspect=5858 -r ts-node/register ./src/index.ts",
"start": "node ./build/index.js",
"dev": "tsc && node --inspect ./build/index.js",
"test": "jest --config jestconfig.json",
"prebuild": "rimraf build",
"build": "tsc",
Expand Down
93 changes: 53 additions & 40 deletions src/emitter/events/orderbook_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Redis, RedisPub } from '../../redis/redis';
import { TableTemplates } from '../../database/queries/enums';

const memoryLimit = 512;
const Orderbooks = {};
const OrderBookExchangeCache: Map<string, any> = new Map();

interface OrderBookDepth {
symbol: string;
Expand All @@ -27,69 +27,82 @@ class OrderbookEmitter {
// Event listeners
logger.verbose('Orderbook Emitter started!');

Emitter.on('Orderbook', (exchange: string, depth: OrderBookDepth) => {
exchange = exchange.toLowerCase();
Emitter.on(
'Orderbook',
async (exchange: string, depth: OrderBookDepth): Promise<void> => {
exchange = exchange.toLowerCase();

if (typeof Orderbooks[exchange] == 'undefined') {
Orderbooks[exchange] = new OrderBookStore(memoryLimit);
}
if (!OrderBookExchangeCache[exchange]) {
OrderBookExchangeCache[exchange] = new OrderBookStore(memoryLimit);
}

const { symbol, asks, bids } = depth;
const { symbol, asks, bids } = depth;

if (Orderbooks[exchange]._symbols.indexOf(symbol) == -1) {
setImmediate(async () => {
if (OrderBookExchangeCache[exchange]._symbols.indexOf(symbol) === -1) {
try {
const tableName = util.orderbookName(exchange, symbol);
const orderbookSnapshot = await Redis.get(tableName);

if (orderbookSnapshot !== null) {
const parsedOrderbookSnapshot = JSON.parse(orderbookSnapshot);

if (parsedOrderbookSnapshot?.ask && parsedOrderbookSnapshot?.bid) {
Orderbooks[exchange].updateOrderBook(symbol, parsedOrderbookSnapshot.ask, parsedOrderbookSnapshot.bid);
OrderBookExchangeCache[exchange].updateOrderBook(symbol, parsedOrderbookSnapshot.ask, parsedOrderbookSnapshot.bid);
}
}
Orderbooks[exchange].updateOrderBook(symbol, asks, bids);
OrderBookExchangeCache[exchange].updateOrderBook(symbol, asks, bids);
} catch (e) {
logger.error('Orderbook snapshot error', e);
logger.error('Orderbook loading error', e);
}
});
} else {
Orderbooks[exchange].updateOrderBook(symbol, asks, bids);
} else {
try {
OrderBookExchangeCache[exchange].updateOrderBook(symbol, asks, bids);

if (typeof Orderbooks[exchange]._data[symbol] != 'undefined') {
const data = Orderbooks[exchange]._data[symbol];
const data = { ...OrderBookExchangeCache[exchange]._data[symbol] };

RedisPub.publish('OrderBookUpdate', JSON.stringify({ exchange, symbol, ask: data.best_ask, bid: data.best_bid }));
if (data.best_ask && data.best_bid) {
await RedisPub.publish('OrderBookUpdate', JSON.stringify({ exchange, symbol, ask: data.best_ask, bid: data.best_bid }));
}
} catch (e) {
logger.error('Orderbook update error', e);
}
}
}
});
},
);

Emitter.on('OrderbookSnapshot', (snapshotTime: number) => {
Object.keys(Orderbooks).forEach(exchange => {
const symbols = Orderbooks[exchange]._symbols;
Emitter.on(
'OrderbookSnapshot',
async (snapshotTime: number): Promise<void> => {
const exchanges = Object.keys(OrderBookExchangeCache);

symbols.forEach((symbol: string) => {
setImmediate(async () => {
const orderbook = Orderbooks[exchange].getOrderBook(symbol);
// Get CCXT standard symbol
const ccxtSymbol = await TradepairQueries.idToSymbol(exchange, symbol);
for (const exchange of exchanges) {
const symbols = OrderBookExchangeCache[exchange]._symbols;

if (ccxtSymbol) {
const tableName = util.orderbookName(exchange, ccxtSymbol);
for (const symbol of symbols) {
try {
const orderbook = { ...OrderBookExchangeCache[exchange].getOrderBook(symbol) };
// Get CCXT standard symbol
const ccxtSymbol = await TradepairQueries.idToSymbol(exchange, symbol);

if (!(await DBQueries.tableCheck(tableName))) {
await DBQueries.createNewTableFromTemplate(TableTemplates.Orderbook, tableName);
}
if (ccxtSymbol) {
const tableName = util.orderbookName(exchange, ccxtSymbol);

if (!(await DBQueries.tableCheck(tableName))) {
await DBQueries.createNewTableFromTemplate(TableTemplates.Orderbook, tableName);
}

await DBQueries.orderbookReplace(tableName, { time: snapshotTime, orderbook });
await DBQueries.orderbookReplace(tableName, { time: snapshotTime, orderbook });

// Store snapshot in redis for 600 sec
Redis.set(tableName, JSON.stringify(orderbook), 'EX', 600);
// Store snapshot in redis for 600 sec
Redis.set(tableName, JSON.stringify(orderbook), 'EX', 600);
}
} catch (e) {
logger.error('Orderbook snapshot error', e);
}
});
});
});
});
}
}
},
);
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/exchange/ws_exchanges/kucoin_ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ const exchangeName = 'kucoin';

const KucoinAPI = require('kucoin-websocket-api');

const client = new KucoinAPI();
// Kucoin things

export const openSocket = (symbol: any) => {
const client = new KucoinAPI();

// eslint-disable-next-line @typescript-eslint/no-unused-vars
const socketTrades = client.MarketMatches(symbol, (trade: any) => {
client.MarketMatches(symbol, (trade: any) => {
trade = {
time: Math.floor(trade.time / 10e5), // Kucoin use ns for timestamp
symbol: trade.symbol,
Expand All @@ -28,7 +29,7 @@ export const openSocket = (symbol: any) => {
});

// eslint-disable-next-line @typescript-eslint/no-unused-vars
const socketOrderbook = client.MarketLevel2(symbol, (depth: any) => {
client.MarketLevel2(symbol, (depth: any) => {
// Kucoin use ns for timestamp
/*
sequenceStart: 1556425985882,
Expand Down

0 comments on commit 9a93b1d

Please sign in to comment.