Skip to content

Commit

Permalink
progress commit
Browse files Browse the repository at this point in the history
  • Loading branch information
missinglink committed Oct 13, 2014
1 parent 60e6c08 commit 05463d4
Show file tree
Hide file tree
Showing 15 changed files with 562 additions and 19 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
node_modules
node_modules
*.log
*.pbf
68 changes: 68 additions & 0 deletions dbin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@

var sink = require('./src/sink')();

var byline = require('byline');
var through = require('through2');

// Fail fast on stdin errors; a broken input stream cannot be resumed.
process.stdin.on( 'error', function(){
  console.error('stdin stream error', arguments);
  process.exit(1);
});

// Read newline-delimited JSON records from stdin, lift the elasticsearch
// envelope fields (_index/_type/_id) off each record and forward the
// remainder to the sink stream for bulk indexing.
process.stdin
  .pipe( byline.createStream() )
  .pipe( through.obj(function( line, _, next ){

    var chunk;

    try {
      line = line.toString('utf8');
      chunk = JSON.parse( line );
    }
    catch( e ){
      // dump the offending line so malformed input can be inspected
      console.error('-----------------');
      console.error('-----------------');
      console.error('line>',line,'<line');
      console.error('-----------------');
      throw new Error( 'json error: ' + e.message );
    }

    // pull the envelope fields out of the record body
    var index = chunk._index;
    var type = chunk._type;
    var id = chunk._id;

    delete chunk._index;
    delete chunk._type;
    delete chunk._id;

    sink.write({
      _index: index,
      _type: type,
      _id: id,
      data: chunk.data
    });

    next();

  }, function( done ){
    // flush handler: through2 passes the completion callback as the FIRST
    // argument. The previous 3-arg signature `( line, _, next )` meant the
    // real callback landed in `line` and was never invoked, so the stream
    // never emitted 'finish'. Close the sink, then signal completion.
    sink.end();
    done();
  }));
19 changes: 1 addition & 18 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,2 @@

var through = require('through2');

/**
 * Build an object-mode passthrough stream which tags every item
 * flowing through it with a test marker before re-emitting it.
 * @returns {stream.Transform} object-mode transform stream
 */
function streamFactory(){

  return through.obj( function( item, enc, next ){
    item.test = 'test1';
    this.push( item );
    next();
  });

}

module.exports = streamFactory;
module.exports = require('./src/sink');
16 changes: 16 additions & 0 deletions osm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

var through = require('through2');
var osm = require('openstreetmap-stream');

// Parse an OSM PBF stream arriving on stdin and emit each parsed
// element as a single line of JSON (NDJSON) on stdout.
process.stdin
  .pipe( osm.parser() )
  .pipe( through.obj( function( element, _, done ){
    var json = JSON.stringify( element );
    this.push( json + '\n' );
    done();
  }))
  .pipe( process.stdout );
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
"tap-spec": "^0.2.0"
},
"dependencies": {
"byline": "^4.1.1",
"elasticsearch": "^2.4.3",
"openstreetmap-stream": "latest",
"osm-pbf-parser": "^2.1.2",
"pelias-config": "0.0.4",
"through2": "^0.6.1"
}
}
23 changes: 23 additions & 0 deletions reset.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash
# Reset the local elasticsearch 'test' index: drop it, recreate it empty,
# and install the geo_point mapping for the 'node' type.

# drop any existing index (a 404 on the first run is harmless)
curl -XDELETE 'localhost:9200/test?pretty=1';
curl -XPUT 'localhost:9200/test?pretty=1';


# mapping: 'geo' is a geo_point with lat/lon storage, geohash prefixes
# (precision 20) and compressed fielddata at ~3m precision
curl -XPUT 'localhost:9200/test/node/_mapping?pretty=1' -d '{
"node" : {
"properties" : {
"geo" : {
"type": "geo_point",
"lat_lon": true,
"geohash": true,
"geohash_prefix": true,
"geohash_precision": 20,
"fielddata" : {
"format" : "compressed",
"precision" : "3m"
}
}
}
}
}';
3 changes: 3 additions & 0 deletions source.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
# Stream the sample PBF extract into the OSM parser (emits NDJSON on stdout).
# Uses input redirection instead of the previous `cat file | node` (UUOC).

node osm.js < /media/hdd/somes.osm.pbf;
39 changes: 39 additions & 0 deletions src/Batch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

var Task = require('./Task');

// Wed Oct 8 16:51:30 BST 2014
// Wed Oct 8 16:53:02 BST 2014


// default ceiling on how many records one batch may hold
var defaults = {
  max: 250
};

