Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@steelbreeze/broker

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@steelbreeze/broker - npm Package Compare versions

Comparing version 1.0.0-beta.4 to 1.0.0-beta.5

15

lib/client.d.ts

@@ -1,11 +0,14 @@

interface IConfig {
/** Specifies the location of a message broker server. */
interface HTTPConfig {
/** The host that the message brokers express application is running on. */
host: string;
/** The port of the host that the message brokers express application is running on. */
port: number;
/** The base URL of the message broker. */
path: string;
}
interface IClient {
publish(topicName: string, data: string, type: string): void;
subscribe(topicName: string, callback: (data: string) => void): void;
}
export declare function client(config: IConfig): IClient;
export declare function client(config: HTTPConfig): {
publish: (topicName: string, data: string) => void;
subscribe: (topicName: string, callback: (data: string) => void) => void;
};
export {};

22

lib/client.js

@@ -10,16 +10,24 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
// http post used for publishing
var http = __importStar(require("http"));
// EventSource used for subscribing
var EventSource = require('eventsource'); // unable to determine how to import this module in the TypeScript way
var EventSource = require('eventsource'); // TODO: determine how to TypeScript import this module
function client(config) {
var lastEventId = -1;
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
*/
function publish(topicName, data) {
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'application/json' } });
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } });
post.write(data);
post.end();
}
/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic.
*/
function subscribe(topicName, callback) {
var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName);
eventSource.addEventListener('message', function (event) {
var lastEventId = -1;
eventSource.onmessage = function (event) {
if (event.lastEventId !== lastEventId) {

@@ -31,3 +39,3 @@ lastEventId = event.lastEventId;

}
});
};
}

@@ -34,0 +42,0 @@ return { publish: publish, subscribe: subscribe };

import { Router } from 'express';
/**
* Creates an instance of a message broker server.
* Many message broker servers may be created; each bound to a different base url path.
* Many message broker servers may be created, each bound to a different base url.
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition.
* @returns Returns an express Router for use within an express application.
*/
export declare function server(cacheLastMessage?: boolean): Router;

@@ -6,4 +6,5 @@ "use strict";

* Creates an instance of a message broker server.
* Many message broker servers may be created; each bound to a different base url path.
* Many message broker servers may be created, each bound to a different base url.
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition.
* @returns Returns an express Router for use within an express application.
*/

@@ -14,10 +15,9 @@ function server(cacheLastMessage) {

var topics = {};
var messageId = 0;
// find or create a topic given its name
function getTopic(topic) {
return topics[topic] || (topics[topic] = { subscribers: [], data: undefined, lastEventId: undefined });
return topics[topic] || (topics[topic] = { lastEventId: -1, data: undefined, subscribers: [] });
}
// write a single message to a client
function serverSentEvent(res, lastEventId, data) {
res.write("event:message\nid:" + lastEventId + "\ndata:" + data + "\n\n");
res.write("id:" + lastEventId + "\ndata:" + data + "\n\n");
}

@@ -38,3 +38,3 @@ // GET method is used to subscribe by EventSource clients

// update the client with the last message if available
if (cacheLastMessage && topic.lastEventId && topic.data) {
if (cacheLastMessage && topic.data) {
serverSentEvent(res, topic.lastEventId, topic.data);

@@ -50,14 +50,17 @@ }

req.on('end', function () {
setImmediate(function (topic, lastEventId, data) {
topic.data = data;
topic.lastEventId = lastEventId;
for (var _i = 0, _a = topic.subscribers; _i < _a.length; _i++) {
var subscriber = _a[_i];
var topic = getTopic(req.url);
// update the topic with the new event details
topic.data = Buffer.concat(body).toString();
topic.lastEventId++;
// recycle the message id when maxed out
if (topic.lastEventId === Number.MAX_VALUE) {
topic.lastEventId = 0;
}
// queue dispatch of event to all current subscribers
setImmediate(function (subscribers, lastEventId, data) {
for (var _i = 0, subscribers_1 = subscribers; _i < subscribers_1.length; _i++) {
var subscriber = subscribers_1[_i];
serverSentEvent(subscriber, lastEventId, data);
}
}, getTopic(req.url), messageId++, Buffer.concat(body).toString());
// recycle the message id when maxed out
if (messageId === Number.MAX_VALUE) {
messageId = 0;
}
}, topic.subscribers, topic.lastEventId, topic.data);
});

@@ -64,0 +67,0 @@ // send response to publisher

{
"name": "@steelbreeze/broker",
"version": "1.0.0-beta.4",
"version": "1.0.0-beta.5",
"description": "Lightweight publish and subscribe using Server-Sent Events for node and express",

@@ -30,6 +30,8 @@ "main": "lib/index.js",

"dependencies": {
"eventsource": "^1.0.5"
},
"devDependencies": {
"@types/express": "^4.16.0",
"eventsource": "^1.0.5",
"express": "^4.16.3"
}
}
}

@@ -1,30 +0,24 @@

// http post used for publishing
import * as http from 'http';
const EventSource = require('eventsource'); // TODO: determine how to TypeScript import this module
// EventSource used for subscribing
const EventSource = require('eventsource'); // unable to determine how to import this module in the TypeScript way
/** Specifies the location of a message broker server. */
interface HTTPConfig {
/** The host that the message brokers express application is running on. */
host: string;
interface IConfig {
host: string;
/** The port of the host that the message brokers express application is running on. */
port: number;
/** The base URL of the message broker. */
path: string;
}
interface IEvent {
type: string;
data: string;
lastEventId: number;
origin: string;
}
interface IClient {
publish(topicName: string, data: string, type: string): void;
subscribe(topicName: string, callback: (data: string) => void): void;
}
export function client(config: IConfig): IClient {
var lastEventId = -1;
export function client(config: HTTPConfig) {
/**
* Publishes a message to a message broker for other client to receive based on their subscriptions.
* @param topicName The topic to publish on. This may be one or more URL segments.
* @param data The data to publish on the topic
*/
function publish(topicName: string, data: string): void {
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'application/json' } });
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'text/plain', 'Content-Length': Buffer.byteLength(data) } });

