Skip to content

Commit

Permalink
v2.0.0: support client and server
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoping6688 committed Mar 15, 2017
1 parent 8afeaef commit 5b75029
Show file tree
Hide file tree
Showing 9 changed files with 571 additions and 210 deletions.
46 changes: 41 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# node-socket for client
Based on node.js tcp socket of TLV package structure (Node端基于TLV二进制协议格式进行封装的socket工具库,包括粘包断包处理)
# node-socket
Based on node.js tcp socket of TLV packet structure (Node端基于TLV二进制协议格式进行封装的socket工具库,包括粘包断包处理)


## Install
Expand All @@ -12,8 +12,44 @@ $ npm install --save node-socket
## Usage

```js
const socket = require('node-socket')
socket.connect(port, host, onConnected, onReceived, onClosed)
// for client
const socket = require('node-socket').client
socket.connect(options, onConnected, onReceived)

//=> @see example.js
function onReceived (tag, value) {
switch (tag){
case TAG_FROM_REGIST_SUCCESS:

// send message to server
socket.send(1, { token: '123' })

break;
case TAG_FROM_REGIST_FAIL:

break;
}
}


// for server
const server = require('node-socket').server

var options = {
port: 11000,
timeout: 15000,
heartbeatTag: 0,
heartbeatInterval: 7000,
recreateInterval: 1000
}

server.listen(options, onClientConnected, onClientDisconncted, onClientReceived)

function onClientReceived (tag, value, from) {
// server.broadcast(tag, value) // all
server.broadcast(tag, value, null, from) // all, except from
// server.broadcast(tag, value, clientList, from);
}

//=> @see test_server.js
//=> @see test_client.js
```
53 changes: 0 additions & 53 deletions example.js

This file was deleted.

142 changes: 3 additions & 139 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,140 +1,4 @@
var net = require("net");
var ExBuffer = require("./lib/ExBuffer");

var client = null;
var exBuffer = null;
var connectCallback = null;
var receiveCallback = null;
var closeCallback = null;
var serverPort = null;
var serverHost = null;
var hasShutdown = false;
var debuger = console.log;

