Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@steelbreeze/broker

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@steelbreeze/broker - npm Package Compare versions

Comparing version 1.0.0-beta.13 to 1.0.0-beta.14

13

lib/client.d.ts

@@ -26,3 +26,3 @@ /** Specifies the location of a message broker server. */

/**
* Creates a client to the message broker, providing publish and subscribe operations.
* Returns the client API to the message broker, providing publish and subscribe operations.
* @param config Configuration specifying the server, port and base URL path of the message broker server.

@@ -32,4 +32,15 @@ * @returns Returns a client providing publish and subscribe operations.

export declare function client(config: HTTPConfig): {
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
* @param onError Optional error handler callback
*/
publish: (topicName: string, data: string, onError?: ErrorHandler | undefined) => void;
/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic; passing a message object.
*/
subscribe: (topicName: string, callback: ClientCallback | undefined) => void;
};

82

lib/client.js

@@ -13,3 +13,3 @@ "use strict";

/**
* Creates a client to the message broker, providing publish and subscribe operations.
* Returns the client API to the message broker, providing publish and subscribe operations.
* @param config Configuration specifying the server, port and base URL path of the message broker server.

@@ -19,44 +19,44 @@ * @returns Returns a client providing publish and subscribe operations.

function client(config) {
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
* @param onError Optional error handler callback
*/
function publish(topicName, data, onError) {
if (onError === void 0) { onError = undefined; }
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } });
// trap and propigate error messages as required
post.on('error', function (err) {
if (onError) {
onError(err);
}
});
// send message to the server
post.write(data);
post.end();
}
/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic; passing a message object.
*/
function subscribe(topicName, callback) {
var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName);
var lastEventId = -1;
// process messages from the server
eventSource.onmessage = function (event) {
// perform a duplicate message check
if (event.lastEventId !== lastEventId) {
lastEventId = event.lastEventId;
// propigate message to the client
if (callback) {
callback({ topicName: topicName, id: event.lastEventId, data: event.data });
return {
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
* @param onError Optional error handler callback
*/
publish: function (topicName, data, onError) {
if (onError === void 0) { onError = undefined; }
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } });
// trap and propigate error messages as required
post.on('error', function (err) {
if (onError) {
onError(err);
}
}
};
}
// return the client API to the caller
return { publish: publish, subscribe: subscribe };
});
// send message to the server
post.write(data);
post.end();
},
/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic; passing a message object.
*/
subscribe: function (topicName, callback) {
var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName);
var lastEventId = -1;
// process messages from the server
eventSource.onmessage = function (event) {
// perform a duplicate message check
if (event.lastEventId !== lastEventId) {
lastEventId = event.lastEventId;
// propigate message to the client
if (callback) {
callback({ topicName: topicName, id: event.lastEventId, data: event.data });
}
}
};
}
};
}
exports.client = client;

@@ -9,8 +9,6 @@ "use strict";

