graphql-kafkajs-subscriptions
Advanced tools
Comparing version 1.1.0 to 2.0.0
@@ -5,2 +5,16 @@ # Changelog | ||
## [2.0.0](https://github.com/tomasAlabes/graphql-kafkajs-subscriptions/compare/v1.0.1...v2.0.0) (2020-05-29) | ||
### ⚠ BREAKING CHANGES | ||
* onMessage receives a KafkaMessage as param now | ||
### Features | ||
* optional kafkajs producer send params ([79fe0ea](https://github.com/tomasAlabes/graphql-kafkajs-subscriptions/commit/79fe0eac83dbbb8e72716d8c69c8eb287e28d96b)) | ||
* not meddling with the supplied event payload ([5b9fd52](https://github.com/tomasAlabes/graphql-kafkajs-subscriptions/commit/5b9fd52d2243f7749abf817673e8478345e3565b)) | ||
## [1.1.0](https://github.com/tomasAlabes/graphql-kafkajs-subscriptions/compare/v1.0.1...v1.1.0) (2020-05-04) | ||
@@ -7,0 +21,0 @@ |
@@ -1,1 +0,1 @@ | ||
export { KafkaPubSub } from "./kafka-pubsub"; | ||
export { KafkaPubSub, MessageHandler } from "./kafka-pubsub"; |
@@ -0,3 +1,4 @@ | ||
/// <reference types="node" /> | ||
import { PubSubEngine } from "graphql-subscriptions"; | ||
import { Kafka, IHeaders } from "kafkajs"; | ||
import { Kafka, IHeaders, KafkaMessage } from "kafkajs"; | ||
interface KafkaPubSubInput { | ||
@@ -8,2 +9,3 @@ kafka: Kafka; | ||
} | ||
export declare type MessageHandler = (msg: KafkaMessage) => any; | ||
export declare class KafkaPubSub implements PubSubEngine { | ||
@@ -18,4 +20,4 @@ private client; | ||
private constructor(); | ||
publish(channel: string, payload: object, headers?: IHeaders, sendOptions?: object): Promise<void>; | ||
subscribe(channel: string, onMessage: Function, options?: any): Promise<number>; | ||
publish(channel: string, payload: string | Buffer, headers?: IHeaders, sendOptions?: object): Promise<void>; | ||
subscribe(channel: string, onMessage: MessageHandler, _?: any): Promise<number>; | ||
unsubscribe(index: number): void; | ||
@@ -22,0 +24,0 @@ asyncIterator<T>(triggers: string | string[]): AsyncIterator<T>; |
@@ -49,13 +49,2 @@ "use strict"; | ||
}; | ||
var __rest = (this && this.__rest) || function (s, e) { | ||
var t = {}; | ||
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p) && e.indexOf(p) < 0) | ||
t[p] = s[p]; | ||
if (s != null && typeof Object.getOwnPropertySymbols === "function") | ||
for (var i = 0, p = Object.getOwnPropertySymbols(s); i < p.length; i++) { | ||
if (e.indexOf(p[i]) < 0 && Object.prototype.propertyIsEnumerable.call(s, p[i])) | ||
t[p[i]] = s[p[i]]; | ||
} | ||
return t; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -100,4 +89,4 @@ var pubsub_async_iterator_1 = require("./pubsub-async-iterator"); | ||
{ | ||
value: Buffer.from(JSON.stringify(__assign({ channel: channel }, payload))), | ||
headers: headers, | ||
value: payload, | ||
headers: __assign(__assign({}, headers), { channel: channel }), | ||
}, | ||
@@ -112,3 +101,3 @@ ], topic: this.topic }, sendOptions))]; | ||
}; | ||
KafkaPubSub.prototype.subscribe = function (channel, onMessage, options) { | ||
KafkaPubSub.prototype.subscribe = function (channel, onMessage, _) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
@@ -169,11 +158,9 @@ var index; | ||
return __awaiter(_this, void 0, void 0, function () { | ||
var parsedMessage, channel, payload; | ||
return __generator(this, function (_b) { | ||
parsedMessage = JSON.parse(message.value.toString()); | ||
if (parsedMessage.channel) { | ||
channel = parsedMessage.channel, payload = __rest(parsedMessage, ["channel"]); | ||
this.onMessage(channel, payload); | ||
var _b; | ||
return __generator(this, function (_c) { | ||
if ((_b = message.headers) === null || _b === void 0 ? void 0 : _b.channel) { | ||
this.onMessage(message.headers.channel, message); | ||
} | ||
else { | ||
this.onMessage(topic, parsedMessage); | ||
this.onMessage(topic, message); | ||
} | ||
@@ -180,0 +167,0 @@ return [2]; |
"use strict"; | ||
var __assign = (this && this.__assign) || function () { | ||
__assign = Object.assign || function(t) { | ||
for (var s, i = 1, n = arguments.length; i < n; i++) { | ||
s = arguments[i]; | ||
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p)) | ||
t[p] = s[p]; | ||
} | ||
return t; | ||
}; | ||
return __assign.apply(this, arguments); | ||
}; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
@@ -42,3 +53,3 @@ function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
describe("Test Suite", function () { | ||
it("should test basic pub sub", function () { return __awaiter(void 0, void 0, void 0, function () { | ||
it("should test basic pub sub with buffer payload", function () { return __awaiter(void 0, void 0, void 0, function () { | ||
var topic, channel, payload, onMessage, pubsub; | ||
@@ -50,4 +61,4 @@ return __generator(this, function (_a) { | ||
channel = "my_channel"; | ||
payload = { data: 1 }; | ||
onMessage = jest.fn(function (channel, msg) { }); | ||
payload = Buffer.from(JSON.stringify({ data: 1 })); | ||
onMessage = jest.fn(function (msg) { }); | ||
return [4, index_1.KafkaPubSub.create({ | ||
@@ -67,3 +78,6 @@ groupIdPrefix: "my-prefix", | ||
expect(onMessage).toBeCalled(); | ||
expect(onMessage).toBeCalledWith(payload); | ||
expect(onMessage).toBeCalledWith({ | ||
value: payload, | ||
headers: { channel: channel }, | ||
}); | ||
return [2]; | ||
@@ -73,3 +87,66 @@ } | ||
}); }); | ||
it("should test basic pub sub with stringified payload", function () { return __awaiter(void 0, void 0, void 0, function () { | ||
var topic, channel, payload, onMessage, pubsub; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
topic = "mock_topic"; | ||
channel = "my_channel"; | ||
payload = JSON.stringify({ data: 1 }); | ||
onMessage = jest.fn(function (msg) { }); | ||
return [4, index_1.KafkaPubSub.create({ | ||
groupIdPrefix: "my-prefix", | ||
kafka: new InMemoryKafka_1.Kafka(), | ||
topic: topic, | ||
})]; | ||
case 1: | ||
pubsub = _a.sent(); | ||
return [4, pubsub.subscribe(channel, onMessage)]; | ||
case 2: | ||
_a.sent(); | ||
return [4, pubsub.publish(channel, payload)]; | ||
case 3: | ||
_a.sent(); | ||
expect(onMessage).toBeCalled(); | ||
expect(onMessage).toBeCalledWith({ | ||
value: payload, | ||
headers: { channel: channel }, | ||
}); | ||
return [2]; | ||
} | ||
}); | ||
}); }); | ||
it("should test basic pub sub with custom headers", function () { return __awaiter(void 0, void 0, void 0, function () { | ||
var topic, channel, payload, onMessage, pubsub, headers; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
topic = "mock_topic"; | ||
channel = "my_channel"; | ||
payload = JSON.stringify({ data: 1 }); | ||
onMessage = jest.fn(function (msg) { }); | ||
return [4, index_1.KafkaPubSub.create({ | ||
groupIdPrefix: "my-prefix", | ||
kafka: new InMemoryKafka_1.Kafka(), | ||
topic: topic, | ||
})]; | ||
case 1: | ||
pubsub = _a.sent(); | ||
return [4, pubsub.subscribe(channel, onMessage)]; | ||
case 2: | ||
_a.sent(); | ||
headers = { custom: "header" }; | ||
return [4, pubsub.publish(channel, payload, headers)]; | ||
case 3: | ||
_a.sent(); | ||
expect(onMessage).toBeCalled(); | ||
expect(onMessage).toBeCalledWith({ | ||
value: payload, | ||
headers: __assign(__assign({}, headers), { channel: channel }), | ||
}); | ||
return [2]; | ||
} | ||
}); | ||
}); }); | ||
}); | ||
//# sourceMappingURL=kafka-pubsub.spec.js.map |
{ | ||
"name": "graphql-kafkajs-subscriptions", | ||
"version": "1.1.0", | ||
"version": "2.0.0", | ||
"description": "Apollo graphql subscription over Kafka protocol", | ||
@@ -63,7 +63,7 @@ "keywords": [ | ||
"husky": "^4.2.3", | ||
"jest": "^25.4.0", | ||
"jest": "26.0.0", | ||
"kafkajs": "^1.12.0", | ||
"prettier": "2.0.5", | ||
"standard-version": "^7.1.0", | ||
"ts-jest": "^24.2.0", | ||
"ts-jest": "26.0.0", | ||
"tslint": "5.20.1", | ||
@@ -70,0 +70,0 @@ "typescript": "^3.8.3" |
@@ -38,4 +38,5 @@ # graphql-kafkajs-subscriptions | ||
collaboration: { | ||
resolve: (payload: YourType) => { | ||
return payload; | ||
resolve: (payload: KafkaMessage) => { | ||
// payload.value will be whatever you sent | ||
return payload.value; | ||
}, | ||
@@ -54,5 +55,5 @@ subscribe: (_, args) => { | ||
collaboration: { | ||
resolve: (payload: YourType) => { | ||
resolve: (payload: KafkaMessage) => { | ||
// what you publish will end up passing through here and to the client | ||
return payload; | ||
return payload.value; | ||
}, | ||
@@ -77,4 +78,17 @@ subscribe: (_, args) => { | ||
Use the rest of the kafkajs options: | ||
```javascript | ||
const event = {/* ... */}; | ||
const headers = { | ||
header1: "value" | ||
}; | ||
const producerOptions = { /* options from kafka.js.org/docs/producing: acks, timeout, etc */ }; | ||
pubsub.publish("my channel", event, headers, producerOptions); | ||
``` | ||
This ends up publishing the event to kafka (to the topic you used to create the `kafkaPubSub`) | ||
and received by all consumers. The consumer which is listening to `my channel` will send it | ||
to the client. |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
49292
720
92