cowrie-subscriptions
Advanced tools
Comparing version 1.2.0 to 1.3.0
{ | ||
"name": "cowrie-subscriptions", | ||
"version": "1.2.0", | ||
"version": "1.3.0", | ||
"description": "cowrie implementation of graphql subscriptions", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -7,5 +7,5 @@ const { PubSub: GooglePubSub } = require('@google-cloud/pubsub') | ||
super() | ||
this.pubsubClient = new GooglePubSub(config) | ||
this.pubSubClient = new GooglePubSub(config) | ||
// upstream subscriptions | ||
this.pubsubSubscriptions = {} | ||
this.pubSubSubscriptions = {} | ||
// downstream subscriptions | ||
@@ -29,3 +29,8 @@ this.subscriptions = {} | ||
// we don't want to be able to publish events from the subscriptions server | ||
return | ||
// return | ||
// testing only | ||
return this.pubSubClient | ||
.topic(topic) | ||
.publish(Buffer.from(JSON.stringify(data))) | ||
} | ||
@@ -39,8 +44,8 @@ | ||
// create an upstream subscription if it does not exist | ||
if (!this.pubsubSubscriptions[topic]) { | ||
if (!this.pubSubSubscriptions[topic]) { | ||
const sub = await this._getSubscription(topic, `${topic}-subscription-graphql`) | ||
if (this.pubsubSubscriptions[topic]) { | ||
this.pubsubSubscriptions[topic] = { | ||
...this.pubsubSubscriptions[topic], | ||
subIds: [...this.pubsubSubscriptions[topic].subIds, this.subIdCounter] | ||
if (this.pubSubSubscriptions[topic]) { | ||
this.pubSubSubscriptions[topic] = { | ||
...this.pubSubSubscriptions[topic], | ||
subIds: [...this.pubSubSubscriptions[topic].subIds, this.subIdCounter] | ||
} | ||
@@ -51,7 +56,18 @@ } else { | ||
sub.on('message', async (message) => { | ||
// convert message data to json | ||
let data | ||
try { | ||
data = JSON.parse(message.data.toString()) | ||
} catch(err) { | ||
console.error(err) | ||
// drop this message as it is definitely invalid | ||
message.ack() | ||
return | ||
} | ||
// get a list of downstream subscriptions to relay the message to | ||
try { | ||
self.pubsubSubscriptions.subIds.forEach(subID => { | ||
self.pubSubSubscriptions[topic].subIds.forEach(subID => { | ||
const [, onMessage] = self.subscriptions[subID] | ||
onMessage(message.data) | ||
onMessage(data) | ||
}) | ||
@@ -69,4 +85,4 @@ } catch(err) { | ||
this.pubsubSubscriptions[topic] = { | ||
...this.pubsubSubscriptions[topic], | ||
this.pubSubSubscriptions[topic] = { | ||
...this.pubSubSubscriptions[topic], | ||
subscription: sub, | ||
@@ -77,5 +93,5 @@ subIds: [this.subIdCounter] | ||
} else { | ||
this.pubsubSubscriptions[topic] = { | ||
...this.pubsubSubscriptions[topic], | ||
subIds: [...this.pubsubSubscriptions[topic].subIds, this.subIdCounter] | ||
this.pubSubSubscriptions[topic] = { | ||
...this.pubSubSubscriptions[topic], | ||
subIds: [...this.pubSubSubscriptions[topic].subIds, this.subIdCounter] | ||
} | ||
@@ -91,12 +107,12 @@ } | ||
const pubsubSubscription = this.pubsubSubscriptions[topic] | ||
const pubsubSubscription = this.pubSubSubscriptions[topic] | ||
if (!pubsubSubscription) return subId | ||
if (pubsubSubscription.subIds.length === 1) { | ||
delete this.pubsubSubscriptions[topic] | ||
delete this.pubSubSubscriptions[topic] | ||
} | ||
this.pubsubSubscriptions[topic] = { | ||
...this.pubsubSubscriptions[topic], | ||
subIds: this.pubsubSubscriptions[topic].subIds.filter(id => id !== subId) | ||
this.pubSubSubscriptions[topic] = { | ||
...this.pubSubSubscriptions[topic], | ||
subIds: this.pubSubSubscriptions[topic].subIds.filter(id => id !== subId) | ||
} | ||
@@ -103,0 +119,0 @@ |
4094
99
2