From 5b75029b5f6e5e99b542863ac3cf13bbd8ef3aba Mon Sep 17 00:00:00 2001 From: xiaoping6688 Date: Thu, 16 Mar 2017 03:03:59 +0800 Subject: [PATCH] v2.0.0: support client and server --- README.md | 46 ++++++++- example.js | 53 ---------- index.js | 142 +------------------------ lib/ExBuffer.js | 19 ++-- lib/client.js | 172 +++++++++++++++++++++++++++++++ lib/server.js | 268 ++++++++++++++++++++++++++++++++++++++++++++++++ package.json | 10 +- test_client.js | 34 ++++++ test_server.js | 37 +++++++ 9 files changed, 571 insertions(+), 210 deletions(-) delete mode 100644 example.js create mode 100644 lib/client.js create mode 100644 lib/server.js create mode 100644 test_client.js create mode 100644 test_server.js diff --git a/README.md b/README.md index 1f0b7a3..53551a5 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# node-socket for client -Based on node.js tcp socket of TLV package structure (Node端基于TLV二进制协议格式进行封装的socket工具库,包括粘包断包处理) +# node-socket +Based on node.js tcp socket of TLV packet structure (Node端基于TLV二进制协议格式进行封装的socket工具库,包括粘包断包处理) ## Install @@ -12,8 +12,44 @@ $ npm install --save node-socket ## Usage ```js -const socket = require('node-socket') -socket.connect(port, host, onConnected, onReceived, onClosed) +// for client +const socket = require('node-socket').client +socket.connect(options, onConnected, onReceived) -//=> @see example.js +function onReceived (tag, value) { + switch (tag){ + case TAG_FROM_REGIST_SUCCESS: + + // send message to server + socket.send(1, { token: '123' }) + + break; + case TAG_FROM_REGIST_FAIL: + + break; + } +} + + +// for server +const server = require('node-socket').server + +var options = { + port: 11000, + timeout: 15000, + heartbeatTag: 0, + heartbeatInterval: 7000, + recreateInterval: 1000 +} + +server.listen(options, onClientConnected, onClientDisconncted, onClientReceived) + +function onClientReceived (tag, value, from) { + // server.broadcast(tag, value) // all + server.broadcast(tag, value, null, from) // all, except from + // server.broadcast(tag, value, clientList, from); +} + +//=> @see test_server.js +//=> @see test_client.js ``` diff --git a/example.js b/example.js deleted file mode 100644 index c99475f..0000000 --- a/example.js +++ /dev/null @@ -1,53 +0,0 @@ - -var port = 11001; //服务器端口 -var server = "10.3.0.175"; //服务器IP -var token = "123"; //注册token - -var TAG_SEND_REGIST = 1; //客户端注册 -var TAG_SEND_PING = 112; //客户端心跳响应 -var TAG_FROM_REGIST_SUCCESS = 'a'; //客户端注册成功 -var TAG_FROM_REGIST_FAIL = 'b'; //客户端注册失败 -var TAG_FROM_PING = 'p'; //收到服务端心跳指令 -var TAG_FROM_CLOSE = 'c'; //连接被服务端关闭 - -var showDebugData = false; - -var socket = require("./index"); -// socket.setDebuger(appendLog); -socket.connect(port, server, onConnected, onReceived); - -function onConnected () { - socket.send(TAG_SEND_REGIST, { token: token }); -} - -function onReceived (tag, value) { - if (tag != TAG_FROM_PING) - appendLog('[Received] tag: ' + tag + ' value: ' + JSON.stringify(value)); - - switch (tag){ - case TAG_FROM_REGIST_SUCCESS: - - break; - case TAG_FROM_REGIST_FAIL: - - break; - case TAG_FROM_PING: - socket.send(TAG_SEND_PING); - break; - case TAG_FROM_CLOSE: - - break; - } -} - -function appendLog (log) { - if (showDebugData == false) return; - - var debugText = document.getElementById('debugText'); - if (debugText) { - debugText.innerHTML += log + "
"; - debugText.scrollTop = debugText.scrollHeight; - } else { - console.log(log); - } -} diff --git a/index.js b/index.js index 8b3b589..ec408bd 100644 --- a/index.js +++ b/index.js @@ -1,140 +1,4 @@ -var net = require("net"); -var ExBuffer = require("./lib/ExBuffer"); - -var client = null; -var exBuffer = null; -var connectCallback = null; -var receiveCallback = null; -var closeCallback = null; -var serverPort = null; -var serverHost = null; -var hasShutdown = false; -var debuger = console.log; - -function connect(port, host, onConnected, onReceived, onClosed){ - close(false); - - if (port){ - serverPort = port; - } - if (host){ - serverHost = host; - } - if (onConnected){ - connectCallback = onConnected; - } - if (onReceived){ - receiveCallback = onReceived; - } - if (onClosed){ - closeCallback = onClosed; - } - - client = net.connect(serverPort, serverHost, function(){ - debuger('Client connected: ' + serverHost + ":" + serverPort); - - if (typeof connectCallback === "function"){ - connectCallback(); - } - - exBuffer = new ExBuffer().int8Tag().uint32Head().bigEndian(); - exBuffer.on('data', onReceivePackData); - - client.on('data', function(data) { - exBuffer.put(data); - }); - - client.on('end', function() { - debuger('Client disconnected from server'); - if (typeof closeCallback === "function"){ - closeCallback(); - } - }); - - client.on('timeout', function() { - debuger('The socket times out.'); - setTimeout(reconnect, 1000); - }); - }); - - client.on('error', function(error){ - debuger("The socket had an error: " + error.code); - setTimeout(reconnect, 1000); - }); - - client.on('close', function() { - debuger('The socket closed.'); - }); - - client.setTimeout(15000); +module.exports = { + client: require('./lib/client'), + server: require('./lib/server') } - -function reconnect(){ - if (!hasShutdown){ - connect(); - } -} - -function close(shutdown = true) { - hasShutdown = shutdown; - if (client){ - client.end(); - client.destroy(); - client = null; - } -} - -function send(cmd, arg){ - debuger('[Send] tag: ' + cmd + ' value: ' + (arg ? JSON.stringify(arg) : '')); - if (client){ - var data = null, len = 0; - if (arg){ - data = JSON.stringify(arg); - len = Buffer.byteLength(data); - } - - // TLV pack - - // Write the tag: use one byte - var tagBuf = new Buffer(1); - tagBuf.writeInt8(cmd, 0); - client.write(tagBuf); - - // Write the length of value: use four bytes - var headBuf = new Buffer(4); - headBuf.writeUInt32BE(len, 0); - client.write(headBuf); - - // Then, write the value - if (len > 0){ - var bodyBuf = new Buffer(len); - bodyBuf.write(data); - client.write(bodyBuf); - } - } else { - debuger("socket has not yet connected"); - } -} - -function onReceivePackData(data){ - var tag = data.tag; - var value = data.value ? data.value.toString() : null; - debuger('[Received] tag: ' + tag + ' value: ' + value); - - if (value){ - value = JSON.parse(value); - } - - if (typeof receiveCallback === "function"){ - receiveCallback(tag, value); - } -} - -function setDebuger(value){ - debuger = value; -} - -module.exports.setDebuger = setDebuger; -module.exports.connect = connect; -module.exports.close = close; -module.exports.send = send; diff --git a/lib/ExBuffer.js b/lib/ExBuffer.js index 89344f3..67d53e1 100644 --- a/lib/ExBuffer.js +++ b/lib/ExBuffer.js @@ -16,7 +16,7 @@ var ExBuffer = function (bufferLength) { var self = this; var _headLen = 2; var _endian = 'B'; - var _buffer = new Buffer(bufferLength || 512); //Buffer大于8kb 会使用slowBuffer,效率低 + var _buffer = Buffer.alloc(bufferLength || 512); //Buffer大于8kb 会使用slowBuffer,效率低 var _readOffset = 0; var _putOffset = 0; var _dlen = 0; @@ -64,16 +64,16 @@ var ExBuffer = function (bufferLength) { }; /** - * 送入一端Buffer + * 送入一段Buffer */ - this.put = function(buffer,offset,len){ + this.put = function(buffer, offset, len) { if (offset == undefined) offset = 0; if (len == undefined) len = buffer.length - offset; //buf.copy(targetBuffer, [targetStart], [sourceStart], [sourceEnd]) //当前缓冲区已经不能满足次数数据了 if (len + getLen() > _buffer.length) { var ex = Math.ceil((len + getLen()) / 1024); //每次扩展1kb - var tmp = new Buffer(ex * 1024); + var tmp = Buffer.alloc(ex * 1024); var exlen = tmp.length - _buffer.length; _buffer.copy(tmp); //fix bug : superzheng @@ -109,6 +109,7 @@ var ExBuffer = function (bufferLength) { buffer.copy(_buffer, _putOffset, offset, offset + len); _putOffset += len; } + proc(); }; @@ -135,8 +136,8 @@ var ExBuffer = function (bufferLength) { _readOffset += _tagLen; _dlen = _buffer['readUInt' + (8 * _headLen) + '' + _endian + 'E'](_readOffset); _readOffset += _headLen; - }else {// - var hbuf = new Buffer(_headLen + _tagLen); + } else {// + var hbuf = Buffer.alloc(_headLen + _tagLen); var rlen = 0; for (var i = 0; i < (_buffer.length + _tagLen - _readOffset); i++) { hbuf[i] = _buffer[_readOffset++]; @@ -160,7 +161,7 @@ var ExBuffer = function (bufferLength) { //console.log('_dlen:'+_dlen + ',unreadLen:'+getLen()); if (getLen() >= _dlen) { - var dbuff = new Buffer(_dlen); + var dbuff = Buffer.alloc(_dlen); if (_readOffset + _dlen > _buffer.length) { var len1 = _buffer.length - _readOffset; if (len1 > 0) { @@ -177,7 +178,7 @@ var ExBuffer = function (bufferLength) { _dlen = 0; if (_tagLen > 0){ //_tagLen = 0; - self.emit("data", {tag:_tag, value:dbuff}); + self.emit("data", { tag:_tag, value:dbuff }); } else { self.emit("data", dbuff); } @@ -188,7 +189,7 @@ var ExBuffer = function (bufferLength) { } catch(e) { self.emit("error", e); } - }else { + } else { break; } } diff --git a/lib/client.js b/lib/client.js new file mode 100644 index 0000000..cc3c592 --- /dev/null +++ b/lib/client.js @@ -0,0 +1,172 @@ +/** + * Socket客户端 + */ + +var net = require("net") +var ExBuffer = require("./ExBuffer") + +var client = null +var exBuffer = null +var connectCallback = null +var receiveCallback = null +var serverPort = null +var serverHost = null +var disconnected = false +var debuger = console.log + +var defaultOptions = { + timeout: 15000, + heartbeatTag: 0, + reconnectInterval: 1000 +} + +/** + * Connect to server + * @param port (required) Server port + * @param host (required) Server hostname + * @param onDisconncted (optional) On the client connected, callback() + * @param onReceived (optional) On the client received messages, callback(tag, value) + */ +function connect(options, onConnected, onReceived){ + close(false) + + if (options && options.timeout) { + defaultOptions.timeout = options.timeout + } + if (options && options.heartbeatTag) { + defaultOptions.heartbeatTag = options.heartbeatTag + } + if (options && options.reconnectInterval) { + defaultOptions.reconnectInterval = options.reconnectInterval + } + + if (options && options.port){ + serverPort = options.port + } + if (options && options.host){ + serverHost = options.host + } + if (onConnected){ + connectCallback = onConnected + } + if (onReceived){ + receiveCallback = onReceived + } + + client = net.connect(serverPort, serverHost, function () { + debuger('Client has connected to: ' + serverHost + ":" + serverPort) + + exBuffer = new ExBuffer().int8Tag().uint32Head().bigEndian() + exBuffer.on('data', onReceivePackData) + + client.on('data', function(data) { + exBuffer.put(data) + }) + + client.on('end', function() { + debuger('The socket has been disconnected by server') + }) + + client.on('timeout', function() { + debuger('The socket has timed out.') + setTimeout(reconnect, defaultOptions.reconnectInterval) + }) + + if (typeof connectCallback === "function"){ + connectCallback() + } + }) + + client.on('error', function(error) { + debuger("The socket occured an error: " + error) + setTimeout(reconnect, defaultOptions.reconnectInterval) + }) + + client.on('close', function() { + debuger('The socket closed.') + }) + + client.setTimeout(defaultOptions.timeout) +} + +function reconnect(){ + if (!disconnected){ + connect() + } +} + +/** + * Close the client + * @param disconnect + */ +function close(disconnect = true) { + disconnected = disconnect + + if (client){ + client.destroy() + client = null + } +} + +/** + * Send message to server + * @param cmd means tag + * @parma arg means value + */ +function send(cmd, arg){ + debuger('[Send] tag: ' + cmd + ' value: ' + (arg ? JSON.stringify(arg) : '')) + + if (client){ + var data = null, len = 0 + if (arg){ + data = JSON.stringify(arg) + len = Buffer.byteLength(data) + } + + // TLV pack: + // Write the tag: use one byte + var tagBuf = Buffer.alloc(1) + tagBuf.writeInt8(cmd, 0) + client.write(tagBuf) + + // Write the length of value: use four bytes + var headBuf = Buffer.alloc(4) + headBuf.writeUInt32BE(len, 0) + client.write(headBuf) + + // Then, write the value + if (len > 0){ + var bodyBuf = Buffer.alloc(len) + bodyBuf.write(data) + client.write(bodyBuf) + } + } else { + debuger("The socket has not connected yet.") + } +} + +function onReceivePackData(data){ + var tag = data.tag + var value = data.value ? data.value.toString() : null + debuger('[Received] tag: ' + tag + ' value: ' + value) + + // ignore heartbeat response + if (tag === defaultOptions.heartbeatTag) return + + if (value){ + value = JSON.parse(value) + } + + if (typeof receiveCallback === "function"){ + receiveCallback(tag, value) + } +} + +function setDebuger(value){ + debuger = value +} + +module.exports.setDebuger = setDebuger +module.exports.connect = connect +module.exports.close = close +module.exports.send = send diff --git a/lib/server.js b/lib/server.js new file mode 100644 index 0000000..c6684bd --- /dev/null +++ b/lib/server.js @@ -0,0 +1,268 @@ +/** + * Socket服务端 + */ + +var net = require("net") +var ExBuffer = require("./ExBuffer") + +var server = null +var exBuffer = null +var connectCallback = null +var disconnectCallback = null +var receiveCallback = null +var serverPort = null +var disconnected = false +var debuger = console.log +var clientList = [] +var heartbeatHandle = 0 + +var defaultOptions = { + timeout: 15000, + heartbeatTag: 0, + heartbeatInterval: 7000, + recreateInterval: 1000 +} + +/** + * Create server + * @param port (required) Server port for listen + * @param onConnected (optional) On a client connected, callback(socket) + * @param onDisconncted (optional) On a client disconnected, callback(socket) + * @param onReceived (optional) On a client received message, callback(tag, value, socket) + */ +function createServer(options, onConnected, onDisconncted, onReceived){ + close(false) + + if (options && options.timeout) { + defaultOptions.timeout = options.timeout + } + if (options && options.heartbeatTag) { + defaultOptions.heartbeatTag = options.heartbeatTag + } + if (options && options.heartbeatInterval) { + defaultOptions.heartbeatInterval = options.heartbeatInterval + } + if (options && options.reconnectInterval) { + defaultOptions.recreateInterval = options.recreateInterval + } + + if (options && options.port){ + serverPort = options.port + } + if (onConnected){ + connectCallback = onConnected + } + if (onDisconncted){ + disconnectCallback = onDisconncted + } + if (onReceived){ + receiveCallback = onReceived + } + + clientList = [] + heartbeatHandle = setInterval(heartbeatHandler, defaultOptions.heartbeatInterval) + + server = net.createServer(function (socket) { + debuger('A socket has connected: ' + socket.remoteAddress + ':' + socket.remotePort) + + // When an idle timeout is triggered the socket will receive a 'timeout' event but the connection will not be severed + socket.setTimeout(defaultOptions.timeout, function () { + debuger('The socket has timed out: ' + socket.remoteAddress + ':' + socket.remotePort) + socket.destroy() + }) + + exBuffer = new ExBuffer().int8Tag().uint32Head().bigEndian() + exBuffer.on('data', handlerDelegate(onReceivePackData, socket)) + + socket.on('data', function (data) { + exBuffer.put(data) + }) + + socket.on('error', function (error) { + debuger('The socket occured an error:' + error) + socket.destroy() + }) + + socket.on('end', function() { + debuger('The socket ended.') + }) + + socket.on('close', function() { + debuger('A socket has closed: ' + socket.remoteAddress + ':' + socket.remotePort) + removeClient(socket) + }) + + addClient(socket) + }) + + server.on('error', function(error) { + debuger("Server occured an error: " + error) + setTimeout(recreate, defaultOptions.recreateInterval) + }) + + server.on('close', function() { + debuger('Server is closed.') + }) + + server.on('listening', function(){ + debuger('Server is listening: ' + serverPort) + }) + + server.listen(serverPort) +} + +/** + * Close the server + * @param disconnect + */ +function close(disconnect = true) { + disconnected = disconnect + + if (server){ + clearInterval(heartbeatHandle) + + server.close() + server = null + } +} + +function heartbeatHandler () { + for (var i = clientList.length - 1; i >= 0; i--) { + ping(clientList[i]) + } +} + +function ping (client) { + send(defaultOptions.heartbeatTag, null, client) +} + +function addClient (socket) { + if (socket) { + clientList.push(socket) + + if (typeof connectCallback === "function"){ + connectCallback(socket) + } + } +} + +function removeClient (socket) { + if (socket) { + var index = clientList.indexOf(socket) + if (index != -1) { + clientList.splice(index, 1) + + if (typeof disconnectCallback === "function"){ + disconnectCallback(socket) + } + } + } +} + +/** + * Send message to a client + * @param cmd means tag + * @parma arg means value + * @parma socket the target client + */ +function send(cmd, arg, socket){ + if (socket && socket.writable) { + if (cmd !== defaultOptions.heartbeatTag) { + debuger('[Send to ' + socket.remoteAddress + '] tag: ' + cmd + ' value: ' + (arg ? JSON.stringify(arg) : '')) + } + + var data = null, len = 0 + if (arg){ + data = JSON.stringify(arg) + len = Buffer.byteLength(data) + } + + // TLV pack: + // Write the tag: use one byte + var tagBuf = Buffer.alloc(1) + tagBuf.writeInt8(cmd, 0) + socket.write(tagBuf) + + // Write the length of value: use four bytes + var headBuf = Buffer.alloc(4) + headBuf.writeUInt32BE(len, 0) + socket.write(headBuf) + + // Then, write the value + if (len > 0){ + var bodyBuf = Buffer.alloc(len) + bodyBuf.write(data) + socket.write(bodyBuf) + } + } +} + +function onReceivePackData(data, socket) { + var tag = data.tag + var value = data.value ? data.value.toString() : null + + // ignore heartbeat response + if (tag === defaultOptions.heartbeatTag) return + + debuger('[Received from ' + socket.remoteAddress + '] tag: ' + tag + ' value: ' + value) + + if (value){ + value = JSON.parse(value) + } + + if (typeof receiveCallback === "function"){ + receiveCallback(tag, value, socket) + } +} + +function recreate(){ + if (!disconnected){ + createServer() + } +} + +function setDebuger(value){ + debuger = value +} + +/** + * Broadcast message + * @param cmd (required) the tag + * @param arg (optional) the value + * @param list (optional) client list, if not specified it will broadcast all connections + * @param from (optional) from which client + */ +function broadcast(cmd, arg, list, from) { + if (!list) { + list = clientList + } + + for (var i = list.length - 1; i >= 0; i--) { + var socket = list[i] + if (from != socket) { + if (socket.writable) { + send(cmd, arg, socket) + } else { + socket.destroy() + } + } + } +} + +/** + * 回调函数代理,使其可传任意额外参数(不想用匿名函数的情况下) + * @param handler 回调函数 + * @param args ...额外参数 + * @return function + */ +function handlerDelegate(handler, ...args) { + return function (...params) { + handler.apply(null, params.concat(args)) + } +} + +module.exports.setDebuger = setDebuger +module.exports.listen = createServer +module.exports.close = close +module.exports.send = send +module.exports.broadcast = broadcast diff --git a/package.json b/package.json index 29705f7..7add6ef 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,11 @@ { "name": "node-socket", - "version": "1.0.2", - "description": "TCP Socket with TLV packet (Node端基于TLV二进制协议格式进行封装的socket工具库,包括粘包断包处理)", + "version": "2.0.0", + "description": "Based on node.js tcp socket of TLV package structure", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "server": "node test_server.js", + "client": "node test_client.js" }, "repository": { "type": "git", @@ -13,7 +14,8 @@ "keywords": [ "node", "socket", - "tlv" + "tlv", + "tcp" ], "author": { "email": "super_xp@126.com", diff --git a/test_client.js b/test_client.js new file mode 100644 index 0000000..58e3207 --- /dev/null +++ b/test_client.js @@ -0,0 +1,34 @@ +// *************************** connect to server ****************************** + +var port = 11000; +var host = "127.0.0.1"; + +var TAG_SEND_REGIST = 1; +var TAG_FROM_REGIST_SUCCESS = 'a'; +var TAG_FROM_REGIST_FAIL = 'b'; + +var token = "123"; + +var socket = require("./index").client; + +var options = { + port:port, + host:host +} + +socket.connect(options, onConnected, onReceived); + +function onConnected () { + socket.send(TAG_SEND_REGIST, { token: token }); +} + +function onReceived (tag, value) { + switch (tag){ + case TAG_FROM_REGIST_SUCCESS: + + break; + case TAG_FROM_REGIST_FAIL: + + break; + } +} diff --git a/test_server.js b/test_server.js new file mode 100644 index 0000000..d64b880 --- /dev/null +++ b/test_server.js @@ -0,0 +1,37 @@ + +// *************************** create server ****************************** + +var port = 11000; + +var server = require("./index").server; + +var clientList = []; +var options = { + port:port +} + +server.setDebuger(logger) +server.listen(options, onClientConnected, onClientDisconncted, onClientReceived); + +function onClientConnected (socket) { + // clientList.push(socket); +} + +function onClientDisconncted (socket) { + // var index = clientList.indexOf(socket); + // if (index != -1) { + // clientList.splice(index, 1); + // } +} + +function onClientReceived (tag, value, from) { + // server.broadcast(tag, value) // all + server.broadcast(tag, value, null, from) // all, except from + // server.broadcast(tag, value, clientList, from); +} + +function logger (log) { + var now = new Date() + var time = now.getHours() + ':' + now.getMinutes() + ':' + now.getSeconds() + console.log('[' + time + '] ' + log) +} \ No newline at end of file