@steelbreeze/broker
Advanced tools
Comparing version 1.0.0-beta.4 to 1.0.0-beta.5
@@ -1,11 +0,14 @@ | ||
interface IConfig { | ||
/** Specifies the location of a message broker server. */ | ||
interface HTTPConfig { | ||
/** The host that the message brokers express application is running on. */ | ||
host: string; | ||
/** The port of the host that the message brokers express application is running on. */ | ||
port: number; | ||
/** The base URL of the message broker. */ | ||
path: string; | ||
} | ||
interface IClient { | ||
publish(topicName: string, data: string, type: string): void; | ||
subscribe(topicName: string, callback: (data: string) => void): void; | ||
} | ||
export declare function client(config: IConfig): IClient; | ||
export declare function client(config: HTTPConfig): { | ||
publish: (topicName: string, data: string) => void; | ||
subscribe: (topicName: string, callback: (data: string) => void) => void; | ||
}; | ||
export {}; |
@@ -10,16 +10,24 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
// http post used for publishing | ||
var http = __importStar(require("http")); | ||
// EventSource used for subscribing | ||
var EventSource = require('eventsource'); // unable to determine how to import this module in the TypeScript way | ||
var EventSource = require('eventsource'); // TODO: determine how to TypeScript import this module | ||
function client(config) { | ||
var lastEventId = -1; | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
*/ | ||
function publish(topicName, data) { | ||
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'application/json' } }); | ||
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } }); | ||
post.write(data); | ||
post.end(); | ||
} | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic. | ||
*/ | ||
function subscribe(topicName, callback) { | ||
var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName); | ||
eventSource.addEventListener('message', function (event) { | ||
var lastEventId = -1; | ||
eventSource.onmessage = function (event) { | ||
if (event.lastEventId !== lastEventId) { | ||
@@ -31,3 +39,3 @@ lastEventId = event.lastEventId; | ||
} | ||
}); | ||
}; | ||
} | ||
@@ -34,0 +42,0 @@ return { publish: publish, subscribe: subscribe }; |
import { Router } from 'express'; | ||
/** | ||
* Creates an instance of a message broker server. | ||
* Many message broker servers may be created; each bound to a different base url path. | ||
* Many message broker servers may be created, each bound to a different base url. | ||
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition. | ||
* @returns Returns an express Router for use within an express application. | ||
*/ | ||
export declare function server(cacheLastMessage?: boolean): Router; |
@@ -6,4 +6,5 @@ "use strict"; | ||
* Creates an instance of a message broker server. | ||
* Many message broker servers may be created; each bound to a different base url path. | ||
* Many message broker servers may be created, each bound to a different base url. | ||
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition. | ||
* @returns Returns an express Router for use within an express application. | ||
*/ | ||
@@ -14,10 +15,9 @@ function server(cacheLastMessage) { | ||
var topics = {}; | ||
var messageId = 0; | ||
// find or create a topic given its name | ||
function getTopic(topic) { | ||
return topics[topic] || (topics[topic] = { subscribers: [], data: undefined, lastEventId: undefined }); | ||
return topics[topic] || (topics[topic] = { lastEventId: -1, data: undefined, subscribers: [] }); | ||
} | ||
// write a single message to a client | ||
function serverSentEvent(res, lastEventId, data) { | ||
res.write("event:message\nid:" + lastEventId + "\ndata:" + data + "\n\n"); | ||
res.write("id:" + lastEventId + "\ndata:" + data + "\n\n"); | ||
} | ||
@@ -38,3 +38,3 @@ // GET method is used to subscribe by EventSource clients | ||
// update the client with the last message if available | ||
if (cacheLastMessage && topic.lastEventId && topic.data) { | ||
if (cacheLastMessage && topic.data) { | ||
serverSentEvent(res, topic.lastEventId, topic.data); | ||
@@ -50,14 +50,17 @@ } | ||
req.on('end', function () { | ||
setImmediate(function (topic, lastEventId, data) { | ||
topic.data = data; | ||
topic.lastEventId = lastEventId; | ||
for (var _i = 0, _a = topic.subscribers; _i < _a.length; _i++) { | ||
var subscriber = _a[_i]; | ||
var topic = getTopic(req.url); | ||
// update the topic with the new event details | ||
topic.data = Buffer.concat(body).toString(); | ||
topic.lastEventId++; | ||
// recycle the message id when maxed out | ||
if (topic.lastEventId === Number.MAX_VALUE) { | ||
topic.lastEventId = 0; | ||
} | ||
// queue dispatch of event to all current subscribers | ||
setImmediate(function (subscribers, lastEventId, data) { | ||
for (var _i = 0, subscribers_1 = subscribers; _i < subscribers_1.length; _i++) { | ||
var subscriber = subscribers_1[_i]; | ||
serverSentEvent(subscriber, lastEventId, data); | ||
} | ||
}, getTopic(req.url), messageId++, Buffer.concat(body).toString()); | ||
// recycle the message id when maxed out | ||
if (messageId === Number.MAX_VALUE) { | ||
messageId = 0; | ||
} | ||
}, topic.subscribers, topic.lastEventId, topic.data); | ||
}); | ||
@@ -64,0 +67,0 @@ // send response to publisher |
{ | ||
"name": "@steelbreeze/broker", | ||
"version": "1.0.0-beta.4", | ||
"version": "1.0.0-beta.5", | ||
"description": "Lightweight publish and subscribe using Server-Sent Events for node and express", | ||
@@ -30,6 +30,8 @@ "main": "lib/index.js", | ||
"dependencies": { | ||
"eventsource": "^1.0.5" | ||
}, | ||
"devDependencies": { | ||
"@types/express": "^4.16.0", | ||
"eventsource": "^1.0.5", | ||
"express": "^4.16.3" | ||
} | ||
} | ||
} |
@@ -1,30 +0,24 @@ | ||
// http post used for publishing | ||
import * as http from 'http'; | ||
const EventSource = require('eventsource'); // TODO: determine how to TypeScript import this module | ||
// EventSource used for subscribing | ||
const EventSource = require('eventsource'); // unable to determine how to import this module in the TypeScript way | ||
/** Specifies the location of a message broker server. */ | ||
interface HTTPConfig { | ||
/** The host that the message brokers express application is running on. */ | ||
host: string; | ||
interface IConfig { | ||
host: string; | ||
/** The port of the host that the message brokers express application is running on. */ | ||
port: number; | ||
/** The base URL of the message broker. */ | ||
path: string; | ||
} | ||
interface IEvent { | ||
type: string; | ||
data: string; | ||
lastEventId: number; | ||
origin: string; | ||
} | ||
interface IClient { | ||
publish(topicName: string, data: string, type: string): void; | ||
subscribe(topicName: string, callback: (data: string) => void): void; | ||
} | ||
export function client(config: IConfig): IClient { | ||
var lastEventId = -1; | ||
export function client(config: HTTPConfig) { | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
*/ | ||
function publish(topicName: string, data: string): void { | ||
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'application/json' } }); | ||
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } }); | ||
@@ -35,6 +29,12 @@ post.write(data); | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic. | ||
*/ | ||
function subscribe(topicName: string, callback: (data: string) => void): void { | ||
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`); | ||
var lastEventId = -1; | ||
eventSource.addEventListener('message', function (event: IEvent) { | ||
eventSource.onmessage = (event: { type: string; data: string; lastEventId: number; origin: string; }): void => { | ||
if (event.lastEventId !== lastEventId) { | ||
@@ -47,3 +47,3 @@ lastEventId = event.lastEventId; | ||
} | ||
}); | ||
} | ||
} | ||
@@ -50,0 +50,0 @@ |
@@ -5,5 +5,5 @@ import { Router, Request, Response } from 'express'; | ||
interface ITopic { | ||
lastEventId: number; | ||
data: string | undefined; | ||
subscribers: Response[]; | ||
data: string | undefined; | ||
lastEventId: number | undefined; | ||
} | ||
@@ -13,4 +13,5 @@ | ||
* Creates an instance of a message broker server. | ||
* Many message broker servers may be created; each bound to a different base url path. | ||
* Many message broker servers may be created, each bound to a different base url. | ||
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition. | ||
* @returns Returns an express Router for use within an express application. | ||
*/ | ||
@@ -20,7 +21,6 @@ export function server(cacheLastMessage: boolean = false): Router { | ||
const topics: { [id: string]: ITopic } = {}; | ||
var messageId = 0; | ||
// find or create a topic given its name | ||
function getTopic(topic: string): ITopic { | ||
return topics[topic] || (topics[topic] = { subscribers: [], data: undefined, lastEventId: undefined }); | ||
return topics[topic] || (topics[topic] = { lastEventId: -1, data: undefined, subscribers: [] }); | ||
} | ||
@@ -30,3 +30,3 @@ | ||
function serverSentEvent(res: Response, lastEventId: number, data: string): void { | ||
res.write(`event:message\nid:${lastEventId}\ndata:${data}\n\n`); | ||
res.write(`id:${lastEventId}\ndata:${data}\n\n`); | ||
} | ||
@@ -52,3 +52,3 @@ | ||
// update the client with the last message if available | ||
if (cacheLastMessage && topic.lastEventId && topic.data) { | ||
if (cacheLastMessage && topic.data) { | ||
serverSentEvent(res, topic.lastEventId, topic.data); | ||
@@ -67,15 +67,19 @@ } | ||
req.on('end', (): void => { | ||
setImmediate((topic: ITopic, lastEventId, data: string) => { | ||
topic.data = data; | ||
topic.lastEventId = lastEventId; | ||
var topic = getTopic(req.url); | ||
for (var subscriber of topic.subscribers) { | ||
serverSentEvent(subscriber, lastEventId, data); | ||
} | ||
}, getTopic(req.url), messageId++, Buffer.concat(body).toString()); | ||
// update the topic with the new event details | ||
topic.data = Buffer.concat(body).toString(); | ||
topic.lastEventId++; | ||
// recycle the message id when maxed out | ||
if(messageId === Number.MAX_VALUE) { | ||
messageId = 0; | ||
if (topic.lastEventId === Number.MAX_VALUE) { | ||
topic.lastEventId = 0; | ||
} | ||
// queue dispatch of event to all current subscribers | ||
setImmediate((subscribers: Response[], lastEventId, data: string) => { | ||
for (var subscriber of subscribers) { | ||
serverSentEvent(subscriber, lastEventId, data); | ||
} | ||
}, topic.subscribers, topic.lastEventId, topic.data); | ||
}); | ||
@@ -82,0 +86,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
21417
1
344
2
- Removed@types/express@^4.16.0
- Removedexpress@^4.16.3
- Removed@types/body-parser@1.19.5(transitive)
- Removed@types/connect@3.4.38(transitive)
- Removed@types/express@4.17.21(transitive)
- Removed@types/express-serve-static-core@4.19.6(transitive)
- Removed@types/http-errors@2.0.4(transitive)
- Removed@types/mime@1.3.5(transitive)
- Removed@types/node@22.9.3(transitive)
- Removed@types/qs@6.9.17(transitive)
- Removed@types/range-parser@1.2.7(transitive)
- Removed@types/send@0.17.4(transitive)
- Removed@types/serve-static@1.15.7(transitive)
- Removedaccepts@1.3.8(transitive)
- Removedarray-flatten@1.1.1(transitive)
- Removedbody-parser@1.20.3(transitive)
- Removedbytes@3.1.2(transitive)
- Removedcall-bind@1.0.7(transitive)
- Removedcontent-disposition@0.5.4(transitive)
- Removedcontent-type@1.0.5(transitive)
- Removedcookie@0.7.1(transitive)
- Removedcookie-signature@1.0.6(transitive)
- Removeddebug@2.6.9(transitive)
- Removeddefine-data-property@1.1.4(transitive)
- Removeddepd@2.0.0(transitive)
- Removeddestroy@1.2.0(transitive)
- Removedee-first@1.1.1(transitive)
- Removedencodeurl@1.0.22.0.0(transitive)
- Removedes-define-property@1.0.0(transitive)
- Removedes-errors@1.3.0(transitive)
- Removedescape-html@1.0.3(transitive)
- Removedetag@1.8.1(transitive)
- Removedexpress@4.21.1(transitive)
- Removedfinalhandler@1.3.1(transitive)
- Removedforwarded@0.2.0(transitive)
- Removedfresh@0.5.2(transitive)
- Removedfunction-bind@1.1.2(transitive)
- Removedget-intrinsic@1.2.4(transitive)
- Removedgopd@1.0.1(transitive)
- Removedhas-property-descriptors@1.0.2(transitive)
- Removedhas-proto@1.0.3(transitive)
- Removedhas-symbols@1.0.3(transitive)
- Removedhasown@2.0.2(transitive)
- Removedhttp-errors@2.0.0(transitive)
- Removediconv-lite@0.4.24(transitive)
- Removedinherits@2.0.4(transitive)
- Removedipaddr.js@1.9.1(transitive)
- Removedmedia-typer@0.3.0(transitive)
- Removedmerge-descriptors@1.0.3(transitive)
- Removedmethods@1.1.2(transitive)
- Removedmime@1.6.0(transitive)
- Removedmime-db@1.52.0(transitive)
- Removedmime-types@2.1.35(transitive)
- Removedms@2.0.02.1.3(transitive)
- Removednegotiator@0.6.3(transitive)
- Removedobject-inspect@1.13.3(transitive)
- Removedon-finished@2.4.1(transitive)
- Removedparseurl@1.3.3(transitive)
- Removedpath-to-regexp@0.1.10(transitive)
- Removedproxy-addr@2.0.7(transitive)
- Removedqs@6.13.0(transitive)
- Removedrange-parser@1.2.1(transitive)
- Removedraw-body@2.5.2(transitive)
- Removedsafe-buffer@5.2.1(transitive)
- Removedsafer-buffer@2.1.2(transitive)
- Removedsend@0.19.0(transitive)
- Removedserve-static@1.16.2(transitive)
- Removedset-function-length@1.2.2(transitive)
- Removedsetprototypeof@1.2.0(transitive)
- Removedside-channel@1.0.6(transitive)
- Removedstatuses@2.0.1(transitive)
- Removedtoidentifier@1.0.1(transitive)
- Removedtype-is@1.6.18(transitive)
- Removedundici-types@6.19.8(transitive)
- Removedunpipe@1.0.0(transitive)
- Removedutils-merge@1.0.1(transitive)
- Removedvary@1.1.2(transitive)