+25
-27
| let net = require('net'); | ||
| let EventEmitter = require('events').EventEmitter | ||
| class Client extends EventEmitter { | ||
@@ -11,3 +10,3 @@ constructor(port, host){ | ||
| this.client = new net.Socket() | ||
| this.eof = '\r\n' | ||
| this.eof = '\n' | ||
| this.connect() | ||
@@ -37,2 +36,3 @@ this.messages() | ||
| self.connected = true | ||
| self.client.setEncoding('utf8'); | ||
| self.client.setKeepAlive(self.isKeepalive, self.keepaliveDelay) | ||
@@ -63,3 +63,3 @@ let t = Object.keys(self.topics) | ||
| if(this.connected){ | ||
| self.client.write(Buffer.from(JSON.stringify({ meta : 'subscribe', data : subTopics}) + self.eof)); | ||
| self.client.write(Buffer.from('s' + JSON.stringify(subTopics) + self.eof)); | ||
| cb(topic) | ||
@@ -69,3 +69,3 @@ } | ||
| setTimeout(function(){ | ||
| self.client.write(Buffer.from(JSON.stringify({ meta : 'subscribe', data : subTopics}) + self.eof)); | ||
| self.client.write(Buffer.from('s' + JSON.stringify(subTopics) + self.eof)); | ||
| cb(topic) | ||
@@ -87,3 +87,3 @@ },200) | ||
| if(this.connected){ | ||
| self.client.write(Buffer.from(JSON.stringify({ meta : 'unsubscribe', data : subTopics}) + self.eof)); | ||
| self.client.write(Buffer.from('u' + JSON.stringify(subTopics) + self.eof)); | ||
| cb(topic) | ||
@@ -93,7 +93,6 @@ } | ||
| setTimeout(function(){ | ||
| self.client.write(Buffer.from(JSON.stringify({ meta : 'unsubscribe', data : subTopics}) + self.eof)); | ||
| self.client.write(Buffer.from('u' + JSON.stringify(subTopics) + self.eof)); | ||
| cb(topic) | ||
| },200) | ||
| } | ||
| } | ||
@@ -105,7 +104,7 @@ | ||
| if(this.connected){ | ||
| self.client.write(Buffer.from(JSON.stringify({ meta : 'publish', data : { topic : topic, payload : payload}}) + self.eof)); | ||
| self.client.write(Buffer.from('p' + JSON.stringify({ t : topic, p : payload}) + self.eof)); | ||
| } | ||
| else{ | ||
| setTimeout(function(){ | ||
| self.client.write(Buffer.from(JSON.stringify({ meta : 'publish', data : { topic : topic, payload : payload}}) + self.eof)); | ||
| self.client.write(Buffer.from('p' + JSON.stringify({ t : topic, p : payload}) + self.eof)); | ||
| },200) | ||
@@ -116,27 +115,26 @@ } | ||
| messages(){ | ||
| let self = this | ||
| let chunk = '' | ||
| let messages = [] | ||
| let len = 0 | ||
| let message = '' | ||
| this.client.on('data', function(data) { | ||
| chunk += data.toString() | ||
| if(chunk.indexOf(self.eof) !== -1){ | ||
| chunk = chunk.split(self.eof) | ||
| for(let i = 0; i < chunk.length; i++){ | ||
| messages = chunk.split(self.eof) | ||
| len = messages.length | ||
| chunk = '' | ||
| for(let i = 0; i < len; i++){ | ||
| try{ | ||
| chunk[i] = JSON.parse(chunk[i]) | ||
| try{ | ||
| self.emit(chunk[i]['meta'], chunk[i]['data'].topic, chunk[i]['data'].payload) | ||
| if(self.listeners(chunk[i]['data'].topic).length){ | ||
| self.emit(chunk[i]['data'].topic, chunk[i]['data'].payload) | ||
| } | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| } | ||
| message = messages.shift() | ||
| message = JSON.parse(message) | ||
| self.emit('message', message.t, message.p) | ||
| if(self.listeners(message.t).length) | ||
| self.emit(message.t, message.p) | ||
| } | ||
| catch(e){ | ||
| if(typeof chunk !== 'undefined' && typeof chunk[i] !== 'undefined') | ||
| chunk = chunk[i] | ||
| if(e){ | ||
| //when last chunk is not json while tcp package was too big concat to next package | ||
| chunk = message.toString() | ||
| } | ||
| } | ||
@@ -173,3 +171,3 @@ } | ||
| let self = this | ||
| this.client.write(Buffer.from(JSON.stringify({ meta : 'destroy' }) + this.eof)); | ||
| self.client.write(Buffer.from('d') + self.eof); | ||
| } | ||
@@ -176,0 +174,0 @@ } |
+43
-38
@@ -15,3 +15,3 @@ | ||
| this.wildcards = {} | ||
| this.eof = '\r\n' | ||
| this.eof = '\n' | ||
| this.block = block ? block : false | ||
@@ -21,5 +21,3 @@ } | ||
| getConnections(cb){ | ||
| cb = (typeof cb === 'function') ? cb : function(){} | ||
| this.server.getConnections(function(err, numb){ | ||
@@ -35,2 +33,3 @@ cb(err, numb) | ||
| this.server.on('connection',function(sock){ | ||
| sock.setEncoding('utf8'); | ||
| self.events(sock) | ||
@@ -61,14 +60,22 @@ self.messages(sock) | ||
| let chunk = '' | ||
| let messages = [] | ||
| let len = 0 | ||
| let message = '' | ||
| let lastChunk = '' | ||
| sock.on('data', function(data) { | ||
| chunk += data.toString() | ||
| if(chunk.indexOf(self.eof) !== -1){ | ||
| chunk = chunk.split(self.eof) | ||
| for(let i = 0; i < chunk.length; i++){ | ||
| messages = chunk.split(self.eof) | ||
| chunk = '' | ||
| len = messages.length | ||
| for(let i = 0; i < len; i++){ | ||
| try{ | ||
| self.handle(JSON.parse(chunk[i]), sock) | ||
| message = messages.shift() | ||
| self.handle(message.charAt(0), JSON.parse(message.substr(1, message.length)), sock) | ||
| } | ||
| catch(e){ | ||
| if(typeof chunk !== 'undefined' && typeof chunk[i] !== 'undefined') | ||
| chunk = chunk[i] | ||
| if(e){ | ||
| //when last chunk is not json because tcp package was too big concat to next package | ||
| chunk = message | ||
| } | ||
| } | ||
@@ -80,28 +87,27 @@ } | ||
| handle(chunk, sock){ | ||
| switch(chunk['meta']){ | ||
| case 'publish': | ||
| this.publish(chunk['data'].topic, chunk['data'].payload, sock) | ||
| handle(meta, chunk, sock){ | ||
| switch(meta){ | ||
| case 'p': | ||
| this.publish(chunk.t, chunk.p, sock) | ||
| if(Object.keys(this.wildcards).length){ | ||
| this.wildcardpublish(chunk['data'].topic, chunk['data'].payload, sock) | ||
| this.wildcardpublish(chunk.t, chunk.p, sock) | ||
| } | ||
| this.emit('published', chunk['data'].topic, chunk['data'].payload) | ||
| this.emit('published', chunk.t, chunk.p) | ||
| break | ||
| case 'subscribe': | ||
| for(let topic in chunk['data']){ | ||
| this.subscribe(chunk['data'][topic], sock) | ||
| this.emit('subscribed', chunk['data'][topic]) | ||
| case 's': | ||
| for(let t in chunk){ | ||
| this.subscribe(chunk[t], sock) | ||
| this.emit('subscribed', chunk[t]) | ||
| } | ||
| break | ||
| case 'unsubscribe': | ||
| for(let topic in chunk['data']){ | ||
| this.unsubscribe(chunk['data'][topic], sock) | ||
| this.emit('unsubscribed', chunk['data'][topic]) | ||
| case 'u': | ||
| for(let t in chunk){ | ||
| this.unsubscribe(chunk[t], sock) | ||
| this.emit('unsubscribed', chunk[t]) | ||
| } | ||
| break | ||
| case 'destroy': | ||
| case 'd': | ||
| this.destroy(sock) | ||
@@ -137,4 +143,3 @@ this.emit('destroy', sock.address()) | ||
| sock.on('error', function (err) { | ||
| console.log(err) | ||
| //self.emit('error', err.message); | ||
| self.emit('error', err.message); | ||
| }); | ||
@@ -148,7 +153,7 @@ } | ||
| if(this.topics[topic][socket] !== sock){ | ||
| this.topics[topic][socket].write(Buffer.from(JSON.stringify({ meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8'); | ||
| this.topics[topic][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof)); | ||
| } | ||
| } | ||
| else{ | ||
| this.topics[topic][socket].write(Buffer.from(JSON.stringify( {meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8'); | ||
| this.topics[topic][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof)); | ||
| } | ||
@@ -159,3 +164,3 @@ | ||
| catch(e){ | ||
| console.log(e) | ||
| console.error(e) | ||
| } | ||
@@ -175,11 +180,11 @@ } | ||
| if(this.wildcards[card][socket] !== sock){ | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8'); | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof)); | ||
| } | ||
| } | ||
| else{ | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8'); | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof)); | ||
| } | ||
| } | ||
| catch(e){ | ||
| console.log(e) | ||
| console.error(e) | ||
| } | ||
@@ -222,3 +227,3 @@ } | ||
| catch(e){ | ||
| console.log(e) | ||
| console.error(e) | ||
| } | ||
@@ -239,3 +244,3 @@ } | ||
| catch(e){ | ||
| console.log(e) | ||
| console.error(e) | ||
| } | ||
@@ -251,3 +256,3 @@ } | ||
| catch(e){ | ||
| console.log(e) | ||
| console.error(e) | ||
| } | ||
@@ -265,3 +270,3 @@ } | ||
| catch(e){ | ||
| console.log(e) | ||
| console.error(e) | ||
| } | ||
@@ -268,0 +273,0 @@ } |
+1
-1
| { | ||
| "name": "tcppubsub", | ||
| "version": "0.0.8", | ||
| "version": "0.0.9", | ||
| "description": "A simple node-js-tcp publish-subscribe framework. With a broker and a client called member.", | ||
@@ -5,0 +5,0 @@ "main": "lib/main.js", |
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
439
2.81%20094
-0.88%