New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

tcppubsub

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

tcppubsub - npm Package Compare versions

Comparing version
0.0.8
to
0.0.9
+25
-27
lib/Client.js
let net = require('net');
let EventEmitter = require('events').EventEmitter
class Client extends EventEmitter {

@@ -11,3 +10,3 @@ constructor(port, host){

this.client = new net.Socket()
this.eof = '\r\n'
this.eof = '\n'
this.connect()

@@ -37,2 +36,3 @@ this.messages()

self.connected = true
self.client.setEncoding('utf8');
self.client.setKeepAlive(self.isKeepalive, self.keepaliveDelay)

@@ -63,3 +63,3 @@ let t = Object.keys(self.topics)

if(this.connected){
self.client.write(Buffer.from(JSON.stringify({ meta : 'subscribe', data : subTopics}) + self.eof));
self.client.write(Buffer.from('s' + JSON.stringify(subTopics) + self.eof));
cb(topic)

@@ -69,3 +69,3 @@ }

setTimeout(function(){
self.client.write(Buffer.from(JSON.stringify({ meta : 'subscribe', data : subTopics}) + self.eof));
self.client.write(Buffer.from('s' + JSON.stringify(subTopics) + self.eof));
cb(topic)

@@ -87,3 +87,3 @@ },200)

if(this.connected){
self.client.write(Buffer.from(JSON.stringify({ meta : 'unsubscribe', data : subTopics}) + self.eof));
self.client.write(Buffer.from('u' + JSON.stringify(subTopics) + self.eof));
cb(topic)

@@ -93,7 +93,6 @@ }

setTimeout(function(){
self.client.write(Buffer.from(JSON.stringify({ meta : 'unsubscribe', data : subTopics}) + self.eof));
self.client.write(Buffer.from('u' + JSON.stringify(subTopics) + self.eof));
cb(topic)
},200)
}
}

@@ -105,7 +104,7 @@

if(this.connected){
self.client.write(Buffer.from(JSON.stringify({ meta : 'publish', data : { topic : topic, payload : payload}}) + self.eof));
self.client.write(Buffer.from('p' + JSON.stringify({ t : topic, p : payload}) + self.eof));
}
else{
setTimeout(function(){
self.client.write(Buffer.from(JSON.stringify({ meta : 'publish', data : { topic : topic, payload : payload}}) + self.eof));
self.client.write(Buffer.from('p' + JSON.stringify({ t : topic, p : payload}) + self.eof));
},200)

@@ -116,27 +115,26 @@ }