@@ -35,6 +29,12 @@ post.write(data);

/**
* Registers a subscription to messages on the given topic. The callback is called each time a message to published on the same topic.
* @param topicName The topic to subscribe to. This may be one or more URL segments.
* @param callback The function to call when data is publised on the topic.
*/
function subscribe(topicName: string, callback: (data: string) => void): void {
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`);
var lastEventId = -1;
eventSource.addEventListener('message', function (event: IEvent) {
eventSource.onmessage = (event: { type: string; data: string; lastEventId: number; origin: string; }): void => {
if (event.lastEventId !== lastEventId) {

@@ -47,3 +47,3 @@ lastEventId = event.lastEventId;

}
});
}
}

@@ -50,0 +50,0 @@

@@ -5,5 +5,5 @@ import { Router, Request, Response } from 'express';

interface ITopic {
lastEventId: number;
data: string | undefined;
subscribers: Response[];
data: string | undefined;
lastEventId: number | undefined;
}

@@ -13,4 +13,5 @@

* Creates an instance of a message broker server.
* Many message broker servers may be created; each bound to a different base url path.
* Many message broker servers may be created, each bound to a different base url.
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition.
* @returns Returns an express Router for use within an express application.
*/

@@ -20,7 +21,6 @@ export function server(cacheLastMessage: boolean = false): Router {

const topics: { [id: string]: ITopic } = {};
var messageId = 0;
// find or create a topic given its name
function getTopic(topic: string): ITopic {
return topics[topic] || (topics[topic] = { subscribers: [], data: undefined, lastEventId: undefined });
return topics[topic] || (topics[topic] = { lastEventId: -1, data: undefined, subscribers: [] });
}

@@ -30,3 +30,3 @@

function serverSentEvent(res: Response, lastEventId: number, data: string): void {
res.write(`event:message\nid:${lastEventId}\ndata:${data}\n\n`);
res.write(`id:${lastEventId}\ndata:${data}\n\n`);
}

@@ -52,3 +52,3 @@

// update the client with the last message if available
if (cacheLastMessage && topic.lastEventId && topic.data) {
if (cacheLastMessage && topic.data) {
serverSentEvent(res, topic.lastEventId, topic.data);

@@ -67,15 +67,19 @@ }

req.on('end', (): void => {
setImmediate((topic: ITopic, lastEventId, data: string) => {
topic.data = data;
topic.lastEventId = lastEventId;
var topic = getTopic(req.url);
for (var subscriber of topic.subscribers) {
serverSentEvent(subscriber, lastEventId, data);
}
}, getTopic(req.url), messageId++, Buffer.concat(body).toString());
// update the topic with the new event details
topic.data = Buffer.concat(body).toString();
topic.lastEventId++;
// recycle the message id when maxed out
if(messageId === Number.MAX_VALUE) {
messageId = 0;
if (topic.lastEventId === Number.MAX_VALUE) {
topic.lastEventId = 0;
}
// queue dispatch of event to all current subscribers
setImmediate((subscribers: Response[], lastEventId, data: string) => {
for (var subscriber of subscribers) {
serverSentEvent(subscriber, lastEventId, data);
}
}, topic.subscribers, topic.lastEventId, topic.data);
});

@@ -82,0 +86,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc