Skip to content

Commit

Permalink
Add support for automatic chunking with maxPacketSize option
Browse files Browse the repository at this point in the history
  • Loading branch information
goldfire committed Dec 3, 2018
1 parent a2c5932 commit d6eec10
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dem.publish('my-channel', {hello: 'world'});
new Democracy({
interval: 1000, // The interval (ms) at which `hello` heartbeats are sent to the other peers.
timeout: 3000, // How long a peer must go without sending a `hello` to be considered down.
maxPacketSize: 508, // Maximum size per packet. If the data exceeds this, it will be chunked.
source: '0.0.0.0:12345', // The IP and port to listen to (usually the local IP).
peers: [], // The other servers/ports you want to communicate with (can be on the same or different server).
weight: Math.random() * Date.now(), // The highest weight is used to determine the new leader. Must be unique for each node.
Expand Down
67 changes: 63 additions & 4 deletions lib/democracy.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ class Democracy extends EventEmitter {
super();

this._nodes = {};
this._chunks = {};

// Merge the passed options with the defaults.
this.options = {
interval: options.interval || 1000,
timeout: options.timeout || 3000,
maxPacketSize: options.maxPacketSize || 508,
source: options.source || '0.0.0.0:12345',
peers: options.peers || [],
weight: options.weight || Math.random() * Date.now(),
Expand Down Expand Up @@ -137,13 +139,38 @@ class Democracy extends EventEmitter {

data.source = `${this.options.source[0]}:${this.options.source[1]}`;

// Adjust the max size by the max size of the chunk wrapper data.
const maxSize = this.options.maxPacketSize;
const chunkSize = maxSize - 52;

// Check if the packet needs to be chunked.
const str = JSON.stringify(data);
let chunks = [];
if (str.length > maxSize) {
const count = Math.ceil(str.length / chunkSize);
const packetId = shortid.generate();

for (let i = 0; i < count; i += 1) {
chunks.push(JSON.stringify({
chunk: str.substr(i * chunkSize, chunkSize),
id: packetId,
c: count,
i,
}));
}
} else {
chunks.push(str);
}

// Data must be sent as a Buffer over the UDP socket.
const msg = Buffer.from(JSON.stringify(data));
chunks = chunks.map(chunk => Buffer.from(chunk));

// Loop through each connect node and send the packet over.
for (let i = 0; i < this.options.peers.length; i += 1) {
if (!id || this._nodes[id].source === `${this.options.peers[i][0]}:${this.options.peers[i][1]}`) {
this.socket.send(msg, 0, msg.length, this.options.peers[i][1], this.options.peers[i][0]);
for (let x = 0; x < chunks.length; x += 1) {
for (let i = 0; i < this.options.peers.length; i += 1) {
if (!id || this._nodes[id].source === `${this.options.peers[i][0]}:${this.options.peers[i][1]}`) {
this.socket.send(chunks[x], 0, chunks[x].length, this.options.peers[i][1], this.options.peers[i][0]);
}
}
}

Expand Down Expand Up @@ -243,6 +270,38 @@ class Democracy extends EventEmitter {
processEvent(msg) {
const data = this.decodeMsg(msg);

// Check if this is a chunk and put in the store.
if (data && data.chunk && data.id) {
// Add the chunk to the buffer.
this._chunks[data.id] = this._chunks[data.id] || [];
this._chunks[data.id].push(data);

// If the buffer is full, combine and process.
if (this._chunks[data.id].length === data.c) {
// Sort the chunks by index.
this._chunks[data.id].sort((a, b) => {
if (a.i < b.i) {
return -1;
}
if (a.i > b.i) {
return 1;
}

return 0;
});

// Merge the data into a single string.
const newData = this._chunks[data.id].reduce((acc, val) => acc + val.chunk, '');
delete this._chunks[data.id];

// Process the data as a buffer.
this.processEvent(Buffer.from(newData));
}

return this;
}

// Validate the data.
if (!data || data.id === this._id) {
return this;
}
Expand Down

0 comments on commit d6eec10

Please sign in to comment.