Break 'Exporter' into KafkaStorage and ZookeeperState classes #200

Open · wants to merge 1 commit into base: master
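This diff view shows only the KafkaStorage half of the split; the new ZookeeperState file is not visible here. Below is a minimal sketch of what ZookeeperState plausibly looks like, reconstructed from the methods removed from Exporter further down. The constructor signature and the znode path getter are assumptions, since neither appears in this diff; savePosition is shown as moved over unchanged, and getLastPosition, getLastBlockTimestamp, and saveLastBlockTimestamp would move the same way.

import ZookeeperClientAsync from './zookeeper_client_async';

const ZOOKEEPER_URL: string = process.env.ZOOKEEPER_URL || 'localhost:2181';
const ZOOKEEPER_RETRIES: number = parseInt(process.env.ZOOKEEPER_RETRIES || '0');
const ZOOKEEPER_SPIN_DELAY: number = parseInt(process.env.ZOOKEEPER_SPIN_DELAY || '1000');
const ZOOKEEPER_SESSION_TIMEOUT: number = parseInt(process.env.ZOOKEEPER_SESSION_TIMEOUT || '30000');

const FORMAT_HEADER: string = 'format=json;';

export class ZookeeperState {
  private readonly zookeeperClient: ZookeeperClientAsync;

  constructor(private readonly exporter_name: string, private readonly topicName: string) {
    // Same client options the old Exporter constructor passed.
    this.zookeeperClient = new ZookeeperClientAsync(ZOOKEEPER_URL, {
      sessionTimeout: ZOOKEEPER_SESSION_TIMEOUT,
      spinDelay: ZOOKEEPER_SPIN_DELAY,
      retries: ZOOKEEPER_RETRIES
    });
  }

  // Hypothetical path: the original zookeeperPositionNode getter is elided
  // from this diff, so the layout below is a guess.
  get zookeeperPositionNode(): string {
    return `/${this.exporter_name}/${this.topicName}/block-number`;
  }

  async connect(): Promise<void> {
    await this.zookeeperClient.connectAsync();
  }

  async disconnect(): Promise<void> {
    await this.zookeeperClient.closeAsync();
  }

  // Moved from the old Exporter.savePosition(), unchanged in substance.
  async savePosition(position: object) {
    if (typeof position === 'undefined') return;
    const newNodeValue = Buffer.from(FORMAT_HEADER + JSON.stringify(position), 'utf-8');
    if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
      return this.zookeeperClient.setDataAsync(this.zookeeperPositionNode, newNodeValue);
    }
    return this.zookeeperClient.mkdirpAsync(this.zookeeperPositionNode, newNodeValue);
  }
}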
4 changes: 2 additions & 2 deletions src/blockchains/erc20/erc20_worker.ts
@@ -1,6 +1,6 @@
'use strict';
import { logger } from '../../lib/logger';
-import { Exporter } from '../../lib/kafka_storage';
+import { KafkaStorage } from '../../lib/kafka_storage';
import { constructRPCClient } from '../../lib/http_client';
import { extendEventsWithPrimaryKey } from './lib/extend_events_key';
import { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } from './lib/contract_overwrite';
@@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker {
this.allOldContracts = [];
}

-  async init(exporter?: Exporter) {
+  async init(exporter?: KafkaStorage) {
this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;

if (this.settings.EXPORT_BLOCKS_LIST) {
4 changes: 2 additions & 2 deletions src/blockchains/utxo/utxo_worker.ts
@@ -2,7 +2,7 @@
import { logger } from '../../lib/logger';
import { constructRPCClient } from '../../lib/http_client';
import { BaseWorker } from '../../lib/worker_base';
-import { Exporter } from '../../lib/kafka_storage';
+import { KafkaStorage } from '../../lib/kafka_storage';
import { HTTPClientInterface } from '../../types';


@@ -33,7 +33,7 @@ export class UTXOWorker extends BaseWorker {
this.client = constructRPCClient(this.NODE_URL, this.RPC_USERNAME, this.RPC_PASSWORD, this.DEFAULT_TIMEOUT);
}

-  async init(exporter: Exporter) {
+  async init(exporter: KafkaStorage) {
const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []);
this.lastConfirmedBlock = blockchainInfo.blocks - this.CONFIRMATIONS;
await exporter.initPartitioner((event: any) => event['height']);
30 changes: 15 additions & 15 deletions src/e2e/producer-transaction.spec.ts
@@ -1,4 +1,4 @@
-import { Exporter } from '../lib/kafka_storage';
+import { KafkaStorage } from '../lib/kafka_storage';
import Kafka from 'node-rdkafka';
const KAFKA_URL: string = assertStringEnv(process.env.KAFKA_URL);
const KAFKA_TOPIC: string = assertStringEnv(process.env.KAFKA_TOPIC);
@@ -81,26 +81,26 @@ class TestConsumer {


describe('Producer transactions', function () {
-  let exporter: Exporter;
+  let kafkaStorage: KafkaStorage;
let testConsumer: TestConsumer;
let num_messages_test = 3;

beforeEach(function (done) {
this.timeout(20000);

-    exporter = new Exporter('test-exporter', true, KAFKA_TOPIC);
-    exporter.connect().then(() => {
+    kafkaStorage = new KafkaStorage('test-exporter', true, KAFKA_TOPIC);
+    kafkaStorage.connect().then(() => {
testConsumer = new TestConsumer(KAFKA_TOPIC, num_messages_test);
done();
});
});

-  afterEach(function (done) {
+  afterEach(async function () {
     this.timeout(10000);
-    exporter.disconnect(() => {
-      testConsumer.disconnect(function () {
-        done();
-      });
-    });
+    await kafkaStorage.disconnect();
+
+    // Note: combining an async function with Mocha's 'done' callback is an
+    // error ("Resolution method is overspecified"), so the callback-style
+    // consumer disconnect is wrapped in a promise instead.
+    await new Promise<void>((resolve) => {
+      testConsumer.disconnect(() => resolve());
+    });
   });

@@ -109,21 +109,21 @@ describe('Producer transactions', function () {

await testConsumer.waitSubscribed();

-    await exporter.initTransactions();
-    await exporter.beginTransaction();
+    await kafkaStorage.initTransactions();
+    await kafkaStorage.beginTransaction();

// Do a small delay before starting writing messages, otherwise the consumer is missing them.
// This should not really be needed, because we have received the 'subscribed' event in the
// consumer but there is something I am missing.
setTimeout(async function () {
for (let i = 0; i < num_messages_test; i++) {
-        exporter.sendDataWithKey({
+        kafkaStorage.sendDataWithKey({
timestamp: 10000000,
iso_date: new Date().toISOString(),
key: 1
}, 'key', null);
}
-      await exporter.commitTransaction();
+      await kafkaStorage.commitTransaction();
}, 2000);

await testConsumer.waitData();
@@ -132,7 +132,7 @@ describe('Producer transactions', function () {
it('using the \'storeEvents\' function should begin and commit a transaction', async function () {
// We need the huge timeout because starting and closing a transaction takes around 1 sec
this.timeout(10000);
-    await exporter.initTransactions();
+    await kafkaStorage.initTransactions();

const testEvent = {
'contract': '0xdac17f958d2ee523a2206206994597c13d831ec7',
@@ -151,7 +151,7 @@ describe('Producer transactions', function () {

setTimeout(async function () {
for (let i = 0; i < num_messages_test; i++) {
-        await exporter.storeEvents([testEvent], false);
+        await kafkaStorage.storeEvents([testEvent], false);
}
}, 1000);

169 changes: 15 additions & 154 deletions src/lib/kafka_storage.ts
@@ -1,14 +1,10 @@
import crypto from 'crypto';
-import Kafka, { LibrdKafkaError, ProducerGlobalConfig } from 'node-rdkafka';
+import Kafka, { ProducerGlobalConfig } from 'node-rdkafka';
import { BLOCKCHAIN } from './constants';
-import ZookeeperClientAsync from './zookeeper_client_async';
import { log_according_to_syslog_level, logger, SYSLOG_LOG_LEVEL } from './logger';


-const ZOOKEEPER_URL: string = process.env.ZOOKEEPER_URL || 'localhost:2181';
-const ZOOKEEPER_RETRIES: number = parseInt(process.env.ZOOKEEPER_RETRIES || '0');
-const ZOOKEEPER_SPIN_DELAY: number = parseInt(process.env.ZOOKEEPER_SPIN_DELAY || '1000');
-const ZOOKEEPER_SESSION_TIMEOUT: number = parseInt(process.env.ZOOKEEPER_SESSION_TIMEOUT || '30000');

const FORMAT_HEADER: string = 'format=json;';
const RDKAFKA_DEBUG: string | null = process.env.RDKAFKA_DEBUG || null;
@@ -19,17 +15,6 @@ const BUFFERING_MAX_MESSAGES: number = parseInt(process.env.BUFFERING_MAX_MESSAG
const TRANSACTIONS_TIMEOUT_MS: number = parseInt(process.env.TRANSACTIONS_TIMEOUT_MS || '60000');
const KAFKA_MESSAGE_MAX_BYTES: number = parseInt(process.env.KAFKA_MESSAGE_MAX_BYTES || '10485760');

-process.on('unhandledRejection', (reason: unknown, p: Promise<unknown>): void => {
-  // Otherwise unhandled promises are not possible to trace with the information logged
-  if (reason instanceof Error) {
-    logger.error('Unhandled Rejection at: ', p, 'reason:', reason, 'error stack:', (reason as Error).stack);
-  }
-  else {
-    logger.error('Unhandled Rejection at: ', p, 'reason:', reason);
-  }
-  process.exit(1);
-});

/**
* A class to pick partition for an event.
*/
@@ -99,11 +84,10 @@ function castCompression(compression: string): 'none' | 'gzip' | 'snappy' | 'lz4
throw new Error(`Invalid compression value: ${compression}`);
}

-export class Exporter {
+export class KafkaStorage {
private readonly exporter_name: string;
private readonly producer: Kafka.Producer;
private readonly topicName: string;
-  private readonly zookeeperClient: ZookeeperClientAsync;
private partitioner: Partitioner | null;

constructor(exporter_name: string, transactional: boolean, topicName: string) {
@@ -139,14 +123,6 @@ export class Exporter {
log_according_to_syslog_level(log.severity, log.fac, log.message);
});

-    this.zookeeperClient = new ZookeeperClientAsync(ZOOKEEPER_URL,
-      {
-        sessionTimeout: ZOOKEEPER_SESSION_TIMEOUT,
-        spinDelay: ZOOKEEPER_SPIN_DELAY,
-        retries: ZOOKEEPER_RETRIES
-      }
-    );

this.partitioner = null;
}

@@ -162,20 +138,13 @@
/**
* @returns {Promise} Promise, resolved on connection completed.
*/
-  async connect() {
+  async connect(): Promise<void> {
-    logger.info(`Connecting to zookeeper host ${ZOOKEEPER_URL}`);
-
-    try {
-      await this.zookeeperClient.connectAsync();
-    }
-    catch (ex) {
-      console.error('Error connecting to Zookeeper: ', ex);
-      throw ex;
-    }

logger.info(`Connecting to kafka host ${KAFKA_URL}`);
-    const promise_result = new Promise((resolve, reject) => {
-      this.producer.on('ready', resolve);
+    const promise_result = new Promise<void>((resolve, reject) => {
+      this.producer.on('ready', () => resolve());
this.producer.on('event.error', reject);
// The user can provide a callback for delivery reports with the
// dedicated method 'subscribeDeliveryReports'.
@@ -190,131 +159,23 @@ export class Exporter {
}

/**
-   * Disconnect from Zookeeper and Kafka.
-   * This method is completed once the callback is invoked.
+   * Disconnect from Kafka.
+   * Resolves once the producer has disconnected.
*/
-  disconnect(callback?: () => void) {
-    logger.info(`Disconnecting from zookeeper host ${ZOOKEEPER_URL}`);
-    this.zookeeperClient.closeAsync().then(() => {
-      if (this.producer.isConnected()) {
-        logger.info(`Disconnecting from kafka host ${KAFKA_URL}`);
-        this.producer.disconnect(callback);
-      }
-      else {
-        logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`);
-      }
-    });
-  }
+  async disconnect(): Promise<void> {
+    if (!this.producer.isConnected()) {
+      logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`);
+      return;
+    }
+
+    logger.info(`Disconnecting from kafka host ${KAFKA_URL}`);
+    const promise_result = new Promise<void>((resolve) => {
+      this.producer.disconnect(() => resolve());
+    });
+
+    await promise_result;
+  }
-
-  async getLastPosition() {
-    if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
-      const previousPosition = await this.zookeeperClient.getDataAsync(
-        this.zookeeperPositionNode
-      );
-
-      try {
-        if (Buffer.isBuffer(previousPosition && previousPosition.data)) {
-          const value = previousPosition.data.toString('utf8');
-
-          if (value.startsWith(FORMAT_HEADER)) {
-            return JSON.parse(value.replace(FORMAT_HEADER, ''));
-          } else {
-            return previousPosition.data;
-          }
-        }
-      } catch (err) {
-        logger.error(err);
-      }
-    }
-
-    return null;
-  }
-
-  async getLastBlockTimestamp() {
-    if (await this.zookeeperClient.existsAsync(this.zookeeperTimestampNode)) {
-      const previousPosition = await this.zookeeperClient.getDataAsync(
-        this.zookeeperTimestampNode
-      );
-
-      try {
-        if (Buffer.isBuffer(previousPosition && previousPosition.data)) {
-          const value = previousPosition.data.toString('utf8');
-
-          if (value.startsWith(FORMAT_HEADER)) {
-            return JSON.parse(value.replace(FORMAT_HEADER, ''));
-          } else {
-            return previousPosition.data;
-          }
-        }
-      } catch (err) {
-        logger.error(err);
-      }
-    }
-
-    return null;
-  }
-
-  async savePosition(position: object) {
-    if (typeof position !== 'undefined') {
-      const newNodeValue = Buffer.from(
-        FORMAT_HEADER + JSON.stringify(position),
-        'utf-8'
-      );
-
-      if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
-        return this.zookeeperClient.setDataAsync(
-          this.zookeeperPositionNode,
-          newNodeValue
-        );
-      } else {
-        return this.zookeeperClient.mkdirpAsync(
-          this.zookeeperPositionNode,
-          newNodeValue
-        );
-      }
-    }
-  }
-
-  async saveLastBlockTimestamp(blockTimestamp: number) {
-    if (typeof blockTimestamp !== 'undefined') {
-      const newNodeValue = Buffer.from(
-        FORMAT_HEADER + JSON.stringify(blockTimestamp),
-        'utf-8'
-      );
-
-      if (await this.zookeeperClient.existsAsync(this.zookeeperTimestampNode)) {
-        return this.zookeeperClient.setDataAsync(
-          this.zookeeperTimestampNode,
-          newNodeValue
-        );
-      } else {
-        return this.zookeeperClient.mkdirpAsync(
-          this.zookeeperTimestampNode,
-          newNodeValue
-        );
-      }
-    }
-  }
-
-  async sendData(events: Array<any>) {
-    if (events.constructor !== Array) {
-      events = [events];
-    }
-
-    events = events.map(
-      event => (typeof event === 'object' ? JSON.stringify(event) : event)
-    );
-    events.forEach(event => {
-      this.producer.produce(this.topicName, null, Buffer.from(event));
-    });
-
-    return new Promise<void>((resolve, reject) =>
-      this.producer.flush(KAFKA_FLUSH_TIMEOUT, (err: LibrdKafkaError) => {
-        if (err) return reject(err);
-        resolve();
-      })
-    );
-  }

async sendDataWithKey(events: object | Array<object>, keyField: string, signalRecordData: object | null) {
const arrayEvents: Array<object> = (events.constructor !== Array) ? [events] : events
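For orientation, here is a hypothetical wiring of the two classes after the split. The runner code is not part of this diff; the './zookeeper_state' module path is assumed, and getLastPosition/savePosition are assumed to have moved over unchanged from the old Exporter, per the ZookeeperState sketch near the top of this page.

import { KafkaStorage } from './kafka_storage';
import { ZookeeperState } from './zookeeper_state'; // assumed module path

async function main(): Promise<void> {
  const topic = process.env.KAFKA_TOPIC || 'example_topic';

  // The old Exporter constructor and connect() covered both backends;
  // now each is created and connected separately.
  const kafkaStorage = new KafkaStorage('example-exporter', true, topic);
  const zookeeperState = new ZookeeperState('example-exporter', topic);
  await kafkaStorage.connect();
  await zookeeperState.connect();

  // Resume from the last checkpoint, write a batch, then checkpoint again.
  const lastPosition = await zookeeperState.getLastPosition();
  await kafkaStorage.storeEvents([{ resumedFrom: lastPosition }], false);
  await zookeeperState.savePosition({ blockNumber: 42 });

  await kafkaStorage.disconnect();
  await zookeeperState.disconnect();
}

main().catch((err) => { console.error(err); process.exit(1); });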
4 changes: 2 additions & 2 deletions src/lib/worker_base.ts
@@ -1,6 +1,6 @@
'use strict';
import { logger } from './logger';
-import { Exporter } from './kafka_storage';
+import { KafkaStorage } from './kafka_storage';
import { ExporterPosition } from '../types'

export class BaseWorker {
@@ -34,7 +34,7 @@ export class BaseWorker {
throw new Error('"work" method need to be overriden');
}
// To be implemented on inheritance.
-  async init(_exporter: Exporter) {
+  async init(_exporter: KafkaStorage) {
throw new Error('"init" method need to be overriden');
}

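Since BaseWorker.init now types its parameter as KafkaStorage, every concrete worker inherits the new contract. A minimal subclass sketch under that contract follows; the worker itself is illustrative and not part of this PR, and constructor details are elided.

import { KafkaStorage } from './kafka_storage';
import { BaseWorker } from './worker_base';

// Illustrative worker only; ERC20Worker and UTXOWorker in this PR follow
// the same pattern.
export class ExampleWorker extends BaseWorker {
  // Overrides BaseWorker.init, which throws unless overridden.
  async init(exporter: KafkaStorage) {
    // Partition outgoing events by block height, as UTXOWorker does above.
    await exporter.initPartitioner((event: any) => event['height']);
  }

  async work() {
    // Fetch and return a batch of events; elided in this sketch.
    return [];
  }
}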