Skip to content

Commit

Permalink
some code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mabels committed May 1, 2011
1 parent 65fe596 commit c6c1e44
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 349 deletions.
9 changes: 4 additions & 5 deletions conf-dispatcher/couch-client.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
/*global Buffer */

var Http = require('http'),
Url = require('url'),
EventEmitter = require('events').EventEmitter,
querystring = require('querystring');

var Http = require('http');
var Url = require('url');
var EventEmitter = require('events').EventEmitter;
var querystring = require('querystring');

var POOL_SIZE = 200; // Maximum number of concurrent connections allowed.
var MAX_DOCS = 1000; // The maximum number of docs to send in a single batch
Expand Down
216 changes: 122 additions & 94 deletions conf-dispatcher/server.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
var http = require('http');
var url = require('url');
var url = require('url');
var util = require('util');
var fs = require('fs');
var fs = require('fs');

var CouchClient = require('./couch-client');

var listen = { host: "127.0.0.1", port: 8124 };

var callStreamie = function(token, fn) {
var streamie = http.request({
host: 'streamie.org',
//host: '172.29.29.222',
//port: 8080,
port: 80,
path: '/user_info',
method: 'GET',
Expand All @@ -28,163 +28,191 @@ var callStreamie = function(token, fn) {
console.error('callStreamie:exception:'+e)
}
})
}).end()
}).end();
}

var getMacAddress = function(address, fn) {
fs.readFile('/proc/net/arp', 'utf8', function(err, data) {
if (err) {
console.error('can not read /proc/net/arp:'+err)
return
console.error('can not read /proc/net/arp:'+err);
return;
}
var lines = data.toString().split("\n")
for(var i = lines.length-1; i >= 0; --i) {
var line = lines[i].split(/\s+/)
if (line[0] == address) {
return fn(line[3])
return fn(line[3]);
}
}
fn()
})
fn();
});
}

var updateStreamie = function(mac, req, ret) {
var updateStreamie = function(mac, req, ret, retryCnt) {
retryCnt = retryCnt || 0;
streamie.get(ret.user_id, function(err, doc) {
if (err) { console.error('couchdb:get:failure:'+err) }
//console.log('mac:'+mac+":"+JSON.stringify(ret)+":"+JSON.stringify(doc))
//console.log(req)
if (err) {
console.error('updateStreamie:couchdb:get:failure:'+err);
}
var client = {
ipv4: req.headers['x-real-ip'] || req.socket.remoteAddress,
ipv4: req.headers['x-real-ip'] || req.socket.remoteAddress,
hwaddr: mac,
useragent: req.headers['user-agent'],
created_at: new Date()
};
if (!doc) {
console.log('NEW-DOC');
console.log('NEW-DOC:'+ret.user_id);
doc = {
_id: ret.user_id,
twitter: ret,
clients: [client]
}
};
} else {
console.log('UPDATE-DOC');
doc.twitter = ret
doc.twitter = ret;
delete doc.completed
var found = false;
for(var i = doc.clients.length-1; i >= 0; --i) {
var r_ip = req.headers['x-real-ip'] || req.socket.remoteAddress;
if (doc.clients[i].ipv4 == r_ip) {
console.log('UPDATE-IPUPDATE');
console.log('UPD-IPUPDATE:'+ret.user_id);
doc.clients[i] = client;
found = true;
break;
}
}
if (!found) {
console.log('UPDATE-ADDCLIENT');
console.log('UPD-ADDCLIENT:'+ret.user_id);
doc.clients.push(client);
}
}
//console.log(JSON.stringify(doc))
streamie.save(doc, function(err, doc) {
if (err) {
console.error('couchdb:save:failure:'+err)
updateStreamie(req, ret)
}
})
})
if (err) {
// retry raise condition
if (retryCnt < 5) {
console.error('updateStreamie:couchdb:save:failure:'+err+":"+retryCnt);
setTimeout(function() { updateStreamie(mac, req, ret, retryCnt + 1); }, 500);
} else {
console.error('updateStreamie:couchdb:save:failure:'+err+":MAX-RETRIED");
}
}
});
});
}


var streamie = CouchClient('http://localhost:5984/streamie')
var streamie = CouchClient('http://localhost:5984/streamie');
streamie.request('PUT', '/streamie', function(err, result) {
http.createServer(function (req, res) {
var dispatch = url.parse(req.url, true)
var dispatch = url.parse(req.url, true);
if (dispatch.pathname == '/authorize') {
callStreamie(dispatch.query['token'], function(ret) {
// res.setEncoding('utf-8')
ret.oauth = dispatch.query['token']
res.writeHead(ret.statusCode+'', {'Content-Type': 'application/javascript'});
callback = dispatch.query['callback'] || 'callback';
res.end(callback + '(' + JSON.stringify(ret) + ')')
if (ret.error) {
callStreamie(dispatch.query['token'], function(ret) {
ret.oauth = dispatch.query['token']
res.writeHead(ret.statusCode+'', {'Content-Type': 'application/javascript'});
callback = dispatch.query['callback'] || 'callback';
res.end(callback + '(' + JSON.stringify(ret) + ')')
if (ret.error) { return; }
getMacAddress(req.headers['x-real-ip'] || req.socket.remoteAddress, function(mac) {
if (mac) { updateStreamie(mac, req, ret); }
});
});
return;
}
getMacAddress(req.headers['x-real-ip'] || req.socket.remoteAddress, function(mac) {
if (mac) { updateStreamie(mac, req, ret) }
})
})
return;
}
}
res.writeHead(404, {'Content-Type': 'text/plain'});
res.end('Weg hier \n');
}).listen(8124, "127.0.0.1");
res.end("Weg hier\n");
}).listen(listen.port, listen.host);

