@steelbreeze/broker
Advanced tools
Comparing version 1.0.0-beta.13 to 1.0.0-beta.14
@@ -26,3 +26,3 @@ /** Specifies the location of a message broker server. */ | ||
/** | ||
* Creates a client to the message broker, providing publish and subscribe operations. | ||
* Returns the client API to the message broker, providing publish and subscribe operations. | ||
* @param config Configuration specifying the server, port and base URL path of the message broker server. | ||
@@ -32,4 +32,15 @@ * @returns Returns a client providing publish and subscribe operations. | ||
export declare function client(config: HTTPConfig): { | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
* @param onError Optional error handler callback | ||
*/ | ||
publish: (topicName: string, data: string, onError?: ErrorHandler | undefined) => void; | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic; passing a message object. | ||
*/ | ||
subscribe: (topicName: string, callback: ClientCallback | undefined) => void; | ||
}; |
@@ -13,3 +13,3 @@ "use strict"; | ||
/** | ||
* Creates a client to the message broker, providing publish and subscribe operations. | ||
* Returns the client API to the message broker, providing publish and subscribe operations. | ||
* @param config Configuration specifying the server, port and base URL path of the message broker server. | ||
@@ -19,44 +19,44 @@ * @returns Returns a client providing publish and subscribe operations. | ||
function client(config) { | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
* @param onError Optional error handler callback | ||
*/ | ||
function publish(topicName, data, onError) { | ||
if (onError === void 0) { onError = undefined; } | ||
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } }); | ||
// trap and propigate error messages as required | ||
post.on('error', function (err) { | ||
if (onError) { | ||
onError(err); | ||
} | ||
}); | ||
// send message to the server | ||
post.write(data); | ||
post.end(); | ||
} | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic; passing a message object. | ||
*/ | ||
function subscribe(topicName, callback) { | ||
var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName); | ||
var lastEventId = -1; | ||
// process messages from the server | ||
eventSource.onmessage = function (event) { | ||
// perform a duplicate message check | ||
if (event.lastEventId !== lastEventId) { | ||
lastEventId = event.lastEventId; | ||
// propigate message to the client | ||
if (callback) { | ||
callback({ topicName: topicName, id: event.lastEventId, data: event.data }); | ||
return { | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
* @param onError Optional error handler callback | ||
*/ | ||
publish: function (topicName, data, onError) { | ||
if (onError === void 0) { onError = undefined; } | ||
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } }); | ||
// trap and propigate error messages as required | ||
post.on('error', function (err) { | ||
if (onError) { | ||
onError(err); | ||
} | ||
} | ||
}; | ||
} | ||
// return the client API to the caller | ||
return { publish: publish, subscribe: subscribe }; | ||
}); | ||
// send message to the server | ||
post.write(data); | ||
post.end(); | ||
}, | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic; passing a message object. | ||
*/ | ||
subscribe: function (topicName, callback) { | ||
var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName); | ||
var lastEventId = -1; | ||
// process messages from the server | ||
eventSource.onmessage = function (event) { | ||
// perform a duplicate message check | ||
if (event.lastEventId !== lastEventId) { | ||
lastEventId = event.lastEventId; | ||
// propigate message to the client | ||
if (callback) { | ||
callback({ topicName: topicName, id: event.lastEventId, data: event.data }); | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
exports.client = client; |
@@ -9,8 +9,6 @@ "use strict"; | ||
// write a single message to a client | ||
function sendEvent(res, topic) { | ||
if (topic.data) { | ||
setImmediate(function (res, lastEventId, data) { | ||
res.write("id:" + lastEventId + "\ndata:" + data + "\n\n"); | ||
}, res, topic.lastEventId, topic.data); | ||
} | ||
function sendEvent(client, eventId, data) { | ||
setImmediate(function (client, eventId, data) { | ||
client.write("id:" + eventId + "\ndata:" + data + "\n\n"); | ||
}, client, eventId, data); | ||
} | ||
@@ -39,6 +37,7 @@ /** | ||
// update the client with the last message if required | ||
if (cacheLastMessage) { | ||
sendEvent(res, topic); | ||
if (cacheLastMessage && topic.data) { | ||
sendEvent(res, topic.lastEventId, topic.data); | ||
} | ||
}).post('*', function (req, res) { | ||
}); | ||
router.post('*', function (req, res) { | ||
var topic = getTopic(topics, req.url); | ||
@@ -49,3 +48,4 @@ var body = []; | ||
body.push(chunk); | ||
}).on('end', function () { | ||
}); | ||
req.on('end', function () { | ||
// update the topic with the new event details | ||
@@ -56,3 +56,3 @@ topic.data = Buffer.concat(body).toString(); | ||
topic.subscribers.forEach(function (subscriber) { | ||
sendEvent(subscriber, topic); | ||
sendEvent(subscriber, topic.lastEventId, topic.data); // NOTE: as we have just set topic.data we know it's not undefined | ||
}); | ||
@@ -59,0 +59,0 @@ }); |
{ | ||
"name": "@steelbreeze/broker", | ||
"version": "1.0.0-beta.13", | ||
"version": "1.0.0-beta.14", | ||
"description": "Lightweight publish and subscribe using Server-Sent Events for node and express", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -45,3 +45,3 @@ import * as http from 'http'; | ||
/** | ||
* Creates a client to the message broker, providing publish and subscribe operations. | ||
* Returns the client API to the message broker, providing publish and subscribe operations. | ||
* @param config Configuration specifying the server, port and base URL path of the message broker server. | ||
@@ -51,41 +51,43 @@ * @returns Returns a client providing publish and subscribe operations. | ||
export function client(config: HTTPConfig) { | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
* @param onError Optional error handler callback | ||
*/ | ||
function publish(topicName: string, data: string, onError: ErrorHandler | undefined = undefined): void { | ||
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } }); | ||
return { | ||
/** | ||
* Publishes a message to a message broker for other client to receive based on their subscriptions. | ||
* @param topicName The topic to publish on. This may be one or more URL segments. | ||
* @param data The data to publish on the topic | ||
* @param onError Optional error handler callback | ||
*/ | ||
publish: (topicName: string, data: string, onError: ErrorHandler | undefined = undefined): void => { | ||
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } }); | ||
// trap and propigate error messages as required | ||
post.on('error', (err: Error) => { | ||
if (onError) { | ||
onError(err); | ||
} | ||
}); | ||
// trap and propigate error messages as required | ||
post.on('error', (err: Error) => { | ||
if (onError) { | ||
onError(err); | ||
} | ||
}); | ||
// send message to the server | ||
post.write(data); | ||
post.end(); | ||
} | ||
// send message to the server | ||
post.write(data); | ||
post.end(); | ||
}, | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic; passing a message object. | ||
*/ | ||
function subscribe(topicName: string, callback: ClientCallback | undefined ): void { | ||
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`); | ||
var lastEventId = -1; | ||
/** | ||
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic. | ||
* @param topicName The topic to subscribe to. This may be one or more URL segments. | ||
* @param callback The function to call when data is publised on the topic; passing a message object. | ||
*/ | ||
subscribe: (topicName: string, callback: ClientCallback | undefined): void => { | ||
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`); | ||
var lastEventId = -1; | ||
// process messages from the server | ||
eventSource.onmessage = (event: ServerMessage): void => { | ||
// perform a duplicate message check | ||
if (event.lastEventId !== lastEventId) { | ||
lastEventId = event.lastEventId; | ||
// process messages from the server | ||
eventSource.onmessage = (event: ServerMessage): void => { | ||
// perform a duplicate message check | ||
if (event.lastEventId !== lastEventId) { | ||
lastEventId = event.lastEventId; | ||
// propigate message to the client | ||
if (callback) { | ||
callback({ topicName: topicName, id: event.lastEventId, data: event.data }); | ||
// propigate message to the client | ||
if (callback) { | ||
callback({ topicName: topicName, id: event.lastEventId, data: event.data }); | ||
} | ||
} | ||
@@ -95,5 +97,2 @@ } | ||
} | ||
// return the client API to the caller | ||
return { publish, subscribe }; | ||
} |
@@ -21,8 +21,6 @@ import { Router, Request, Response } from 'express'; | ||
// write a single message to a client | ||
function sendEvent(res: Response, topic: Topic): void { | ||
if (topic.data) { | ||
setImmediate((res: Response, lastEventId: number, data: string) => { | ||
res.write(`id:${lastEventId}\ndata:${data}\n\n`); | ||
}, res, topic.lastEventId, topic.data); | ||
} | ||
function sendEvent(client: Response, eventId: number, data: string): void { | ||
setImmediate((client: Response, eventId: number, data: string) => { | ||
client.write(`id:${eventId}\ndata:${data}\n\n`); | ||
}, client, eventId, data); | ||
} | ||
@@ -56,6 +54,8 @@ | ||
// update the client with the last message if required | ||
if (cacheLastMessage) { | ||
sendEvent(res, topic); | ||
if (cacheLastMessage && topic.data) { | ||
sendEvent(res, topic.lastEventId, topic.data); | ||
} | ||
}).post('*', (req: Request, res: Response) => { | ||
}); | ||
router.post('*', (req: Request, res: Response) => { | ||
var topic = getTopic(topics, req.url); | ||
@@ -67,3 +67,5 @@ var body: Array<Buffer> = []; | ||
body.push(chunk); | ||
}).on('end', (): void => { | ||
}); | ||
req.on('end', (): void => { | ||
// update the topic with the new event details | ||
@@ -75,3 +77,3 @@ topic.data = Buffer.concat(body).toString(); | ||
topic.subscribers.forEach((subscriber) => { | ||
sendEvent(subscriber, topic); | ||
sendEvent(subscriber, topic.lastEventId, topic.data!); // NOTE: as we have just set topic.data we know it's not undefined | ||
}); | ||
@@ -78,0 +80,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
26011
423