messages(){
let self = this
let chunk = ''
let messages = []
let len = 0
let message = ''
this.client.on('data', function(data) {
chunk += data.toString()
if(chunk.indexOf(self.eof) !== -1){
chunk = chunk.split(self.eof)
for(let i = 0; i < chunk.length; i++){
messages = chunk.split(self.eof)
len = messages.length
chunk = ''
for(let i = 0; i < len; i++){
try{
chunk[i] = JSON.parse(chunk[i])
try{
self.emit(chunk[i]['meta'], chunk[i]['data'].topic, chunk[i]['data'].payload)
if(self.listeners(chunk[i]['data'].topic).length){
self.emit(chunk[i]['data'].topic, chunk[i]['data'].payload)
}
}
catch(e){
console.error(e)
}
message = messages.shift()
message = JSON.parse(message)
self.emit('message', message.t, message.p)
if(self.listeners(message.t).length)
self.emit(message.t, message.p)
}
catch(e){
if(typeof chunk !== 'undefined' && typeof chunk[i] !== 'undefined')
chunk = chunk[i]
if(e){
//when last chunk is not json while tcp package was too big concat to next package
chunk = message.toString()
}
}

@@ -173,3 +171,3 @@ }

let self = this
this.client.write(Buffer.from(JSON.stringify({ meta : 'destroy' }) + this.eof));
self.client.write(Buffer.from('d') + self.eof);
}

@@ -176,0 +174,0 @@ }

@@ -15,3 +15,3 @@

this.wildcards = {}
this.eof = '\r\n'
this.eof = '\n'
this.block = block ? block : false

@@ -21,5 +21,3 @@ }

getConnections(cb){
cb = (typeof cb === 'function') ? cb : function(){}
this.server.getConnections(function(err, numb){

@@ -35,2 +33,3 @@ cb(err, numb)

this.server.on('connection',function(sock){
sock.setEncoding('utf8');
self.events(sock)

@@ -61,14 +60,22 @@ self.messages(sock)

let chunk = ''
let messages = []
let len = 0
let message = ''
let lastChunk = ''
sock.on('data', function(data) {
chunk += data.toString()
if(chunk.indexOf(self.eof) !== -1){
chunk = chunk.split(self.eof)
for(let i = 0; i < chunk.length; i++){
messages = chunk.split(self.eof)
chunk = ''
len = messages.length
for(let i = 0; i < len; i++){
try{
self.handle(JSON.parse(chunk[i]), sock)
message = messages.shift()
self.handle(message.charAt(0), JSON.parse(message.substr(1, message.length)), sock)
}
catch(e){
if(typeof chunk !== 'undefined' && typeof chunk[i] !== 'undefined')
chunk = chunk[i]
if(e){
//when last chunk is not json because tcp package was too big concat to next package
chunk = message
}
}

@@ -80,28 +87,27 @@ }

handle(chunk, sock){
switch(chunk['meta']){
case 'publish':
this.publish(chunk['data'].topic, chunk['data'].payload, sock)
handle(meta, chunk, sock){
switch(meta){
case 'p':
this.publish(chunk.t, chunk.p, sock)
if(Object.keys(this.wildcards).length){
this.wildcardpublish(chunk['data'].topic, chunk['data'].payload, sock)
this.wildcardpublish(chunk.t, chunk.p, sock)
}
this.emit('published', chunk['data'].topic, chunk['data'].payload)
this.emit('published', chunk.t, chunk.p)
break
case 'subscribe':
for(let topic in chunk['data']){
this.subscribe(chunk['data'][topic], sock)
this.emit('subscribed', chunk['data'][topic])
case 's':
for(let t in chunk){
this.subscribe(chunk[t], sock)
this.emit('subscribed', chunk[t])
}
break
case 'unsubscribe':
for(let topic in chunk['data']){
this.unsubscribe(chunk['data'][topic], sock)
this.emit('unsubscribed', chunk['data'][topic])
case 'u':
for(let t in chunk){
this.unsubscribe(chunk[t], sock)
this.emit('unsubscribed', chunk[t])
}
break
case 'destroy':
case 'd':
this.destroy(sock)

@@ -137,4 +143,3 @@ this.emit('destroy', sock.address())

sock.on('error', function (err) {
console.log(err)
//self.emit('error', err.message);
self.emit('error', err.message);
});

@@ -148,7 +153,7 @@ }

if(this.topics[topic][socket] !== sock){
this.topics[topic][socket].write(Buffer.from(JSON.stringify({ meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8');
this.topics[topic][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof));
}
}
else{
this.topics[topic][socket].write(Buffer.from(JSON.stringify( {meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8');
this.topics[topic][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof));
}

@@ -159,3 +164,3 @@

catch(e){
console.log(e)
console.error(e)
}

@@ -175,11 +180,11 @@ }

if(this.wildcards[card][socket] !== sock){
this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8');
this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof));
}
}
else{
this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ meta : 'message', data : { topic : topic, payload : payload}}) + this.eof), 'utf-8');
this.wildcards[card][socket].write(Buffer.from(JSON.stringify({ t : topic, p : payload}) + this.eof));
}
}
catch(e){
console.log(e)
console.error(e)
}

@@ -222,3 +227,3 @@ }

catch(e){
console.log(e)
console.error(e)
}

@@ -239,3 +244,3 @@ }

catch(e){
console.log(e)
console.error(e)
}

@@ -251,3 +256,3 @@ }

catch(e){
console.log(e)
console.error(e)
}

@@ -265,3 +270,3 @@ }

catch(e){
console.log(e)
console.error(e)
}

@@ -268,0 +273,0 @@ }

{
"name": "tcppubsub",
"version": "0.0.8",
"version": "0.0.9",
"description": "A simple node-js-tcp publish-subscribe framework. With a broker and a client called member.",

@@ -5,0 +5,0 @@ "main": "lib/main.js",