Socket
Socket
Sign inDemoInstall

cowrie-subscriptions

Package Overview
Dependencies
147
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.2.0 to 1.3.0

2

package.json
{
"name": "cowrie-subscriptions",
"version": "1.2.0",
"version": "1.3.0",
"description": "cowrie implementation of graphql subscriptions",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -7,5 +7,5 @@ const { PubSub: GooglePubSub } = require('@google-cloud/pubsub')

super()
this.pubsubClient = new GooglePubSub(config)
this.pubSubClient = new GooglePubSub(config)
// upstream subscriptions
this.pubsubSubscriptions = {}
this.pubSubSubscriptions = {}
// downstream subscriptions

@@ -29,3 +29,8 @@ this.subscriptions = {}

// we don't want to be able to publish events from the subscriptions server
return
// return
// testing only
return this.pubSubClient
.topic(topic)
.publish(Buffer.from(JSON.stringify(data)))
}

@@ -39,8 +44,8 @@

// create an upstream subscription if it does not exist
if (!this.pubsubSubscriptions[topic]) {
if (!this.pubSubSubscriptions[topic]) {
const sub = await this._getSubscription(topic, `${topic}-subscription-graphql`)
if (this.pubsubSubscriptions[topic]) {
this.pubsubSubscriptions[topic] = {
...this.pubsubSubscriptions[topic],
subIds: [...this.pubsubSubscriptions[topic].subIds, this.subIdCounter]
if (this.pubSubSubscriptions[topic]) {
this.pubSubSubscriptions[topic] = {
...this.pubSubSubscriptions[topic],
subIds: [...this.pubSubSubscriptions[topic].subIds, this.subIdCounter]
}

@@ -51,7 +56,18 @@ } else {

sub.on('message', async (message) => {
// convert message data to json
let data
try {
data = JSON.parse(message.data.toString())
} catch(err) {
console.error(err)
// drop this message as it is definitely invalid
message.ack()
return
}
// get a list of downstream subscriptions to relay the message to
try {
self.pubsubSubscriptions.subIds.forEach(subID => {
self.pubSubSubscriptions[topic].subIds.forEach(subID => {
const [, onMessage] = self.subscriptions[subID]
onMessage(message.data)
onMessage(data)
})

@@ -69,4 +85,4 @@ } catch(err) {

this.pubsubSubscriptions[topic] = {
...this.pubsubSubscriptions[topic],
this.pubSubSubscriptions[topic] = {
...this.pubSubSubscriptions[topic],
subscription: sub,

@@ -77,5 +93,5 @@ subIds: [this.subIdCounter]

} else {
this.pubsubSubscriptions[topic] = {
...this.pubsubSubscriptions[topic],
subIds: [...this.pubsubSubscriptions[topic].subIds, this.subIdCounter]
this.pubSubSubscriptions[topic] = {
...this.pubSubSubscriptions[topic],
subIds: [...this.pubSubSubscriptions[topic].subIds, this.subIdCounter]
}

@@ -91,12 +107,12 @@ }

const pubsubSubscription = this.pubsubSubscriptions[topic]
const pubsubSubscription = this.pubSubSubscriptions[topic]
if (!pubsubSubscription) return subId
if (pubsubSubscription.subIds.length === 1) {
delete this.pubsubSubscriptions[topic]
delete this.pubSubSubscriptions[topic]
}
this.pubsubSubscriptions[topic] = {
...this.pubsubSubscriptions[topic],
subIds: this.pubsubSubscriptions[topic].subIds.filter(id => id !== subId)
this.pubSubSubscriptions[topic] = {
...this.pubSubSubscriptions[topic],
subIds: this.pubSubSubscriptions[topic].subIds.filter(id => id !== subId)
}

@@ -103,0 +119,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc