fastify-sse-v2
Advanced tools
Comparing version 1.0.4 to 1.0.5
@@ -9,2 +9,6 @@ # Changelog | ||
## [1.0.5] | ||
## Fixed | ||
- use response stream directly | ||
## [1.0.4] | ||
@@ -26,4 +30,5 @@ ## Fixed | ||
[Unreleased]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.4...HEAD | ||
[Unreleased]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.3...v1.0.4 | ||
[Unreleased]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.5...HEAD | ||
[1.0.5]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.4...v1.0.5 | ||
[1.0.4]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.3...v1.0.4 | ||
[1.0.3]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.2...v1.0.3 | ||
@@ -30,0 +35,0 @@ [1.0.2]: https://github.com/nodefactoryio/fastify-sse-v2/compare/v1.0.1...v1.0.2 |
@@ -16,8 +16,7 @@ "use strict"; | ||
instance.decorateReply("sse", function (source) { | ||
const outputStream = (0, _sse.getOutputStream)(); | ||
outputStream.push((0, _sse.serializeSSEEvent)({ | ||
this.res.write((0, _sse.serializeSSEEvent)({ | ||
retry: options.retryDelay || 3000 | ||
})); | ||
this.type("text/event-stream").header("Connection", "keep-alive").header("Cache-Control", "no-cache").send(outputStream); | ||
(0, _itToStream.default)((0, _sse.transformAsyncIterable)(source)).pipe(outputStream); | ||
this.type("text/event-stream").header("Connection", "keep-alive").header("Cache-Control", "no-cache"); | ||
(0, _itToStream.default)((0, _sse.transformAsyncIterable)(source)).pipe(this.res); | ||
}); | ||
@@ -24,0 +23,0 @@ }; |
@@ -1,7 +0,3 @@ | ||
/// <reference types="node" /> | ||
import { Transform } from "stream"; | ||
import { EventMessage } from "fastify"; | ||
export declare function getOutputStream(): Transform; | ||
export declare function transformEventStream(chunk: string, encoding: string, callback: (error?: (Error | null), data?: string) => void): void; | ||
export declare function transformAsyncIterable(source: AsyncIterable<EventMessage>): AsyncIterable<string>; | ||
export declare function serializeSSEEvent(chunk: EventMessage): string; |
@@ -6,31 +6,5 @@ "use strict"; | ||
}); | ||
exports.getOutputStream = getOutputStream; | ||
exports.transformEventStream = transformEventStream; | ||
exports.transformAsyncIterable = transformAsyncIterable; | ||
exports.serializeSSEEvent = serializeSSEEvent; | ||
var _stream = require("stream"); | ||
function getOutputStream() { | ||
return new _stream.PassThrough({ | ||
flush(callback) { | ||
callback(null, serializeSSEEvent({ | ||
event: "end", | ||
data: "Stream closed" | ||
})); | ||
} | ||
}); | ||
} | ||
function transformEventStream(chunk, encoding, callback) { | ||
try { | ||
callback(null, serializeSSEEvent({ | ||
data: chunk | ||
})); | ||
} catch (e) { | ||
callback(e); | ||
} | ||
} | ||
async function* transformAsyncIterable(source) { | ||
@@ -40,2 +14,7 @@ for await (const message of source) { | ||
} | ||
yield serializeSSEEvent({ | ||
event: "end", | ||
data: "Stream closed" | ||
}); | ||
} | ||
@@ -42,0 +21,0 @@ |
{ | ||
"name": "fastify-sse-v2", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"description": "Fastify plugin for sending server side events.", | ||
@@ -51,3 +51,3 @@ "main": "lib/index.js", | ||
"eventsource": "^1.0.7", | ||
"fastify": "^2.13.0", | ||
"fastify": "^2.15.2", | ||
"it-pushable": "^1.4.0", | ||
@@ -54,0 +54,0 @@ "mocha": "^6.1.4", |
@@ -50,3 +50,3 @@ # Fastify SSE Plugin | ||
res.sse(new EventIterator( | ||
({ push }) => { | ||
(push) => { | ||
eventEmitter.on("some_event", push) | ||
@@ -59,1 +59,6 @@ return () => eventEmitter.removeEventListener("some_event", push) | ||
``` | ||
##### Note | ||
- to remove event listeners (or some other cleanup) when client closes connection, | ||
you can listen on fastify's [req.raw connection close event](https://nodejs.org/docs/latest-v12.x/api/http.html#http_event_close_2): | ||
import {EventMessage, FastifyReply, Plugin} from "fastify"; | ||
import {IncomingMessage, Server, ServerResponse} from "http"; | ||
import {SsePluginOptions} from "./types"; | ||
import {getOutputStream, serializeSSEEvent, transformAsyncIterable} from "./sse"; | ||
import {serializeSSEEvent, transformAsyncIterable} from "./sse"; | ||
import toStream from "it-to-stream"; | ||
@@ -9,13 +9,11 @@ | ||
async function (instance, options): Promise<void> { | ||
instance.decorateReply( | ||
instance.decorateReply( | ||
"sse", | ||
function (this: FastifyReply<ServerResponse>, source: AsyncIterable<EventMessage>): void { | ||
const outputStream = getOutputStream(); | ||
outputStream.push(serializeSSEEvent({retry: options.retryDelay || 3000})); | ||
this.res.write(serializeSSEEvent({retry: options.retryDelay || 3000})); | ||
this.type("text/event-stream") | ||
.header("Connection", "keep-alive") | ||
.header("Cache-Control", "no-cache") | ||
.send(outputStream); | ||
toStream(transformAsyncIterable(source)).pipe(outputStream); | ||
.header("Cache-Control", "no-cache"); | ||
toStream(transformAsyncIterable(source)).pipe(this.res); | ||
}); | ||
}; | ||
}; |
@@ -1,24 +0,3 @@ | ||
import {PassThrough, Transform} from "stream"; | ||
import {EventMessage} from "fastify"; | ||
export function getOutputStream(): Transform { | ||
return new PassThrough({ | ||
flush(callback: (error?: (Error | null), data?: unknown) => void): void { | ||
callback(null, serializeSSEEvent({event: "end", data: "Stream closed"})); | ||
} | ||
}); | ||
} | ||
export function transformEventStream( | ||
chunk: string, encoding: string, callback: (error?: (Error | null), data?: string) => void | ||
): void { | ||
try { | ||
callback(null, serializeSSEEvent({ | ||
data: chunk | ||
})); | ||
} catch (e) { | ||
callback(e); | ||
} | ||
} | ||
export async function* transformAsyncIterable(source: AsyncIterable<EventMessage>): AsyncIterable<string> { | ||
@@ -28,2 +7,3 @@ for await (const message of source) { | ||
} | ||
yield serializeSSEEvent({event: "end", data: "Stream closed"}); | ||
} | ||
@@ -30,0 +10,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
63
17424
195