function connect(port, host, onConnected, onReceived, onClosed){
close(false);

if (port){
serverPort = port;
}
if (host){
serverHost = host;
}
if (onConnected){
connectCallback = onConnected;
}
if (onReceived){
receiveCallback = onReceived;
}
if (onClosed){
closeCallback = onClosed;
}

client = net.connect(serverPort, serverHost, function(){
debuger('Client connected: ' + serverHost + ":" + serverPort);

if (typeof connectCallback === "function"){
connectCallback();
}

exBuffer = new ExBuffer().int8Tag().uint32Head().bigEndian();
exBuffer.on('data', onReceivePackData);

client.on('data', function(data) {
exBuffer.put(data);
});

client.on('end', function() {
debuger('Client disconnected from server');
if (typeof closeCallback === "function"){
closeCallback();
}
});

client.on('timeout', function() {
debuger('The socket times out.');
setTimeout(reconnect, 1000);
});
});

client.on('error', function(error){
debuger("The socket had an error: " + error.code);
setTimeout(reconnect, 1000);
});

client.on('close', function() {
debuger('The socket closed.');
});

client.setTimeout(15000);
module.exports = {
client: require('./lib/client'),
server: require('./lib/server')
}

function reconnect(){
if (!hasShutdown){
connect();
}
}

function close(shutdown = true) {
hasShutdown = shutdown;
if (client){
client.end();
client.destroy();
client = null;
}
}

function send(cmd, arg){
debuger('[Send] tag: ' + cmd + ' value: ' + (arg ? JSON.stringify(arg) : ''));
if (client){
var data = null, len = 0;
if (arg){
data = JSON.stringify(arg);
len = Buffer.byteLength(data);
}

// TLV pack

// Write the tag: use one byte
var tagBuf = new Buffer(1);
tagBuf.writeInt8(cmd, 0);
client.write(tagBuf);

// Write the length of value: use four bytes
var headBuf = new Buffer(4);
headBuf.writeUInt32BE(len, 0);
client.write(headBuf);

// Then, write the value
if (len > 0){
var bodyBuf = new Buffer(len);
bodyBuf.write(data);
client.write(bodyBuf);
}
} else {
debuger("socket has not yet connected");
}
}

function onReceivePackData(data){
var tag = data.tag;
var value = data.value ? data.value.toString() : null;
debuger('[Received] tag: ' + tag + ' value: ' + value);

if (value){
value = JSON.parse(value);
}

if (typeof receiveCallback === "function"){
receiveCallback(tag, value);
}
}

function setDebuger(value){
debuger = value;
}

module.exports.setDebuger = setDebuger;
module.exports.connect = connect;
module.exports.close = close;
module.exports.send = send;
19 changes: 10 additions & 9 deletions lib/ExBuffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var ExBuffer = function (bufferLength) {
var self = this;
var _headLen = 2;
var _endian = 'B';
var _buffer = new Buffer(bufferLength || 512); //Buffer大于8kb 会使用slowBuffer,效率低
var _buffer = Buffer.alloc(bufferLength || 512); //Buffer大于8kb 会使用slowBuffer,效率低
var _readOffset = 0;
var _putOffset = 0;
var _dlen = 0;
Expand Down Expand Up @@ -64,16 +64,16 @@ var ExBuffer = function (bufferLength) {
};

/**
* 送入一端Buffer
* 送入一段Buffer
*/
this.put = function(buffer,offset,len){
this.put = function(buffer, offset, len) {
if (offset == undefined) offset = 0;
if (len == undefined) len = buffer.length - offset;
//buf.copy(targetBuffer, [targetStart], [sourceStart], [sourceEnd])
//当前缓冲区已经不能满足次数数据了
if (len + getLen() > _buffer.length) {
var ex = Math.ceil((len + getLen()) / 1024); //每次扩展1kb
var tmp = new Buffer(ex * 1024);
var tmp = Buffer.alloc(ex * 1024);
var exlen = tmp.length - _buffer.length;
_buffer.copy(tmp);
//fix bug : superzheng
Expand Down Expand Up @@ -109,6 +109,7 @@ var ExBuffer = function (bufferLength) {
buffer.copy(_buffer, _putOffset, offset, offset + len);
_putOffset += len;
}

proc();
};

Expand All @@ -135,8 +136,8 @@ var ExBuffer = function (bufferLength) {
_readOffset += _tagLen;
_dlen = _buffer['readUInt' + (8 * _headLen) + '' + _endian + 'E'](_readOffset);
_readOffset += _headLen;
}else {//
var hbuf = new Buffer(_headLen + _tagLen);
} else {//
var hbuf = Buffer.alloc(_headLen + _tagLen);
var rlen = 0;
for (var i = 0; i < (_buffer.length + _tagLen - _readOffset); i++) {
hbuf[i] = _buffer[_readOffset++];
Expand All @@ -160,7 +161,7 @@ var ExBuffer = function (bufferLength) {
//console.log('_dlen:'+_dlen + ',unreadLen:'+getLen());

if (getLen() >= _dlen) {
var dbuff = new Buffer(_dlen);
var dbuff = Buffer.alloc(_dlen);
if (_readOffset + _dlen > _buffer.length) {
var len1 = _buffer.length - _readOffset;
if (len1 > 0) {
Expand All @@ -177,7 +178,7 @@ var ExBuffer = function (bufferLength) {
_dlen = 0;
if (_tagLen > 0){
//_tagLen = 0;
self.emit("data", {tag:_tag, value:dbuff});
self.emit("data", { tag:_tag, value:dbuff });
} else {
self.emit("data", dbuff);
}
Expand All @@ -188,7 +189,7 @@ var ExBuffer = function (bufferLength) {
} catch(e) {
self.emit("error", e);
}
}else {
} else {
break;
}
}
Expand Down
Loading

0 comments on commit 5b75029

Please sign in to comment.