Skip to content
This repository has been archived by the owner on Nov 24, 2020. It is now read-only.

Commit

Permalink
Query utxos from local database (#25)
Browse files Browse the repository at this point in the history
* Add Unspents model

* Add Market abstraction to comsume crawlet deposit event

* Add Unspent abstraction to comsume crawler balance event

* 🧶 Query utxos from datastore and lock them in tradePropose service
  • Loading branch information
tiero authored Apr 29, 2020
1 parent c495f2d commit 05d2205
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 165 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
dist
.DS_Store
/test/datadir/db
/test/datadir/config.json
# Logs
logs
*.log
Expand Down
2 changes: 1 addition & 1 deletion scripts/clean
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ PARENT_PATH=$(dirname $(cd $(dirname $0); pwd -P))

pushd $PARENT_PATH

rimraf dist test/datadir/db test/datadir/config.json
rimraf dist test/datadir/db

popd
1 change: 0 additions & 1 deletion scripts/test
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ nigiri start --liquid
sleep 10s
yarn clean
export TDEX_DAEMON_PATH=$(pwd)/test/datadir
export EXPLORER=http://localhost:3001
npx jest test --detectOpenHandles

popd
62 changes: 40 additions & 22 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import TradeServer from './grpc/tradeServer';
import OperatorServer from './grpc/operatorServer';
import Config, { ConfigInterface } from './config';
import { initVault, VaultInterface } from './components/vault';
import Crawler, { CrawlerInterface } from './components/crawler';
import Markets, { schemaFromPair } from './models/markets';
import Market from './components/market';
import Unspent from './components/unspent';
import Crawler, { CrawlerInterface, CrawlerType } from './components/crawler';
import { UtxoInterface } from './utils';

class App {
Expand All @@ -23,7 +24,10 @@ class App {
this.logger = createLogger();
this.config = Config();
this.datastore = new DB(this.config.datadir);
this.crawler = new Crawler(this.config.network);
this.crawler = new Crawler(
this.config.network,
this.config.explorer[this.config.network]
);

process.on('SIGINT', async () => {
await this.shutdown();
Expand All @@ -42,32 +46,45 @@ class App {
'crawler.deposit',
async (walletAddress: string, pair: Array<UtxoInterface>) => {
const { market, network } = this.config;
const model = new Markets(this.datastore.markets);

const baseAsset = market.baseAsset[network];
const { baseFundingTx, quoteFundingTx, quoteAsset } = schemaFromPair(
baseAsset,
pair
);

this.logger.info(
`New deposit for market ${quoteAsset} on address ${walletAddress}`
);

await model.updateMarketByWallet(
{ walletAddress },
await Market.fromFundingUtxos(
walletAddress,
pair,
this.datastore.markets,
this.logger,
{
baseAsset,
quoteAsset,
baseFundingTx,
quoteFundingTx,
baseAsset: market.baseAsset[network],
fee: market.fee,
tradable: true,
}
);
}
);

this.crawler.on(
'crawler.balance',
async (walletAddress: string, utxos: Array<UtxoInterface>) => {
await Unspent.fromUtxos(
walletAddress,
utxos,
this.datastore.unspents,
this.logger
);
}
);

const walletOfMarkets = await Market.getWallets(
this.datastore.markets,
this.logger
);
const walletOfFeeAccount = this.vault.derive(
0,
this.config.network,
true
);
this.crawler.startAll(
CrawlerType.BALANCE,
walletOfMarkets.concat(walletOfFeeAccount.address)
);

this.operatorGrpc = new OperatorServer(
this.datastore,
this.vault,
Expand All @@ -79,6 +96,7 @@ class App {
this.datastore,
this.vault,
this.config.network,
this.config.explorer[this.config.network],
this.logger
);
const { grpcTrader, grpcOperator } = this.config;
Expand Down
53 changes: 53 additions & 0 deletions src/components/balance.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import Datastore from 'nedb';
import { isValidUrl, fetchUtxosWithUrl, groupByAsset } from '../utils';
import Unspents from '../models/unspents';

export default class Balance {
constructor(private datastore: Datastore, private explorer: string) {
if (!isValidUrl(this.explorer)) throw new Error('Not a valid explorer url');
}

async fromAsset(walletAddress: string, asset: string) {
let unspents = [];
try {
const model = new Unspents(this.datastore);
unspents = await model.getUnspents({
address: walletAddress,
asset,
spent: false,
});
} catch (error) {
unspents = await fetchUtxosWithUrl(walletAddress, this.explorer);
}

const balance = (unspents as any)
.map((x: { value: any }) => x.value)
.reduce((a: number, b: number) => a + b, 0);

return {
[asset]: { utxos: unspents, balance },
};
}

async fromMarket(
walletAddress: string,
{ baseAsset, quoteAsset }: { baseAsset: string; quoteAsset: string }
) {
let unspents = [];
try {
const model = new Unspents(this.datastore);
unspents = await model.getUnspents({
address: walletAddress,
spent: false,
});
} catch (error) {
unspents = await fetchUtxosWithUrl(walletAddress, this.explorer);
}

const groupedBy = groupByAsset(unspents);
return {
[baseAsset]: groupedBy[baseAsset],
[quoteAsset]: groupedBy[quoteAsset],
};
}
}
79 changes: 53 additions & 26 deletions src/components/crawler.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,90 @@
import { EventEmitter } from 'events';
import { fetchUtxos, UtxoInterface, isValidNetwork } from '../utils';
import { fetchUtxosWithUrl, UtxoInterface } from '../utils';

export enum CrawlerType {
DEPOSIT = 'DEPOSIT',
BALANCE = 'BALANCE',
}

export interface CrawlerInterface {
running: boolean;
interval: number;
storage: any;
timer: any;

start(address: string): this;
stop(address: string): void;
start(type: string, address: string, interval?: number): this;
startAll(type: string, addresses: Array<string>, interval?: number): void;
stop(type: string, address: string): void;
stopAll(): void;

on(
event: 'crawler.deposit',
listener: (address: string, pair: Array<UtxoInterface>) => void
): this;

on(
event: 'crawler.balance',
listener: (address: string, utxos: Array<UtxoInterface>) => void
): this;
}

export default class Crawler extends EventEmitter implements CrawlerInterface {
running: boolean;
interval: number;
storage: any;
timer: any;

constructor(private network: string, private interval: number = 200) {
constructor(private network: string, private explorer: string) {
super();

if (!isValidNetwork(this.network))
throw new Error('Network not support by the explorer');

this.running = false;
this.interval = this.network === 'liquid' ? 60 * 1000 : 200;
this.storage = {};
this.timer = {};
}

start(address: string) {
if (this.running) return this;
start(type: string, address: string, interval: number = this.interval) {
// It's the first time we run the crawler
if (!this.timer.hasOwnProperty(type))
this.timer = { ...this.timer, [type]: {} };

this.running = true;
// We have already a crwaler running for this address
if (this.timer[type].hasOwnProperty(address)) return this;

this.timer[address] = setInterval(
async () => await this.process(address),
this.interval
);
//eslint-disable-next-line
let processorFunction = () => { };
if (type === CrawlerType.DEPOSIT)
processorFunction = async () => await this.processDeposit(address);
if (type === CrawlerType.BALANCE)
processorFunction = async () => await this.processBalance(address);

this.timer[type][address] = setInterval(processorFunction, interval);

return this;
}

stop(address: string): void {
this.running = false;
clearInterval(this.timer[address]);
delete this.timer[address];
stop(type: string, address: string): void {
clearInterval(this.timer[type][address]);
delete this.timer[type][address];
}

startAll(type: string, addresses: Array<string>, interval?: number): void {
addresses.forEach((a) => this.start(type, a, interval));
}

stopAll(): void {
Object.keys(this.timer).forEach((key) => {
this.stop(key);
Object.keys(this.timer[CrawlerType.DEPOSIT]).forEach((address) => {
this.stop(CrawlerType.DEPOSIT, address);
});
Object.keys(this.timer[CrawlerType.BALANCE]).forEach((address) => {
this.stop(CrawlerType.BALANCE, address);
});
}

private async processBalance(address: string) {
const fetchedUtxos = await fetchUtxosWithUrl(address, this.explorer);
this.emit('crawler.balance', address, fetchedUtxos);
}

private async process(address: string) {
const fetchedUtxos = await fetchUtxos(address, this.network);
private async processDeposit(address: string) {
const fetchedUtxos = await fetchUtxosWithUrl(address, this.explorer);

if (!this.storage.hasOwnProperty(address)) this.storage[address] = [];

Expand All @@ -79,7 +105,8 @@ export default class Crawler extends EventEmitter implements CrawlerInterface {
if (first.asset !== second.asset) {
this.storage[address].push(first, second);
this.emit('crawler.deposit', address, [first, second]);
this.stop(address);
this.stop(CrawlerType.DEPOSIT, address);
this.start(CrawlerType.BALANCE, address);
}
}
}
Expand Down
57 changes: 57 additions & 0 deletions src/components/market.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Datastore from 'nedb';
import { UtxoInterface } from '../utils';
import Markets from '../models/markets';
import winston from 'winston';

export default class Market {
static async getWallets(
datastore: Datastore,
logger: winston.Logger
): Promise<Array<string>> {
const wallets: string[] | PromiseLike<string[]> = [];
try {
const model = new Markets(datastore);
const markets = await model.getMarkets();
const wallets = markets.map((m) => m.walletAddress);
return wallets;
} catch (ignore) {
logger.error(`Cannot fetch markets from datastore`);
return wallets;
}
}

static async fromFundingUtxos(
walletAddress: string,
fundingUtxos: Array<UtxoInterface>,
datastore: Datastore,
logger: winston.Logger,
{ baseAsset, fee }: { baseAsset: string; fee: number }
) {
const [first, second] = fundingUtxos;
const quoteAsset = first.asset !== baseAsset ? first.asset : second.asset;
const baseFundingTx = first.asset === baseAsset ? first.txid : second.txid;
const quoteFundingTx = first.asset !== baseAsset ? first.txid : second.txid;

try {
const model = new Markets(datastore);
await model.updateMarketByWallet(
{ walletAddress },
{
baseAsset,
quoteAsset,
baseFundingTx,
quoteFundingTx,
fee,
tradable: true,
}
);
logger.info(
`New deposit for market ${quoteAsset} on address ${walletAddress}`
);
} catch (ignore) {
logger.error(
`Error on creation market ${quoteAsset} on address ${walletAddress}`
);
}
}
}
Loading

0 comments on commit 05d2205

Please sign in to comment.