fastify-sse-v2
Advanced tools
Comparing version
@@ -9,2 +9,6 @@ # Changelog | ||
## [1.0.5] | ||
## Fixed | ||
- use response stream directly | ||
## [1.0.4] | ||
@@ -26,4 +30,5 @@ ## Fixed | ||
[Unreleased]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.4...HEAD | ||
[Unreleased]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.3...v1.0.4 | ||
[Unreleased]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.5...HEAD | ||
[1.0.5]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.4...v1.0.5 | ||
[1.0.4]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.3...v1.0.4 | ||
[1.0.3]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.2...v1.0.3 | ||
@@ -30,0 +35,0 @@ [1.0.2]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.1...v1.0.2 |
@@ -16,8 +16,7 @@ "use strict"; | ||
instance.decorateReply("sse", function (source) { | ||
const outputStream = (0, _sse.getOutputStream)(); | ||
outputStream.push((0, _sse.serializeSSEEvent)({ | ||
this.res.write((0, _sse.serializeSSEEvent)({ | ||
retry: options.retryDelay || 3000 | ||
})); | ||
this.type("text/event-stream").header("Connection", "keep-alive").header("Cache-Control", "no-cache").send(outputStream); | ||
(0, _itToStream.default)((0, _sse.transformAsyncIterable)(source)).pipe(outputStream); | ||
this.type("text/event-stream").header("Connection", "keep-alive").header("Cache-Control", "no-cache"); | ||
(0, _itToStream.default)((0, _sse.transformAsyncIterable)(source)).pipe(this.res); | ||
}); | ||
@@ -24,0 +23,0 @@ }; |
@@ -1,7 +0,3 @@ | ||
/// <reference types="node" /> | ||
import { Transform } from "stream"; | ||
import { EventMessage } from "fastify"; | ||
export declare function getOutputStream(): Transform; | ||
export declare function transformEventStream(chunk: string, encoding: string, callback: (error?: (Error | null), data?: string) => void): void; | ||
export declare function transformAsyncIterable(source: AsyncIterable<EventMessage>): AsyncIterable<string>; | ||
export declare function serializeSSEEvent(chunk: EventMessage): string; |
@@ -6,31 +6,5 @@ "use strict"; | ||
}); | ||
exports.getOutputStream = getOutputStream; | ||
exports.transformEventStream = transformEventStream; | ||
exports.transformAsyncIterable = transformAsyncIterable; | ||
exports.serializeSSEEvent = serializeSSEEvent; | ||
var _stream = require("stream"); | ||
function getOutputStream() { | ||
return new _stream.PassThrough({ | ||
flush(callback) { | ||
callback(null, serializeSSEEvent({ | ||
event: "end", | ||
data: "Stream closed" | ||
})); | ||
} | ||
}); | ||
} | ||
function transformEventStream(chunk, encoding, callback) { | ||
try { | ||
callback(null, serializeSSEEvent({ | ||
data: chunk | ||
})); | ||
} catch (e) { | ||
callback(e); | ||
} | ||
} | ||
async function* transformAsyncIterable(source) { | ||
@@ -40,2 +14,7 @@ for await (const message of source) { | ||
} | ||
yield serializeSSEEvent({ | ||
event: "end", | ||
data: "Stream closed" | ||
}); | ||
} | ||
@@ -42,0 +21,0 @@ |
{ | ||
"name": "fastify-sse-v2", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"description": "Fastify plugin for sending server side events.", | ||
@@ -51,3 +51,3 @@ "main": "lib/index.js", | ||
"eventsource": "^1.0.7", | ||
"fastify": "^2.13.0", | ||
"fastify": "^2.15.2", | ||
"it-pushable": "^1.4.0", | ||
@@ -54,0 +54,0 @@ "mocha": "^6.1.4", |
@@ -50,3 +50,3 @@ # Fastify SSE Plugin | ||
res.sse(new EventIterator( | ||
({ push }) => { | ||
(push) => { | ||
eventEmitter.on("some_event", push) | ||
@@ -59,1 +59,6 @@ return () => eventEmitter.removeEventListener("some_event", push) | ||
``` | ||
##### Note | ||
- to remove event listeners (or some other cleanup) when client closes connection, | ||
you can listen on fastify's [req.raw connection close event](https://nodejs.org/docs/latest-v12.x/api/http.html#http_event_close_2): | ||
import {EventMessage, FastifyReply, Plugin} from "fastify"; | ||
import {IncomingMessage, Server, ServerResponse} from "http"; | ||
import {SsePluginOptions} from "./types"; | ||
import {getOutputStream, serializeSSEEvent, transformAsyncIterable} from "./sse"; | ||
import {serializeSSEEvent, transformAsyncIterable} from "./sse"; | ||
import toStream from "it-to-stream"; | ||
@@ -9,13 +9,11 @@ | ||
async function (instance, options): Promise<void> { | ||
instance.decorateReply( | ||
instance.decorateReply( | ||
"sse", | ||
function (this: FastifyReply<ServerResponse>, source: AsyncIterable<EventMessage>): void { | ||
const outputStream = getOutputStream(); | ||
outputStream.push(serializeSSEEvent({retry: options.retryDelay || 3000})); | ||
this.res.write(serializeSSEEvent({retry: options.retryDelay || 3000})); | ||
this.type("text/event-stream") | ||
.header("Connection", "keep-alive") | ||
.header("Cache-Control", "no-cache") | ||
.send(outputStream); | ||
toStream(transformAsyncIterable(source)).pipe(outputStream); | ||
.header("Cache-Control", "no-cache"); | ||
toStream(transformAsyncIterable(source)).pipe(this.res); | ||
}); | ||
}; | ||
}; |
@@ -1,24 +0,3 @@ | ||
import {PassThrough, Transform} from "stream"; | ||
import {EventMessage} from "fastify"; | ||
export function getOutputStream(): Transform { | ||
return new PassThrough({ | ||
flush(callback: (error?: (Error | null), data?: unknown) => void): void { | ||
callback(null, serializeSSEEvent({event: "end", data: "Stream closed"})); | ||
} | ||
}); | ||
} | ||
export function transformEventStream( | ||
chunk: string, encoding: string, callback: (error?: (Error | null), data?: string) => void | ||
): void { | ||
try { | ||
callback(null, serializeSSEEvent({ | ||
data: chunk | ||
})); | ||
} catch (e) { | ||
callback(e); | ||
} | ||
} | ||
export async function* transformAsyncIterable(source: AsyncIterable<EventMessage>): AsyncIterable<string> { | ||
@@ -28,2 +7,3 @@ for await (const message of source) { | ||
} | ||
yield serializeSSEEvent({event: "end", data: "Stream closed"}); | ||
} | ||
@@ -30,0 +10,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
63
8.62%17424
-11.02%195
-17.72%