var iptables = function(para, fn) {
var iptables = require('child_process').spawn('sudo', ['/sbin/iptables'].concat(para))
iptables.on('exit', function(code) {
console.log('iptables:'+para.join(' ')+"=>"+code);
~~code && console.log('iptables:'+para.join(' ')+"=>"+code);
fn(code);
});
};

var writeIPTables = function(para, fn, cmds, codes) {
cmds = cmds || ['-D', '-I'];
codes = codes || [];
var cmd = cmds.shift();
if (!cmd) { return true; }
iptables([cmd].concat(para), function(code) {
codes.push(code);
writeIPTables(para, fn, cmds, codes) && fn(codes);
})
return false;
}

var updateIPTables = function(id, rev) {
streamie.get(id, function(err, doc) {
if (err) { console.error('couchdb:get:failure:'+err) }
if (doc._rev != rev) { return }
if (doc.completed && doc.completed.pid == process.pid) { return }
for(var i = doc.clients.length-1; i >= 0; --i) {
var client = doc.clients[i]

// $IPTABLES -t mangle -I FREE_MACS -i $CONF_IF -p all -m mac
// --mac-source c8:bc:c8:4f:d4:66 -s 10.205.0.100 -j MARK --set-mark 0x1205
var iptable = [];
iptable.push('-t');
iptable.push('mangle');
iptable.push('-I');
iptable.push('FREE_MACS');
iptable.push('-p');
iptable.push('all');
iptable.push('-m');
iptable.push('mac');
iptable.push('--mac-source');
iptable.push(client.hwaddr);
iptable.push('-s');
iptable.push(client.ipv4);
iptable.push('-j');
iptable.push('MARK');
iptable.push('--set-mark');
iptable.push('0x1205');
iptables(iptable, function(code) {
client.iptabled = { date: new Date(), exitcode: code, rev: doc._rev }
});
}
doc.completed = { date: new Date(), pid: process.pid, rev: doc._rev };
streamie.save(doc, function(err, doc) {
if (err) {
console.log('updateIPTables failed:'+err)
updateIPTables(id, rev)
}
})
})
var updateIPTables = function(doc, cmds, retryCnt) {
retryCnt = retryCnt || 0;
cmds = ['-D', '-I'];
if (doc._rev != rev) { return; }
if (doc.completed && doc.completed.pid == process.pid) { return; }
var called = 0;
for(var i = doc.clients.length-1; i >= 0; --i) {
var client = doc.clients[i];
// $IPTABLES -t mangle -I FREE_MACS -i $CONF_IF -p all -m mac
// --mac-source c8:bc:c8:4f:d4:66 -s 10.205.0.100 -j MARK --set-mark 0x1205
var iptable = [];
iptable.push('FREE_MACS');
iptable.push('-t', 'mangle');
iptable.push('-p', 'all');
iptable.push('-m', 'mac');
iptable.push('--mac-source', client.hwaddr);
iptable.push('-s', client.ipv4);
iptable.push('-j', 'MARK');
iptable.push('--set-mark', '0x1205');
writeIPTables(iptable, function(codes) {
client.iptabled = { date: new Date(), exitcodes: codes, rev: doc._rev };
if (called++ == doc.clients.length) {
doc.completed = { date: new Date(), pid: process.pid, rev: doc._rev };
streamie.save(doc, function(err, doc) {
if (err) {
if (retryCnt < 5) {
console.error('updateIPTables:couchdb:save:failure:'+err+":"+retryCnt);
setTimeout(function() { updateIPTables(id, rev, retryCnt + 1); }, 500);
} else {
console.error('updateIPTables:couchdb:save:failure:'+err+":MAX-RETRIED");
}
}
})
}
}, cmds);
}
}
iptables(['-t', 'mangle', '-F', 'FREE_MACS'], function(code) {
iptables(['-t', 'mangle', '-A', 'FREE_MACS', '-j', 'RETURN'], function(code) {
streamie.changes(0, function(err, changes) {
var docrevs = {};
streamie.all(function(err, doc) {
if (err) {
console.log('ERROR:streamie:all:'+err);
return;
}
docrevs[doc._id] = doc.rev;
updateIPTables(doc);
});
streamie.changes(-1, function(err, changes) {
if (err) {
console.error('couchdb:changes:failure:'+err)
return
console.error('ERROR:couchdb:changes:'+err);
return;
}
if (changes.deleted) {
console.log('DELETE:'+util.inspect(changes));
return
streamie.request('GET', '/streamie/'+changes.id+'?rev='+changes.rev, function(err, doc) {
updateIPTables(doc, ['-D']);
})
}
for(var i in changes.changes) {
updateIPTables(changes.id, changes.changes[i].rev )
if (docrevs[changes.id] != changes.changes.rev) {
streamie.get(changes.id, function(err, doc) {
if (err) {

return;
}
docrevs[doc._id] = doc._rev;
updateIPTables(doc);
})
}
}
})
})
})
})

console.log('Server running at http://127.0.0.1:8124/');
console.log('Server running at http://'+listen.host+":"+listen.port+'/');
1 change: 0 additions & 1 deletion conf-dispatcher/streamie.json

This file was deleted.

Loading

0 comments on commit c6c1e44

Please sign in to comment.