/**
 * A fixed-capacity container of Tasks destined for a single bulk request.
 * @param {Object} [opts] - optional settings; opts.max overrides defaults.max
 */
function Batch( opts ){
  this._max = ( opts && opts.max ) || defaults.max;
  this._slots = [];
  this.retries = 0;   // times this batch has been re-submitted
  this.status = 999;  // transaction status; initialised to sentinel 999
}

// how many free slots are left in this batch
// Number of unused slots remaining in this batch.
Batch.prototype.free = function(){
  var used = this._slots.length;
  return this._max - used;
};

// add an record to the batch
/**
 * Wrap a record in a Task and append it to the batch.
 * @param {Object} record - the record to enqueue
 * @returns {boolean} true on success, false when the batch is already full
 */
Batch.prototype.push = function( record ){

  if( this.free() === 0 ){
    console.error( 'batch not free' );
    return false;
  }

  this._slots.push( new Task( record ) );
  return true;
};

module.exports = Batch;
120 changes: 120 additions & 0 deletions src/BatchManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@

var Batch = require('./Batch'),
transaction = require('./transaction'),
client = require('./client')(),
HealthCheck = require('./HealthCheck'),
hc = new HealthCheck( client );

// Debug logging toggle: disabled by default (no-op). The previous code
// declared `var debug` twice in the same scope (no-redeclare), leaving the
// first binding dead. To enable verbose tracing, swap in:
//   var debug = console.error.bind( console );
var debug = function(){};

var stats = require('./stats');
stats.start();

// cap on how many full batches may wait in memory at once
var defaults = {
  total: 5
};

/**
 * Maintains a 'current' batch being filled plus a queue of full batches
 * awaiting flush to elasticsearch.
 * @param {Object} [opts] - optional settings; opts.total overrides defaults.total
 */
function BatchManager( opts ){
  this._total = ( opts && opts.total ) || defaults.total;
  this._current = new Batch();
  this._queued = [];
  this._transient = 0; // count of in-flight transactions
  this._pause = null;  // stashed `next` callback when the pipeline is paused
}

// Rotate the current batch onto the queue, start a fresh one, and
// kick off a flush of anything now queued.
BatchManager.prototype._next = function(){
  debug( 'NEXT!' );
  var full = this._current;
  this._current = new Batch();
  this._queued.push( full );

  this._flush(); // @todo: make this more intuitive
};

// enqueue batch if already full
BatchManager.prototype._enqueue = function(){
if( 0 >= this._current.free() ){
debug( 'ENQUEUE!' );
this._next();
}
};

// flush batch
BatchManager.prototype._flush = function(){
if( this._queued.length ){
var batch = this._queued.shift();
this._transient++; // record active transactions

// perform the transaction
var _self = this;
transaction( client )( batch, function( err ){
this._transient--;

if( err ){
stats.inc( 'batch_error', 1 );
console.error( 'transaction error', err );
}

if( !err ){
stats.inc( 'indexed', batch._slots.length );

var types = { node: 0, way: 0, relation: 0 };

batch._slots.forEach( function( task ){
types[ task.data.type ]++;
});

stats.inc( 'batch_retries', batch.retries );
stats.inc( 'nodes', types.node );
stats.inc( 'ways', types.way );
stats.inc( 'relations', types.relation );
}

// console.log( 'batch complete', err, batch._slots.length );
debug( 'transaction returned', err || 'ok!' );
this._end();
}.bind(this));
}
};

// Shut down the client, stats reporter and health-checker once there is
// nothing queued, nothing in flight and the current batch is empty.
BatchManager.prototype._end = function(){
  var idle = !this._queued.length &&
             !this._transient &&
             !this._current._slots.length;
  if( idle ){
    debug( 'END!' );
    client.close();
    stats.end();
    hc.end();
  }
};

// add an item to the current batch
BatchManager.prototype.push = function( item, next ){
// debug( 'BatchManager push' );
this._enqueue(); // enqueue current batch if full
this._current.push( item );

// accept more data
// if( this._queued.length < this._total ){
// debug( 'MOAR!' );
return next();
// }

// pause pipeline until queue empties
// debug( 'PAWS' );
// this._pause = next;
};

// resume paused pipeline
BatchManager.prototype.resume = function(){
if( 'function' == typeof this._pause ){
if( this._queued.length < this._total ){
debug( 'UNPAWS' );
this._pause();
this._pause = null;
return true;
}
}
return false;
};

module.exports = BatchManager;
Loading

0 comments on commit 05463d4

Please sign in to comment.