+11
-1
@@ -12,6 +12,8 @@ var tcppubsub = require('../lib/main') | ||
| member.connect(function(){ | ||
| //Subscribe the topic without callback | ||
| member.sub('app/configuration/service') | ||
| member.sub('hey/ho') | ||
| // Subscribe the topic STRING or ARRAY | ||
@@ -24,2 +26,10 @@ member.sub('app/configuration/server', function(topic){ | ||
| member.pub('app/configuration/2', 'after pub', function(){ | ||
| setTimeout(function(){ | ||
| member.sub('app/configuration/2') | ||
| }, 1000) | ||
| }) | ||
@@ -26,0 +36,0 @@ // Receive the data on the certain topic |
@@ -24,5 +24,12 @@ var tcppubsub = require('../lib/main') | ||
| broker.on('error', function(err){console.log('error:', err)}) | ||
| broker.on('published', function(data){console.log('published:', data)}) | ||
| broker.on('subscribed', function(topic){console.log('subscribed:', topic)}) | ||
| broker.on('published', function(data){ | ||
| }) | ||
| broker.on('subscribed', function(topic){ | ||
| if(topic === 'hey/ho') | ||
| broker.pub('hey/ho', 'hello from server') | ||
| }) | ||
| broker.on('unsubscribed', function(topic){console.log('unsubscribed:', topic)}) | ||
+1
-0
@@ -90,2 +90,3 @@ let net = require('net'); | ||
| this.client.write(Buffer.from('p' + JSON.stringify({ t : topic, p : payload}) + this.eof)); | ||
| cb(topic, payload) | ||
| } | ||
@@ -92,0 +93,0 @@ } |
+73
-43
@@ -15,4 +15,5 @@ | ||
| this.wildcards = {} | ||
| this.openTasks = {} | ||
| this.eof = '\n' | ||
| this.block = block ? block : true | ||
| this.block = typeof block === 'undefined' ? true : block | ||
| } | ||
@@ -85,2 +86,3 @@ | ||
| handle(meta, chunk, sock){ | ||
| switch(meta){ | ||
@@ -136,3 +138,3 @@ case 'p': | ||
| self.emit('close', 'Socket close'); | ||
| sock.destroy(); | ||
| // self.destroy(sock); | ||
| }); | ||
@@ -142,2 +144,3 @@ | ||
| self.emit('error', err.message); | ||
| self.destroy(sock); | ||
| }); | ||
@@ -147,2 +150,8 @@ } | ||
| publish(chunk, sock){ | ||
| if(!this.topics[chunk.t]){ | ||
| this.openTasks[chunk.t] = chunk | ||
| return | ||
| } | ||
| for(let socket in this.topics[chunk.t]){ | ||
@@ -160,3 +169,3 @@ try{ | ||
| catch(e){ | ||
| console.error(e) | ||
| delete this.topics[chunk.t][socket] | ||
| } | ||
@@ -169,25 +178,21 @@ } | ||
| for(let card in this.wildcards){ | ||
| try { | ||
| if(wildcard(chunk.t, card)){ | ||
| for(let socket in this.wildcards[card]){ | ||
| try{ | ||
| if(this.block){ | ||
| if(this.wildcards[card][socket] !== sock){ | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify(chunk) + this.eof)); | ||
| } | ||
| } | ||
| else{ | ||
| if(wildcard(chunk.t, card)){ | ||
| for(let socket in this.wildcards[card]){ | ||
| try{ | ||
| if(this.block){ | ||
| if(this.wildcards[card][socket] !== sock){ | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify(chunk) + this.eof)); | ||
| } | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| else{ | ||
| this.wildcards[card][socket].write(Buffer.from(JSON.stringify(chunk) + this.eof)); | ||
| } | ||
| } | ||
| catch(e){ | ||
| delete this.topics[chunk.t][socket] | ||
| } | ||
| } | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| } | ||
| } | ||
@@ -198,2 +203,3 @@ } | ||
| if(topic.indexOf('#') === -1){ | ||
| if(!this.topics[topic]){ | ||
@@ -203,2 +209,8 @@ this.topics[topic] = [] | ||
| this.topics[topic].push(sock) | ||
| // Send open tasks | ||
| if(this.openTasks[topic]){ | ||
| this.publish(this.openTasks[topic]) | ||
| delete this.openTasks[topic] | ||
| } | ||
| } | ||
@@ -212,6 +224,16 @@ else{ | ||
| let card = topic.replace(/#/g, '*') | ||
| if(!this.wildcards[card]){ | ||
| this.wildcards[card] = [] | ||
| } | ||
| this.wildcards[card].push(sock) | ||
| // Send open tasks | ||
| for(let openTopic in this.openTasks){ | ||
| if(wildcard(topic, openTopic)){ | ||
| this.wildcardpublish(this.openTasks[openTopic]) | ||
| delete this.openTasks[openTopic] | ||
| } | ||
| } | ||
| } | ||
@@ -222,8 +244,3 @@ | ||
| if(this.topics[topic]){ | ||
| try { | ||
| delete this.topics[topic][this.topics[topic].indexOf(sock)] | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| } | ||
| this.deleteTopic(topic, sock) | ||
| } | ||
@@ -239,8 +256,3 @@ } | ||
| if(this.wildcards[card]){ | ||
| try { | ||
| delete this.wildcards[card][this.wildcards[card].indexOf(sock)] | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| } | ||
| this.deleteWildcard(topic, sock) | ||
| } | ||
@@ -251,8 +263,3 @@ } | ||
| for(let topic in this.topics){ | ||
| try{ | ||
| delete this.topics[topic][ this.topics[topic].indexOf(sock) ] | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| } | ||
| this.deleteTopic(topic, sock) | ||
| } | ||
@@ -265,10 +272,33 @@ this.wildcarddestroy(sock) | ||
| for(let topic in this.wildcards){ | ||
| try{ | ||
| delete this.wildcards[topic][ this.wildcards[topic].indexOf(sock) ] | ||
| } | ||
| catch(e){ | ||
| console.error(e) | ||
| } | ||
| this.deleteWildcard(topic, sock) | ||
| } | ||
| } | ||
| deleteTopic(topic, sock){ | ||
| let index = this.topics[topic].indexOf(sock) | ||
| if(index >= 0){ | ||
| this.topics[topic].splice(index, 1) | ||
| } | ||
| if(!this.topics[topic].length){ | ||
| delete this.topics[topic] | ||
| } | ||
| } | ||
| deleteWildcard(topic, sock){ | ||
| let index = this.wildcards[topic].indexOf(sock) | ||
| if(index >= 0){ | ||
| this.wildcards[topic].splice(index, 1) | ||
| } | ||
| if(!this.wildcards[topic].length){ | ||
| delete this.wildcards[topic] | ||
| } | ||
| } | ||
| pub(topic, payload){ | ||
| this.publish({t: topic, p: payload}) | ||
| } | ||
| } | ||
@@ -275,0 +305,0 @@ |
+1
-1
| { | ||
| "name": "tcppubsub", | ||
| "version": "1.0.4", | ||
| "version": "1.0.5", | ||
| "description": "A simple node-js-tcp publish-subscribe framework. With a broker and a client called member.", | ||
@@ -5,0 +5,0 @@ "main": "lib/main.js", |
+20
-2
@@ -33,3 +33,3 @@ # tcppubsub | ||
| var host = 'localhost' | ||
| var block = true // block payload sending to publisher, if he has subscribed the topic too. Default: true | ||
| var block = true // block payload sending to publisher, if he has subscribed the topic too. Default: true (blocked) | ||
@@ -43,2 +43,5 @@ var broker = new tcppubsub.Broker(port, host, block) | ||
| // Publish on topics from broker | ||
| broker.pub('hey/ho', 'Yellow submarine!') | ||
| //use the socket-events like: | ||
@@ -119,2 +122,17 @@ broker.on('end', function(msg){console.log('end:', msg)}) | ||
| ### Message Event | ||
| Catch a message-event with namespace `message` for receiving all messages or use a specific topic. | ||
| ```js | ||
| member.on('my/super/topic', function(data){ | ||
| // data on the certain topic | ||
| }) | ||
| /******* OR *******/ | ||
| member.on('message', function(topic, data){ | ||
| // all data | ||
| }) | ||
| ``` | ||
| ### Example | ||
@@ -182,3 +200,3 @@ | ||
| * **Yannick Grund** - *Initial work* - [yamigr](https://github.com/yamigr) | ||
| * **Yannick Grund (yamicro.io@gmail.com)** - *Initial work* - [yamigr](https://github.com/yamigr) | ||
@@ -185,0 +203,0 @@ ## ToDo |
+1
-1
@@ -8,3 +8,3 @@ var assert = require('assert') | ||
| var broker = new tcppubsub.Broker(port, host) | ||
| var broker = new tcppubsub.Broker(port, host, false) | ||
| var member | ||
@@ -11,0 +11,0 @@ var did = false |
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
75688
1.72%570
4.59%207
9.52%