From 05463d4fe67200e138448b8fd490a3b84357b062 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Mon, 13 Oct 2014 13:54:15 +0100 Subject: [PATCH] progress commit --- .gitignore | 4 +- dbin.js | 68 +++++++++++++++++++++++++ index.js | 19 +------ osm.js | 16 ++++++ package.json | 5 ++ reset.sh | 23 +++++++++ source.sh | 3 ++ src/Batch.js | 39 ++++++++++++++ src/BatchManager.js | 120 ++++++++++++++++++++++++++++++++++++++++++++ src/HealthCheck.js | 103 +++++++++++++++++++++++++++++++++++++ src/Task.js | 14 ++++++ src/client.js | 11 ++++ src/sink.js | 24 +++++++++ src/stats.js | 36 +++++++++++++ src/transaction.js | 96 +++++++++++++++++++++++++++++++++++ 15 files changed, 562 insertions(+), 19 deletions(-) create mode 100644 dbin.js create mode 100644 osm.js create mode 100755 reset.sh create mode 100755 source.sh create mode 100644 src/Batch.js create mode 100644 src/BatchManager.js create mode 100644 src/HealthCheck.js create mode 100644 src/Task.js create mode 100644 src/client.js create mode 100644 src/sink.js create mode 100644 src/stats.js create mode 100644 src/transaction.js diff --git a/.gitignore b/.gitignore index b512c09..9d542fa 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -node_modules \ No newline at end of file +node_modules +*.log +*.pbf \ No newline at end of file diff --git a/dbin.js b/dbin.js new file mode 100644 index 0000000..bc5bb7e --- /dev/null +++ b/dbin.js @@ -0,0 +1,68 @@ + +var sink = require('./src/sink')(); + +// var fs = require('fs'), +// readline = require('readline'); + +// var rd = readline.createInterface({ +// input: process.stdin, +// output: process.stdout, +// terminal: false +// }); + +var byline = require('byline'); +var through = require('through2'); + +// rd.on('line', function(line) { + +process.stdin.on( 'error', function(){ + console.error('something fucked up', arguments); + process.exit(1); +}); + +process.stdin + .pipe( byline.createStream() ) + .pipe( through.obj(function( line, _, next ){ + + var chunk; + // process.stdout.write('.'); + + try { + line = line.toString('utf8'); + chunk = JSON.parse( line ); + // console.log( chunk ); + } + catch( e ){ + console.error('-----------------'); + console.error('-----------------'); + console.error('line>',line,'= this._current.free() ){ + debug( 'ENQUEUE!' ); + this._next(); + } +}; + +// flush batch +BatchManager.prototype._flush = function(){ + if( this._queued.length ){ + var batch = this._queued.shift(); + this._transient++; // record active transactions + + // perform the transaction + var _self = this; + transaction( client )( batch, function( err ){ + this._transient--; + + if( err ){ + stats.inc( 'batch_error', 1 ); + console.error( 'transaction error', err ); + } + + if( !err ){ + stats.inc( 'indexed', batch._slots.length ); + + var types = { node: 0, way: 0, relation: 0 }; + + batch._slots.forEach( function( task ){ + types[ task.data.type ]++; + }); + + stats.inc( 'batch_retries', batch.retries ); + stats.inc( 'nodes', types.node ); + stats.inc( 'ways', types.way ); + stats.inc( 'relations', types.relation ); + } + + // console.log( 'batch complete', err, batch._slots.length ); + debug( 'transaction returned', err || 'ok!' ); + this._end(); + }.bind(this)); + } +}; + +BatchManager.prototype._end = function(){ + // debug('try end', this._queued.length && !this._current._slots.length); + if( !this._queued.length && !this._transient && !this._current._slots.length ){ + debug( 'END!' ); + client.close(); + stats.end(); + hc.end(); + } +}; + +// add an item to the current batch +BatchManager.prototype.push = function( item, next ){ + // debug( 'BatchManager push' ); + this._enqueue(); // enqueue current batch if full + this._current.push( item ); + + // accept more data + // if( this._queued.length < this._total ){ + // debug( 'MOAR!' ); + return next(); + // } + + // pause pipeline until queue empties + // debug( 'PAWS' ); + // this._pause = next; +}; + +// resume paused pipeline +BatchManager.prototype.resume = function(){ + if( 'function' == typeof this._pause ){ + if( this._queued.length < this._total ){ + debug( 'UNPAWS' ); + this._pause(); + this._pause = null; + return true; + } + } + return false; +}; + +module.exports = BatchManager; \ No newline at end of file diff --git a/src/HealthCheck.js b/src/HealthCheck.js new file mode 100644 index 0000000..09a872e --- /dev/null +++ b/src/HealthCheck.js @@ -0,0 +1,103 @@ + +var fields = 'host,ip,bulk.active,bulk.queue,bulk.rejected'; + +function HealthCheck( client ){ + this._client = client; + this._interval = undefined; + this._status = { threadpool: new ThreadpoolStatus() }; + this.code = HealthCheck.code['UNKNOWN']; + + this.start(); +} + +// var grant = 'host ip bulk.active bulk.queue bulk.rejected\n'; +// grant += 'elasticsearch13.localdomain 127.0.1.1 0 0 0\n'; +// grant += 'elasticsearch14.localdomain 127.0.1.1 0 0 0\n'; +// grant += 'elasticsearch5.localdomain 127.0.1.1 0 0 686\n'; +// grant += 'elasticsearch1.localdomain 127.0.1.1 0 0 814\n'; +// grant += 'elasticsearch2.localdomain 127.0.1.1 0 0 829\n'; +// grant += 'elasticsearch6.localdomain 127.0.1.1 0 0 641\n'; +// grant += 'elasticsearch8.localdomain 127.0.1.1 0 0 0\n'; +// grant += 'elasticsearch3.localdomain 127.0.1.1 0 0 1201\n'; +// grant += 'elasticsearch4.localdomain 127.0.1.1 0 0 994\n'; +// grant += 'elasticsearch7.localdomain 127.0.1.1 0 0 20'; + +HealthCheck.code = { + 'UNKNOWN': 0, + 'CONTINUE': 1, + 'BACKOFF': 2, + 'STOP': 3, +}; + +HealthCheck.prototype.start = function(){ + this._interval = setInterval( this.probe.bind(this), 500 ); +}; + +HealthCheck.prototype.end = function(){ + clearInterval( this._interval ); +}; + +HealthCheck.prototype.probe = function(){ + this._client.cat.threadPool( { v: true, h: fields }, function( method, body, status ){ + if( body ){ + this._status.threadpool = new ThreadpoolStatus( body ); + this.evaluate(); + } + }.bind(this)); +}; + +// flood: allow x times as many batches in the queue as are currently active +// recover: allow x times as many batches in the queue as are currently active +HealthCheck.threshhold = { + flood: 8, + recover: 2 +}; + +HealthCheck.prototype.evaluate = function(){ + var magnitude = { max: 0, min: 999 }; + this._status.threadpool.nodes.forEach( function( node ){ + var mag = Math.ceil( node['bulk.queue'] / node['bulk.active'] ); + if( mag > magnitude.max ){ magnitude.max = mag; } + if( mag < magnitude.min ){ magnitude.min = mag; } + }); + + if( magnitude.max >= HealthCheck.threshhold.flood ){ + this.code = HealthCheck.code[ 'BACKOFF' ]; + } else if( this.code == HealthCheck.code[ 'BACKOFF' ] && magnitude.max <= HealthCheck.threshhold.recover ){ + this.code = HealthCheck.code[ 'CONTINUE' ]; + } else { + this.code = HealthCheck.code[ 'CONTINUE' ]; + } + + console.log( 'HealthCheck', this.code ); +}; + +/** +[{ + 'host': 'mini', + 'ip': '127.0.1.1', + 'bulk.active': 8, + 'bulk.queue': 57, + 'bulk.rejected': 10932, + 'index.active': 0, + 'index.queue': 0, + 'index.rejected': 0, + 'search.active': 0, + 'search.queue': 0, + 'search.rejected': 0 +}] +**/ +function ThreadpoolStatus( body ){ + var lines = ( body || '' ).trim().split('\n'); + var headers = lines.shift().trim().split(/\s+/); + this.nodes = lines.map( function( line ){ + var cols = line.trim().split(/\s+/); + var node = {}; + headers.forEach( function( header, i ){ + node[ header ] = ( i > 1 ) ? parseInt( cols[ i ], 10 ) : cols[ i ]; + }); + return node; + }); +} + +module.exports = HealthCheck; \ No newline at end of file diff --git a/src/Task.js b/src/Task.js new file mode 100644 index 0000000..d9fe85a --- /dev/null +++ b/src/Task.js @@ -0,0 +1,14 @@ + +function Task( record ){ + this.status = 999; + this.data = record.data; + this.cmd = { + index: { + _index: record._index, + _type: record._type, + _id: record._id + } + }; +} + +module.exports = Task; \ No newline at end of file diff --git a/src/client.js b/src/client.js new file mode 100644 index 0000000..cfaf9ed --- /dev/null +++ b/src/client.js @@ -0,0 +1,11 @@ + +var elasticsearch = require('elasticsearch'), + settings = require('pelias-config').generate(); + +module.exports = function(){ + + // Create new esclient with settings + var client = new elasticsearch.Client( settings.export().esclient || {} ); + + return client; +}; \ No newline at end of file diff --git a/src/sink.js b/src/sink.js new file mode 100644 index 0000000..62fbf3f --- /dev/null +++ b/src/sink.js @@ -0,0 +1,24 @@ + +var through = require('through2'), + BatchManager = require('./BatchManager'); + +function streamFactory(){ + + var manager = new BatchManager(); + + var stream = through.obj( function( item, enc, next ){ + + // push to batch manager + manager.push( item, next ); + + }, function(){ + + // clear remaining partial batch + manager._next(); + + }); + + return stream; +} + +module.exports = streamFactory; \ No newline at end of file diff --git a/src/stats.js b/src/stats.js new file mode 100644 index 0000000..e5ed63b --- /dev/null +++ b/src/stats.js @@ -0,0 +1,36 @@ + +function Stats(){ + this.data = { start: new Date().getTime() }; +} + +Stats.prototype.start = function(){ + this.end(); + this.interval = setInterval( function(){ + if( this.data.indexed ){ + if( this.lastIndexCount ){ + var indexPerSec = this.data.indexed - this.lastIndexCount; + this.data.persec = indexPerSec; + } + this.lastIndexCount = this.data.indexed; + } + this.flush(); + }.bind(this), 1000); +}; + +Stats.prototype.flush = function(){ + console.log( JSON.stringify( this.data, null, 2 ) ); +}; + +Stats.prototype.end = function(){ + this.flush(); + clearInterval( this.interval ); +}; + +Stats.prototype.inc = function( key, num ){ + if( !this.data.hasOwnProperty(key) ){ + this.data[key] = 0; + } + this.data[key] += num; +}; + +module.exports = new Stats(); \ No newline at end of file diff --git a/src/transaction.js b/src/transaction.js new file mode 100644 index 0000000..68cea0f --- /dev/null +++ b/src/transaction.js @@ -0,0 +1,96 @@ + +var max_retries = 5; +var o = 0; + +function wrapper( client ){ + + var transactionId = ++o; + + function transaction( batch, cb ){ + + // reached max retries + if( batch.retries >= max_retries ){ + return cb( 'reached max retries' ); + } + + // reseve some memory for the bulk index body + var payload = []; + + // map task object to bulk index format + batch._slots.forEach( function( task ){ + // filter only tasks that havn't been saved already + if( task.status > 201 ){ + payload.push( task.cmd, task.data ); + } + }); + + // invalid bulk body length + // @optimistic this should never happen + if( !payload.length ){ + console.log( JSON.stringify( payload, null, 2 ) ); + return cb( 'invalid bulk payload length' ); + } + + // console.log(transactionId, 'payload length', payload.length); + + // perform bulk operation + client.bulk( { body: payload }, function( err, resp ){ + + // major error + if( err ){ + console.log( 'esclient error', err ); + batch.status = 500; + } + + // response does not contain items + if( !resp || !resp.items ){ + console.error( 'invalid resp from es bulk index operation' ); + batch.status = 500; + } + + // update batch items with response status + else { + resp.items.forEach( function( item, i ){ + + var action = item.hasOwnProperty('create') ? item.create : item.index; + + var task = batch._slots[i]; + task.status = parseInt( action.status, 10 ) || 888; + // console.log( 'set task status', task.status, JSON.stringify( action, null, 2 ) ); + + if( task.status > 201 ){ + console.log( '[' + action.status + ']', action.error ); + } + // else { + // delete task.cmd; // reclaim memory + // delete task.data; // reclaim memory + // } + + // set batch status to highest response code + if( batch.status === 999 || task.status > batch.status ){ + batch.status = task.status; + } + }); + } + + // retry batch + if( batch.status > 201 ){ + batch.retries++; + console.log( 'retrying batch', '[' + batch.status + ']' ); + return transaction( batch, cb ); + } + + // done done + return cb( undefined ); + + }); + + // reclaim memory + payload = undefined; + } + + return transaction; + +} + +module.exports = wrapper; \ No newline at end of file