graphql-kafka-subscriptions
Advanced tools
Comparing version 0.1.0 to 0.2.1
@@ -13,17 +13,20 @@ "use strict"; | ||
var Kafka = require("node-rdkafka"); | ||
var Logger = require("bunyan"); | ||
var child_logger_1 = require("./child-logger"); | ||
var pubsub_async_iterator_1 = require("./pubsub-async-iterator"); | ||
var defaultLogger = Logger.createLogger({ | ||
name: 'pubsub', | ||
stream: process.stdout, | ||
level: 'info' | ||
}); | ||
var KafkaPubSub = (function () { | ||
function KafkaPubSub(options) { | ||
var _this = this; | ||
this.options = options; | ||
this.subscriptionMap = {}; | ||
this.channelSubscriptions = {}; | ||
this.producer = this.createProducer(this.options.topic); | ||
this.consumer = this.createConsumer(this.options.topic); | ||
this.consumer.on('data', function (message) { | ||
console.log('Got message'); | ||
_this.onMessage(JSON.parse(message.value.toString())); | ||
}); | ||
this.logger = child_logger_1.createChildLogger(this.options.logger || defaultLogger, 'KafkaPubSub'); | ||
} | ||
KafkaPubSub.prototype.publish = function (payload) { | ||
this.producer = this.producer || this.createProducer(this.options.topic); | ||
return this.producer.write(new Buffer(JSON.stringify(payload))); | ||
@@ -46,4 +49,3 @@ }; | ||
}; | ||
KafkaPubSub.prototype.onMessage = function (_a) { | ||
var channel = _a.channel, message = __rest(_a, ["channel"]); | ||
KafkaPubSub.prototype.onMessage = function (channel, message) { | ||
var subscriptions = this.channelSubscriptions[channel]; | ||
@@ -55,3 +57,3 @@ if (!subscriptions) { | ||
var subId = subscriptions_1[_i]; | ||
var _b = this.subscriptionMap[subId], cnl = _b[0], listener = _b[1]; | ||
var _a = this.subscriptionMap[subId], cnl = _a[0], listener = _a[1]; | ||
listener(message); | ||
@@ -61,2 +63,3 @@ } | ||
KafkaPubSub.prototype.createProducer = function (topic) { | ||
var _this = this; | ||
var producer = Kafka.Producer.createWriteStream({ | ||
@@ -66,4 +69,3 @@ 'metadata.broker.list': this.options.host + ":" + this.options.port | ||
producer.on('error', function (err) { | ||
console.error('Error in our kafka stream'); | ||
console.error(err); | ||
_this.logger.error(err, 'Error in our kafka stream'); | ||
}); | ||
@@ -73,5 +75,6 @@ return producer; | ||
KafkaPubSub.prototype.createConsumer = function (topic) { | ||
var randomGroupId = Math.ceil(Math.random() * 9999); | ||
var _this = this; | ||
var groupId = this.options.groupId || Math.ceil(Math.random() * 9999); | ||
var consumer = Kafka.KafkaConsumer.createReadStream({ | ||
'group.id': "kafka-group-" + randomGroupId, | ||
'group.id': "kafka-group-" + groupId, | ||
'metadata.broker.list': this.options.host + ":" + this.options.port, | ||
@@ -81,2 +84,12 @@ }, {}, { | ||
}); | ||
consumer.on('data', function (message) { | ||
var parsedMessage = JSON.parse(message.value.toString()); | ||
if (parsedMessage.channel) { | ||
var channel = parsedMessage.channel, payload = __rest(parsedMessage, ["channel"]); | ||
_this.onMessage(parsedMessage.channel, payload); | ||
} | ||
else { | ||
_this.onMessage(topic, parsedMessage); | ||
} | ||
}); | ||
return consumer; | ||
@@ -83,0 +96,0 @@ }; |
{ | ||
"name": "graphql-kafka-subscriptions", | ||
"version": "0.1.0", | ||
"version": "0.2.1", | ||
"description": "Apollo graphql subscription over Kafka protocol", | ||
"main": "dist/index.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1", | ||
"test": "jest --watchAll", | ||
"build": "tsc", | ||
@@ -27,11 +27,29 @@ "watch": "tsc -w" | ||
"homepage": "https://github.com/ancashoria/graphql-kafka-subscriptions#readme", | ||
"jest": { | ||
"roots": [ | ||
"<rootDir>/src/test" | ||
], | ||
"moduleFileExtensions": [ | ||
"ts", | ||
"tsx", | ||
"js" | ||
], | ||
"transform": { | ||
"\\.(ts|tsx)$": "<rootDir>/node_modules/ts-jest/preprocessor.js" | ||
}, | ||
"testRegex": "(/__tests__/.*|\\.(test|spec))\\.(tsx?|jsx?)$" | ||
}, | ||
"dependencies": { | ||
"bunyan": "1.8.12", | ||
"graphql-subscriptions": "^0.5.0", | ||
"iterall": "^1.1.1", | ||
"node-rdkafka": "^2.0.0" | ||
"node-rdkafka": "^2.1.0" | ||
}, | ||
"devDependencies": { | ||
"@types/jest": "^21.1.1", | ||
"@types/node": "^8.0.28", | ||
"jest": "^21.2.1", | ||
"ts-jest": "^21.0.1", | ||
"typescript": "^2.5.2" | ||
} | ||
} |
# graphql-kafka-subscriptions | ||
**Apollo graphql subscriptions over Kafka protocol** | ||
One producer and one consumer for each node instance. Communication happens over a single kafka topic. | ||
## Installation | ||
`npm install graphql-kafka-subscriptions` | ||
## Usage | ||
``` | ||
```javascript | ||
import { KafkaPubSub } from 'graphql-kafka-subscriptions' | ||
@@ -14,2 +21,6 @@ | ||
WIP | ||
Special thanks to: | ||
- [davidyaha](https://github.com/davidyaha) for [graphql-redis-subscriptions](https://github.com/davidyaha/graphql-redis-subscriptions) which was the main inspiration point for this project | ||
- [Apollo graphql community](http://dev.apollodata.com/community/) | ||
Help greatly appreciated |
Sorry, the diff of this file is not supported yet
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
32202
16
408
1
25
4
5
+ Addedbunyan@1.8.12
+ Addedbalanced-match@1.0.2(transitive)
+ Addedbrace-expansion@1.1.11(transitive)
+ Addedbunyan@1.8.12(transitive)
+ Addedconcat-map@0.0.1(transitive)
+ Addeddtrace-provider@0.8.8(transitive)
+ Addedglob@6.0.4(transitive)
+ Addedinflight@1.0.6(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedminimatch@3.1.2(transitive)
+ Addedminimist@1.2.8(transitive)
+ Addedmkdirp@0.5.6(transitive)
+ Addedmoment@2.30.1(transitive)
+ Addedmv@2.1.1(transitive)
+ Addedncp@2.0.0(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedpath-is-absolute@1.0.1(transitive)
+ Addedrimraf@2.4.5(transitive)
+ Addedsafe-json-stringify@1.2.0(transitive)
+ Addedwrappy@1.0.2(transitive)
Updatednode-rdkafka@^2.1.0