// write a single message to a client
function sendEvent(res, topic) {
if (topic.data) {
setImmediate(function (res, lastEventId, data) {
res.write("id:" + lastEventId + "\ndata:" + data + "\n\n");
}, res, topic.lastEventId, topic.data);
}
function sendEvent(client, eventId, data) {
setImmediate(function (client, eventId, data) {
client.write("id:" + eventId + "\ndata:" + data + "\n\n");
}, client, eventId, data);
}

@@ -39,6 +37,7 @@ /**

// update the client with the last message if required
if (cacheLastMessage) {
sendEvent(res, topic);
if (cacheLastMessage && topic.data) {
sendEvent(res, topic.lastEventId, topic.data);
}
}).post('*', function (req, res) {
});
router.post('*', function (req, res) {
var topic = getTopic(topics, req.url);

@@ -49,3 +48,4 @@ var body = [];

body.push(chunk);
}).on('end', function () {
});
req.on('end', function () {
// update the topic with the new event details

@@ -56,3 +56,3 @@ topic.data = Buffer.concat(body).toString();

topic.subscribers.forEach(function (subscriber) {
sendEvent(subscriber, topic);
sendEvent(subscriber, topic.lastEventId, topic.data); // NOTE: as we have just set topic.data we know it's not undefined
});

@@ -59,0 +59,0 @@ });

{
"name": "@steelbreeze/broker",
"version": "1.0.0-beta.13",
"version": "1.0.0-beta.14",
"description": "Lightweight publish and subscribe using Server-Sent Events for node and express",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -45,3 +45,3 @@ import * as http from 'http';

/**
* Creates a client to the message broker, providing publish and subscribe operations.
* Returns the client API to the message broker, providing publish and subscribe operations.
* @param config Configuration specifying the server, port and base URL path of the message broker server.

@@ -51,41 +51,43 @@ * @returns Returns a client providing publish and subscribe operations.

export function client(config: HTTPConfig) {
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
* @param onError Optional error handler callback
*/
function publish(topicName: string, data: string, onError: ErrorHandler | undefined = undefined): void {
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } });
return {
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
* @param onError Optional error handler callback
*/
publish: (topicName: string, data: string, onError: ErrorHandler | undefined = undefined): void => {
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } });
// trap and propigate error messages as required
post.on('error', (err: Error) => {
if (onError) {
onError(err);
}
});
// trap and propigate error messages as required
post.on('error', (err: Error) => {
if (onError) {
onError(err);
}
});
// send message to the server
post.write(data);
post.end();
}
// send message to the server
post.write(data);
post.end();
},
/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic; passing a message object.
*/
function subscribe(topicName: string, callback: ClientCallback | undefined ): void {
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`);
var lastEventId = -1;
/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic; passing a message object.
*/
subscribe: (topicName: string, callback: ClientCallback | undefined): void => {
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`);
var lastEventId = -1;
// process messages from the server
eventSource.onmessage = (event: ServerMessage): void => {
// perform a duplicate message check
if (event.lastEventId !== lastEventId) {
lastEventId = event.lastEventId;
// process messages from the server
eventSource.onmessage = (event: ServerMessage): void => {
// perform a duplicate message check
if (event.lastEventId !== lastEventId) {
lastEventId = event.lastEventId;
// propigate message to the client
if (callback) {
callback({ topicName: topicName, id: event.lastEventId, data: event.data });
// propigate message to the client
if (callback) {
callback({ topicName: topicName, id: event.lastEventId, data: event.data });
}
}

@@ -95,5 +97,2 @@ }

}
// return the client API to the caller
return { publish, subscribe };
}

@@ -21,8 +21,6 @@ import { Router, Request, Response } from 'express';

// write a single message to a client
function sendEvent(res: Response, topic: Topic): void {
if (topic.data) {
setImmediate((res: Response, lastEventId: number, data: string) => {
res.write(`id:${lastEventId}\ndata:${data}\n\n`);
}, res, topic.lastEventId, topic.data);
}
function sendEvent(client: Response, eventId: number, data: string): void {
setImmediate((client: Response, eventId: number, data: string) => {
client.write(`id:${eventId}\ndata:${data}\n\n`);
}, client, eventId, data);
}

@@ -56,6 +54,8 @@

// update the client with the last message if required
if (cacheLastMessage) {
sendEvent(res, topic);
if (cacheLastMessage && topic.data) {
sendEvent(res, topic.lastEventId, topic.data);
}
}).post('*', (req: Request, res: Response) => {
});
router.post('*', (req: Request, res: Response) => {
var topic = getTopic(topics, req.url);

@@ -67,3 +67,5 @@ var body: Array<Buffer> = [];

body.push(chunk);
}).on('end', (): void => {
});
req.on('end', (): void => {
// update the topic with the new event details

@@ -75,3 +77,3 @@ topic.data = Buffer.concat(body).toString();

topic.subscribers.forEach((subscriber) => {
sendEvent(subscriber, topic);
sendEvent(subscriber, topic.lastEventId, topic.data!); // NOTE: as we have just set topic.data we know it's not undefined
});

@@ -78,0 +80,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc