New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@steelbreeze/broker

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@steelbreeze/broker - npm Package Compare versions

Comparing version 1.0.0-beta.1 to 1.0.0-beta.2

8

examples/testClient.js

@@ -1,8 +0,8 @@

const broker = require(`../lib/index`); // replace with @steelbreeze/broker
const broker = require('../lib/index'); // replace with @steelbreeze/broker
// create a client to the message broker
var client = broker.client({host:"localhost", port: 1024, path: "/events"});
var client = broker.client({host:'localhost', port: 1024, path: '/events'});
// subscribe to the devices topic on the /events broker
client.subscribe(`devices`, (message) => {
client.subscribe('devices', (message) => {
console.log(message);

@@ -13,3 +13,3 @@ });

setInterval( () => {
client.publish(`devices`, `Hello at ${new Date()}`);
client.publish('devices', `Hello at ${new Date()}`);
}, 1000);
const express = require('express');
const broker = require(`../lib/index`); // replace with @steelbreeze/broker
const broker = require('../lib/index'); // replace with @steelbreeze/broker

@@ -11,5 +11,5 @@ // create the express application

// bind the message broker to the /events base URL
app.use(`/events`, events);
app.use('/events', events);
// start the express application
app.listen(1024, '0.0.0.0');
app.listen(1024, 'localhost');

@@ -17,3 +17,3 @@ "use strict";

function publish(topicName, data) {
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: "POST", headers: { 'Content-Type': 'application/json' } });
var post = http.request({ hostname: config.host, port: config.port, path: config.path + "/" + topicName, method: 'POST', headers: { 'Content-Type': 'application/json' } });
post.write(data);

@@ -24,3 +24,3 @@ post.end();

var eventSource = new EventSource("http://" + config.host + ":" + config.port + config.path + "/" + topicName);
eventSource.addEventListener("message", function (event) {
eventSource.addEventListener('message', function (event) {
if (event.lastEventId !== lastEventId) {

@@ -27,0 +27,0 @@ lastEventId = event.lastEventId;

@@ -6,3 +6,9 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
/**
* Message broker server, brokering client publications and managing subscriptions.
*/
__export(require("./server"));
/**
* Message broker client, supporting publish and subscribe operations based on simple topics.
*/
__export(require("./client"));
__export(require("./server"));
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var express_1 = require("express");
/**
* Creates an instance of a message broker server.
* Many message broker servers may be created; each bound to a different base url path.
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition.
*/
function server(cacheLastMessage) {

@@ -18,6 +23,6 @@ if (cacheLastMessage === void 0) { cacheLastMessage = false; }

// GET method is used to subscribe by EventSource clients
router.get("/:topic", function (req, res) {
router.get('/:topic', function (req, res) {
var topic = getTopic(req.params.topic);
// remove the subscription when the connection closes
req.on("close", function () {
req.on('close', function () {
topic.subscribers.splice(topic.subscribers.indexOf(res), 1);

@@ -28,5 +33,5 @@ });

// set the response headers to specify this is an event stream
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// update the client with the last message if available

@@ -38,8 +43,8 @@ if (cacheLastMessage && topic.lastEventId && topic.data) {

// POST method publishes a message
router.post("/:topic", function (req, res) {
router.post('/:topic', function (req, res) {
var body = [];
req.on("data", function (chunk) {
req.on('data', function (chunk) {
body.push(chunk);
});
req.on("end", function () {
req.on('end', function () {
setImmediate(function (topic, lastEventId, data) {

@@ -53,3 +58,6 @@ topic.data = data;

}, getTopic(req.params.topic), messageId++, Buffer.concat(body).toString());
// TODO: recycle messageId when it gets too large
// recycle the message id when maxed out
if (messageId === Number.MAX_VALUE) {
messageId = 0;
}
});

@@ -56,0 +64,0 @@ // send response to publisher

{
"name": "@steelbreeze/broker",
"version": "1.0.0-beta.1",
"version": "1.0.0-beta.2",
"description": "Lightweight publish and subscribe using Server-Sent Events for node and express",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -20,9 +20,5 @@ // http post used for publishing

interface Callback {
(data: string): void;
}
interface IClient {
publish(topicName: string, data: string, type: string): void;
subscribe(topicName: string, callback: Callback): void;
subscribe(topicName: string, callback: (data: string) => void): void;
}

@@ -34,3 +30,3 @@

function publish(topicName: string, data: string): void {
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: `POST`, headers: { 'Content-Type': 'application/json' } });
var post = http.request({ hostname: config.host, port: config.port, path: `${config.path}/${topicName}`, method: 'POST', headers: { 'Content-Type': 'application/json' } });

@@ -41,6 +37,6 @@ post.write(data);

function subscribe(topicName: string, callback: Callback): void {
function subscribe(topicName: string, callback: (data: string) => void): void {
var eventSource = new EventSource(`http://${config.host}:${config.port}${config.path}/${topicName}`);
eventSource.addEventListener(`message`, function (event: IEvent) {
eventSource.addEventListener('message', function (event: IEvent) {
if (event.lastEventId !== lastEventId) {

@@ -47,0 +43,0 @@ lastEventId = event.lastEventId;

@@ -1,2 +0,9 @@

export * from './client';
/**
* Message broker server, brokering client publications and managing subscriptions.
*/
export * from './server';
/**
* Message broker client, supporting publish and subscribe operations based on simple topics.
*/
export * from './client';
import { Router, Request, Response } from 'express';
// internal interface used to manage the content associated with a topic
interface ITopic {

@@ -9,2 +10,7 @@ subscribers: Response[];

/**
* Creates an instance of a message broker server.
* Many message broker servers may be created; each bound to a different base url path.
* @param cacheLastMessage When true, subscribers will receive the last message upon subscrition.
*/
export function server(cacheLastMessage: boolean = false): Router {

@@ -26,7 +32,7 @@ const router = Router();

// GET method is used to subscribe by EventSource clients
router.get("/:topic", (req: Request, res: Response) => {
router.get('/:topic', (req: Request, res: Response) => {
var topic = getTopic(req.params.topic);
// remove the subscription when the connection closes
req.on("close", () => {
req.on('close', () => {
topic.subscribers.splice(topic.subscribers.indexOf(res), 1);

@@ -39,5 +45,5 @@ });

// set the response headers to specify this is an event stream
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

@@ -51,10 +57,10 @@ // update the client with the last message if available

// POST method publishes a message
router.post("/:topic", (req: Request, res: Response) => {
router.post('/:topic', (req: Request, res: Response) => {
var body: Array<Buffer> = [];
req.on("data", (chunk: Buffer): void => {
req.on('data', (chunk: Buffer): void => {
body.push(chunk);
});
req.on("end", (): void => {
req.on('end', (): void => {
setImmediate((topic: ITopic, lastEventId, data: string) => {

@@ -69,3 +75,6 @@ topic.data = data;

// TODO: recycle messageId when it gets too large
// recycle the message id when maxed out
if(messageId === Number.MAX_VALUE) {
messageId = 0;
}
});

@@ -72,0 +81,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc