@fanoutio/connect-eventstream
Connect-compatible middleware that enables the easy creation of EventStream endpoints
Utility library to facilitate the creation of endpoints that implement the server-sent events (SSE) protocol to stream events to clients, provided as a connect-compatible middleware.
Such an endpoint can be consumed in web browsers using EventSource.
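As a rough illustration of the consuming side, the sketch below subscribes to an event stream with the browser's EventSource API. The helper name `subscribeToEvents`, the `/api/events` URL, and the assumption that each event's data is JSON text are illustrative choices, not something this package prescribes.

```javascript
// Sketch: subscribe to an event stream endpoint using EventSource.
// `subscribeToEvents` is a hypothetical helper. The third parameter exists
// only so the function can be exercised outside a browser.
function subscribeToEvents(url, onData, EventSourceImpl = globalThis.EventSource) {
  const source = new EventSourceImpl(url);

  // "message" is the event name used in this README's publishing examples.
  source.addEventListener("message", (e) => {
    // Assumption: the server sends the event data as JSON text.
    onData(JSON.parse(e.data));
  });

  // Return a function that closes the connection when called.
  return () => source.close();
}

// In a browser:
// const stop = subscribeToEvents("/api/events", (data) => console.log(data));
// ...later: stop();
```

The browser reconnects automatically if the connection drops, so the helper only needs to register a listener and expose a way to close the stream.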
Since this library is connect-compatible, it is usable with frameworks such as Express and Next.js.
Additionally, this library is GRIP-aware for scaling. In fact, when running on serverless environments such as Next.js, you will almost always want to use GRIP to hold the stream open while your application publishes to it in short-lived connections.
Supported GRIP servers include:
Author: Katsuyuki Ohmuro kats@fanout.io
Based on previous work by Justin Karneges justin@fanout.io, Benjamin Goering bengoering@gmail.com
Construct a ConnectEventStream object. This object's constructor takes an optional object that has grip and gripPrefix.
grip is optional and can be any of the following:
- null. This is the default, and GRIP will not be used.
- a string, which is parsed with parseGripUri. The common use case would be to pass in process.env.GRIP_URL.
- an object with the fields control_uri, control_iss, and key, used to initialize a GRIP publisher.
- a Publisher object from @fanoutio/grip. Publishing through connect-eventstream will then end up publishing to all channels on that Publisher object.
gripPrefix is optional and defaults to 'events-' if not specified. This can be used to namespace GRIP events.
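To show how the string form and the object form of grip relate, here is a rough sketch that splits a GRIP URL into control_uri, control_iss, and key. It assumes the URL carries iss and key as query parameters, and it is not the parseGripUri implementation from @fanoutio/grip. The function name and the example URL are hypothetical.

```javascript
// Rough sketch only: NOT the parseGripUri implementation from @fanoutio/grip.
// Assumption: the GRIP URL carries `iss` and `key` as query parameters.
function sketchGripConfigFromUrl(gripUrl) {
  const url = new URL(gripUrl);
  const config = {};

  const iss = url.searchParams.get("iss");
  const key = url.searchParams.get("key");
  if (iss !== null) config.control_iss = iss;
  if (key !== null) config.key = key; // a real parser may also decode an encoded key

  // Whatever remains after removing the credentials is the control URI.
  url.searchParams.delete("iss");
  url.searchParams.delete("key");
  config.control_uri = url.toString();

  return config;
}

// Hypothetical URL, for illustration only.
const exampleGripConfig = sketchGripConfigFromUrl(
  "https://grip.example.test/realm/demo?iss=demo&key=secret"
);
console.log(exampleGripConfig);
```

In practice you would pass the string (or the equivalent object) straight to the ConnectEventStream constructor and let the library do the parsing.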
import { ConnectEventStream } from "@fanoutio/connect-eventstream";
const connectEventStream = new ConnectEventStream({grip:process.env.GRIP_URL});
Create this object once as a singleton and refer to it from all routes, because events sent over the publisher are only seen by requests listening on the same instance. This also means that if your application runs several processes, published events only reach HTTP connections on the process that publishes them. To scale to more than one web server process, you'll need to use GRIP, and make sure you publish each event from one place.
Add routes, and use connectEventStream to create handlers. For this you have two options:
- Call connectEventStream as a function, and pass in a string or array of strings. These strings will be used as the names of the channel(s) to listen to. Any tokens in the channel names delimited by { and } will be replaced by their corresponding values from route parameters.
import express from "express";
import { ConnectEventStream } from "@fanoutio/connect-eventstream";
export const CHANNEL_NAME = 'test';
const app = express();
const connectEventStream = new ConnectEventStream({grip:process.env.GRIP_URL});
// localhost:7999/api/events (listens on 'test' because it is literal string passed in)
app.get('/api/events', connectEventStream(CHANNEL_NAME));
// localhost:7999/api/events/test (listens on 'test' because {id} is replaced by route parameter)
app.get('/api/events/:id', connectEventStream('{id}'));
- Call connectEventStream as a function, and pass in a function that takes a request object and returns a string or an array of strings. These strings will be used as the names of the channels to listen to.
To publish events, see the Publishing Events section below.
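The two ways of naming channels described above can be pictured with a small sketch. It is not the library's code: `resolveChannels` and `fillTokens` are hypothetical helpers, and leaving unknown tokens untouched is an assumption made for the example.

```javascript
// Sketch of channel-name resolution; hypothetical helpers, not library internals.

// Replace {token} with the matching route parameter, if there is one.
// Assumption: tokens without a matching parameter are left as-is.
function fillTokens(name, params) {
  return name.replace(/\{([^{}]+)\}/g, (match, token) =>
    Object.prototype.hasOwnProperty.call(params, token) ? String(params[token]) : match
  );
}

function toArray(value) {
  return Array.isArray(value) ? value : [value];
}

function resolveChannels(channels, req) {
  // Option 2: a function receives the request and returns the channel name(s).
  if (typeof channels === "function") {
    return toArray(channels(req));
  }
  // Option 1: a string or array of strings, with {token} substitution.
  return toArray(channels).map((name) => fillTokens(name, req.params || {}));
}

// A request to /api/events/test on the route '/api/events/:id':
console.log(resolveChannels("{id}", { params: { id: "test" } }));
// A function-based selector that reads a (hypothetical) query parameter:
console.log(resolveChannels((req) => `room-${req.query.room}`, { query: { room: "lobby" } }));
```

The function form is the more flexible of the two, since it can derive channel names from headers, query strings, or session data rather than only from route parameters.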
Next.js's development server continuously monitors and rebuilds files. Each time this happens, your singleton instance of ConnectEventStream will be recreated and previous instances will become unreachable.
To keep the singleton accessible, use the getConnectEventStreamSingleton function exported from this package. This function takes an object as an argument, and this is the same object that you would pass to the constructor of ConnectEventStream.
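How the package keeps the instance alive is not described here. One common way to make a value survive module reloads is to store it on the global object, as in the sketch below. `getSingleton`, the registry key, and the instance name are all hypothetical and only illustrate the idea.

```javascript
// Sketch: a reload-safe singleton registry stored on globalThis.
// This illustrates the general technique; it is not this package's source.
const REGISTRY_KEY = Symbol.for("example.reloadSafeSingletons"); // hypothetical key

function getSingleton(name, factory) {
  // The registry lives on globalThis, so re-evaluating this module
  // (as a development server does on rebuild) finds the same Map again.
  if (!globalThis[REGISTRY_KEY]) {
    globalThis[REGISTRY_KEY] = new Map();
  }
  const registry = globalThis[REGISTRY_KEY];

  // Create the instance only on first request for this name.
  if (!registry.has(name)) {
    registry.set(name, factory());
  }
  return registry.get(name);
}

// Both calls return the same object; the factory runs only once.
const first = getSingleton("eventStream", () => ({ createdAt: Date.now() }));
const second = getSingleton("eventStream", () => ({ createdAt: Date.now() }));
console.log(first === second);
```

Using Symbol.for gives a key that is shared across module evaluations without colliding with ordinary global property names.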
Add API routes to your Next.js application in the standard way to serve event streams. From these API routes, call connectEventStream in the same way as in Express, and export the result as the default export of each route file.
- Call connectEventStream and pass in a string or array of strings.
/lib/eventStream.js
import { getConnectEventStreamSingleton } from "@fanoutio/connect-eventstream";
export const CHANNEL_NAME = 'test';
export const connectEventStream = getConnectEventStreamSingleton({grip: process.env.GRIP_URL});
/api/events.js
import { connectEventStream, CHANNEL_NAME } from "../../lib/eventStream";
// localhost:7999/api/events (listens on 'test' because it is literal string passed in)
export default connectEventStream(CHANNEL_NAME);
/api/events/[id].js
import { connectEventStream } from "../../../lib/eventStream";
// localhost:7999/api/events/test (listens on 'test' because {id} is replaced by route parameter)
export default connectEventStream('{id}');
- Call connectEventStream and pass in a function that returns a string or an array of strings.
To publish events, see the Publishing Events section below.
To publish, call connectEventStream.publishEvent(channel, { event, data }). event is the event name (a string) and data is a JavaScript object that carries the payload of the server-sent event. This is an async function, so you may await it if you wish to block until the event has been sent. Notably, if GRIP is being used, this will block until the GRIP publish has completed.
await connectEventStream.publishEvent(CHANNEL_NAME, { event: 'message', 'data': { name: 'John' } });
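For orientation, the sketch below shows what one such event could look like on the wire in the SSE text format. It assumes the data object is serialized as JSON, which is an assumption about this library rather than something stated above. `encodeServerSentEvent` is a hypothetical helper.

```javascript
// Sketch: encode one event in the server-sent events text format.
// Assumption: `data` is serialized with JSON.stringify.
function encodeServerSentEvent({ event, data }) {
  const lines = [];
  if (event) {
    lines.push(`event: ${event}`);
  }
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}

process.stdout.write(encodeServerSentEvent({ event: "message", data: { name: "John" } }));
// Writes:
// event: message
// data: {"name":"John"}
//
```

A client using EventSource would receive this as a "message" event whose data property is the JSON text.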
Alternatively, if you will be sending many events to the same channel, you can get a ChannelPublisher by calling connectEventStream.getChannelPublisher(channel). Then you can call publishEvent({ event, data }) on the returned object.
const publisher = connectEventStream.getChannelPublisher('test');
await publisher.publishEvent({ event: 'message', 'data': { name: 'Alice' } });
await publisher.publishEvent({ event: 'message', 'data': { name: 'Bob' } });
If you wish to pipe a stream, you can call connectEventStream.createChannelWritable(channel) and pass the name of a channel. This will return a stream.Writable object whose write() method can be used to emit objects to clients listening to the appropriate channels from the event stream endpoints created above.
const writable = connectEventStream.createChannelWritable('test'); // or publisher.createWritable()
writable.write({ event: 'message', 'data': { baz: [ 'hi', 'ho', 'hello', ] } });
writable.end();
There will be appropriate backpressure on this Writable so that writing goes only as fast as events can be dispatched, which is especially important when publishing through GRIP.
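The backpressure behaviour can be pictured with a plain Node.js Writable that finishes each write only after an asynchronous publish has completed. This is a sketch of the general technique, not the library's implementation. `createPublishingWritable` and the `publish` callback are hypothetical names.

```javascript
const { Writable } = require("stream");

// Sketch: an object-mode Writable that applies backpressure by signalling
// completion of each write only after the async `publish` call has settled.
// `publish` is a hypothetical function that returns a Promise.
function createPublishingWritable(publish) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1, // buffer at most one pending event before write() returns false
    write(event, _encoding, callback) {
      // The next chunk is not processed until callback runs.
      publish(event).then(() => callback(), (err) => callback(err));
    },
  });
}

// Usage with a stand-in publisher that just logs after a short delay:
const demoWritable = createPublishingWritable(async (event) => {
  await new Promise((resolve) => setTimeout(resolve, 10));
  console.log("published", event);
});
demoWritable.write({ event: "message", data: { n: 1 } });
demoWritable.end({ event: "message", data: { n: 2 } });
```

When a readable source is piped into such a stream, Node.js pauses the source whenever write() returns false and resumes it on the 'drain' event, so a slow publisher naturally throttles the producer.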
If you wish to run connect-eventstream's functionality directly, for example in a conditional way, you may call connectEventStream.run(req, res, channels).
app.get('/', async (req, res, next) => {
    // Only do connectEventStream if header 'foo' has value 'bar'
    if (req.headers['foo'] === 'bar') {
        try {
            await connectEventStream.run(req, res, ['test']);
        } catch(ex) {
            next(ex instanceof Error ? ex : new Error(ex));
        }
    } else {
        next();
    }
});
getConnectEventStreamSingleton takes an optional second parameter. There may be advanced scenarios where you need more than one instance of ConnectEventStream. In such a case you can use this second parameter to identify each instance.
import { getConnectEventStreamSingleton } from "@fanoutio/connect-eventstream";
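// The second argument identifies each instance; the names used here are illustrative.
const connectEventStream1 = getConnectEventStreamSingleton({grip:process.env.GRIP_URI_1}, 'stream1');
const connectEventStream2 = getConnectEventStreamSingleton({grip:process.env.GRIP_URI_2}, 'stream2');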