Pub-sub and work queue service on Primus or TCP. Supports wildcards, streams and back-pressure. Just Node and a filesystem required.
= centro{nbsp}{nbsp}{nbsp}image:https://circleci.com/gh/davedoesdev/centro.svg?style=svg[Build Status,link=https://circleci.com/gh/davedoesdev/centro] image:https://coveralls.io/repos/github/davedoesdev/centro/badge.svg[Coverage Status,link=https://coveralls.io/github/davedoesdev/centro] image:https://img.shields.io/npm/v/centro-js.svg[NPM version,link=https://www.npmjs.com/package/centro-js]
:prewrap!:
:toc:
:toclevels: 3
:toc-placement: preamble
Centro is a Node.js module for publishing and subscribing to messages over a network. It includes code for running a server and clients which connect to a server.
Messages are published to topics which support wildcard matching (single- and multi-level).
Each message has its own content stream, with full back-pressure support, and can be delivered to multiple interested clients or to exactly one.
All messages are stored in a directory on the server machine's filesystem so no external services are required. You can run multiple server instances on the same directory, for example one per CPU core.
Clients can connect to the server using https://github.com/primus/primus[Primus] (Websockets), TCP, HTTP or in-memory streams.
API documentation is available http://rawgit.davedoesdev.com/davedoesdev/centro/master/docs/index.html[here].
== Example
An example of running a server and clients using different transports is described below. All the example files are available in the link:test/example[] directory.
[[server]]
=== Server
Here's a server which listens on all transports.
var centro = require('centro-js');

var config = {
    allowed_algs: ['PS256'], // <1>
    transports: [{
        server: 'tcp',
        config: { port: 8800 }
    }, {
        server: 'primus',
        config: { port: 8801 }
    }, {
        server: 'http',
        config: { port: 8802 }
    }, {
        server: 'in-mem',
        authorize_config: { ANONYMOUS_MODE: true } // <2>
    }]
};

var server = new centro.CentroServer(config);

server.on('ready', function () // <3>
{
    console.log('ready');
});
<1> <<authz-tokens,Authorization tokens>> signed with these algorithms are accepted.
<2> In-memory transport doesn't require authorization tokens.
<3> Connections may be refused until `ready` is emitted.
This is just an example -- you only need to list transports to which your applications will connect.
[[authz-tokens]]
=== Authorization tokens
Transports not configured for `ANONYMOUS_MODE` expect clients to present an authorization token when they connect.
Centro uses https://github.com/davedoesdev/authorize-jwt[authorize-jwt] to verify tokens. Your tokens must be http://self-issued.info/docs/draft-ietf-oauth-json-web-token.html[JSON Web Tokens]. The token format that Centro expects is described http://rawgit.davedoesdev.com/davedoesdev/centro/master/docs/schema/web/index.html[here].
You should generate a keypair, add the public key to `authorize-jwt`'s keystore and sign your tokens with the private key.
Here's a program which generates a keypair using https://github.com/quartzjer/ursa[ursa], adds the public key to Centro's keystore with the identifier `\http://davedoesdev.com`, and writes the private key to `priv_key.pem`:
var uri = 'http://davedoesdev.com', // <1>
    authorize_jwt = require('centro-js').authorize_jwt,
    assert = require('assert'),
    path = require('path'),
    fs = require('fs'),
    ursa = require('ursa'),
    priv_key = ursa.generatePrivateKey(2048, 65537), // <2>
    pub_key = priv_key.toPublicPem('utf8'); // <3>
<1> Unique identifier for the keypair.
<2> Generate the keypair.
<3> Get the public key in PEM form.
<4> You can use `couchdb` but you'll have to set up your own http://couchdb.apache.org/[CouchDB] server.
<5> We're going to update the keystore.
<6> We're not interested in changes to the keystore -- we're just going to update the public key and exit.
<7> Associate the public key with `\http://davedoesdev.com`.
<8> The private key is not stored in the keystore but needs to be available when you want to sign authorization tokens. Here we write it to disk but this is just an example -- you probably want a more secure way of storing it.
<9> https://pouchdb.com/[PouchDB]-based keystores update a master database and then replicate changes to reader databases. Here we `deploy()` the master database to let any active reader databases know we're done updating.
Then you need to make a JWT, using the private key to sign it.
The `iss` claim in the token should be the unique issuer ID associated with `\http://davedoesdev.com` in Centro's keystore. You can use the https://github.com/davedoesdev/pub-keystore#pubkeystoreprototypeget_pub_key_by_uriuri-cb[`get_pub_key_by_uri`] method to retrieve the issuer ID.
Clients which use tokens with different issuer IDs can't send messages to each other.
The `access_control` claim in the token should specify to which topics clients that present this token can publish and subscribe. Topics should be in AMQP format: `.` delimits words, `*` matches exactly one word and `#` matches zero or more words. See https://github.com/davedoesdev/mqlobber-access-control[mqlobber-access-control] for more details.
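To make the wildcard rules concrete, here is a minimal sketch of AMQP-style topic matching together with an allow/disallow check. `topic_matches` and `is_allowed` are hypothetical helpers written for illustration only. They are not part of Centro or mqlobber-access-control, which perform the real checks. The sketch also assumes that a topic is permitted when it matches at least one `allow` pattern and no `disallow` pattern.

```javascript
// Hypothetical helpers for illustration only; not part of Centro.

// Does an AMQP-style pattern match a topic?
// '.' delimits words, '*' matches exactly one word, '#' matches zero or more.
function topic_matches(pattern, topic)
{
    var pwords = pattern.split('.'),
        twords = topic.split('.');

    function match(pi, ti)
    {
        if (pi === pwords.length)
        {
            // Pattern used up: match only if the topic is used up too
            return ti === twords.length;
        }

        if (pwords[pi] === '#')
        {
            // Let '#' consume zero, one, two, ... of the remaining words
            for (var next = ti; next <= twords.length; next += 1)
            {
                if (match(pi + 1, next)) { return true; }
            }
            return false;
        }

        if (ti === twords.length)
        {
            return false;
        }

        return ((pwords[pi] === '*') || (pwords[pi] === twords[ti])) &&
               match(pi + 1, ti + 1);
    }

    return match(0, 0);
}

// Assumption: allowed if any 'allow' pattern matches and no 'disallow' does.
function is_allowed(rules, topic)
{
    function any(patterns)
    {
        return patterns.some(function (p) { return topic_matches(p, topic); });
    }

    return any(rules.allow) && !any(rules.disallow);
}
```

For example, with `{ allow: ['news.#'], disallow: ['news.private.*'] }` the sketch accepts `news.sport.football` and rejects `news.private.memo`.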
You can use any JWT module to generate your tokens. Here's an example using https://github.com/davedoesdev/node-jsjws[jsjws]:
var uri = 'http://davedoesdev.com',
    authorize_jwt = require('centro-js').authorize_jwt,
    jsjws = require('jsjws'),
    assert = require('assert'),
    path = require('path'),
    fs = require('fs'),
    ursa = require('ursa');

fs.readFile(path.join(__dirname, 'priv_key.pem'), function (err, priv_key) // <1>
{
    assert.ifError(err);

    var expiry = new Date();
    expiry.setHours(expiry.getHours() + 24); // <2>

    authorize_jwt( // <3>
    {
        db_type: 'pouchdb',
        deploy_name: 'token',
        no_changes: true,
        silent: true
    }, function (err, authz)
    {
        assert.ifError(err);

        authz.keystore.get_pub_key_by_uri(uri, function (err, pub_key, issuer_id) // <4>
        {
            assert.ifError(err);
            assert(pub_key);
            assert(issuer_id);

            console.log(new jsjws.JWT().generateJWTByKey({ alg: 'PS256' },
            {
                iss: issuer_id, // <5>
                access_control: { // <6>
                    subscribe: { allow: ['#'], disallow: [] },
                    publish: { allow: ['#'], disallow: [] }
                }
            }, expiry, ursa.createPrivateKey(priv_key))); // <7>
        });
    });
});
<1> Read the private key. This is just an example -- you should have a more secure way of storing private keys.
<2> Set token expiry to 24 hours.
<3> Open the keystore for reading.
<4> Retrieve the issuer ID for `\http://davedoesdev.com`.
<5> Use the issuer ID in the token.
<6> Allow clients using this token to subscribe and publish to any topic.
<7> Supply the expiry time and private key for signing.
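Before handing a token to a client it can be useful to check what it actually contains. The sketch below decodes the payload of a JWT in compact form (`header.payload.signature`, each part base64url-encoded) without verifying the signature. `decode_claims` is a hypothetical helper, not part of Centro, authorize-jwt or jsjws, and it is only meant for inspection, never for authorization decisions.

```javascript
// Hypothetical helper for inspecting a token; it does NOT verify the signature.
function decode_claims(token)
{
    var parts = token.split('.');

    if (parts.length !== 3)
    {
        throw new Error('expected a token of the form header.payload.signature');
    }

    // Convert base64url to standard base64 before decoding
    var b64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');

    return JSON.parse(Buffer.from(b64, 'base64').toString('utf8'));
}
```

Calling `decode_claims(process.env.CENTRO_TOKEN)` on a token produced by the program above should return an object containing the `iss` and `access_control` claims that were passed to `generateJWTByKey`.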
The token is valid for 24 hours, allows clients which use it to publish and subscribe to any topic and is written to standard output. The client examples below expect it in an environment variable called `CENTRO_TOKEN`, so set that variable to the program's output before running them.
=== Node clients
==== TCP
Subscribe to topics given on the command line and display the topic and content of each message received:
var centro = require('centro-js'),
    net = require('net'),
    assert = require('assert');

function display_message(s, info)
{
    console.log('topic:', info.topic); // <1>
    s.pipe(process.stdout); // <2>
}
<1> Display the message's topic.
<2> Pipe the message's content stream to standard output.
<3> Open a TCP connection to the server on port 8800.
<4> The TCP transport expects the token on the connection stream.
<5> Read the token from the environment.
<6> Subscribe to the topics given on the command line.
Publish a message, topic given on the command line and content read from standard input:
var centro = require('centro-js'),
    net = require('net'),
    assert = require('assert');

net.createConnection(8800, function ()
{
    var conn = this;

    centro.stream_auth(conn,
    {
        token: process.env.CENTRO_TOKEN
    }).on('ready', function ()
    {
        process.stdin.pipe(this.publish(process.argv[2], function (err) // <1> <2>
        {
            assert.ifError(err);
            conn.end(); // <3>
        }));
    });
});
<1> Publish the message to the topic given on the command line.
<2> Pipe standard input to the message's content stream.
<3> Close the TCP connection, which will also cause the process to exit.
Here's a sample run:
[cols="a,a",frame="none",grid="none"]
|===
|===
==== Primus
Here are similar clients which use the Primus transport.
var centro = require('centro-js'),
    assert = require('assert'),
    Primus = require('primus'),
    Socket = Primus.createSocket(
    {
        pathname: '/centro/v' + centro.version + '/primus' // <1>
    }),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex; // <2>

function display_message(s, info)
{
    console.log('topic:', info.topic);
    s.pipe(process.stdout);
}

centro.separate_auth( // <3>
{
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client)
{
    assert.ifError(err);

    var socket = new Socket('http://' + userpass + '@localhost:8801', // <4>
                            { strategy: false }), // <5>
        duplex = new PrimusDuplex(socket);

    make_client(duplex).on('ready', function () // <6>
    {
        for (var topic of process.argv.slice(2))
        {
            this.subscribe(topic, display_message, assert.ifError);
        }
    });
});
<1> The Primus transport uses a versioned path.
<2> The Primus transport uses https://github.com/davedoesdev/primus-backpressure[primus-backpressure].
<3> The Primus transport expects the token to be supplied in the HTTP request authorization, before the connection stream is established.
<4> Open a connection to the server.
<5> You should disable Primus's auto-reconnect feature because it doesn't work with Centro. Centro's connections are stateful (they have shared state between the client and server). The server deletes its state immediately upon disconnect. If you need auto-reconnect you should implement it in your application.
<6> Establish a connection stream to the server.
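Since reconnection is left to the application, one possible shape is sketched here under assumptions: wrap the whole connect sequence (authorization, socket, client, re-subscribing) in a function and call it again, with exponential back-off, whenever the connection closes. `keep_connected`, its `connect(on_ready, on_close)` contract and the delay values are all hypothetical; none of this is part of Centro or Primus.

```javascript
// Hypothetical helper; not part of Centro or Primus.
// connect(on_ready, on_close) must start a brand new connection, call
// on_ready() once the client is ready and on_close() when the connection
// is lost. Because the server forgets its state on disconnect, connect()
// is expected to re-subscribe to topics itself.
function keep_connected(connect, options)
{
    options = options || {};

    var min_delay = options.min_delay || 1000,   // first retry after 1 second
        max_delay = options.max_delay || 30000,  // never wait more than 30 seconds
        schedule = options.schedule || setTimeout,
        failures = 0,
        stopped = false;

    function open()
    {
        if (stopped) { return; }

        var closed = false;

        connect(function on_ready()
        {
            failures = 0; // connection is healthy, so reset the back-off
        }, function on_close()
        {
            if (closed || stopped) { return; } // ignore duplicate close events
            closed = true;

            var delay = Math.min(max_delay, min_delay * Math.pow(2, failures));
            failures += 1;
            schedule(open, delay);
        });
    }

    open();

    // Call the returned function to stop reconnecting
    return function stop() { stopped = true; };
}
```

In a `connect` function built from the subscriber above, `on_ready` would be called from the client's `ready` handler. Which event signals a lost connection depends on the transport; the browser example later in this document listens for `close` on the Primus object.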
var centro = require('centro-js'),
    assert = require('assert'),
    Primus = require('primus'),
    Socket = Primus.createSocket(
    {
        pathname: '/centro/v' + centro.version + '/primus'
    }),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex;

centro.separate_auth(
{
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client)
{
    assert.ifError(err);

    var socket = new Socket('http://' + userpass + '@localhost:8801',
                            { strategy: false }),
        duplex = new PrimusDuplex(socket);

    make_client(duplex).on('ready', function ()
    {
        process.stdin.pipe(this.publish(process.argv[2], function (err)
        {
            assert.ifError(err);
            duplex.end();
        }));
    });
});
=== Browser clients
==== Primus
When you run a Centro server with a Primus transport, Primus itself is made available over HTTP at the following path:
So on <<server,the example server>>, it's available at the following URL:
Of course, the version number may change and the machine may be reachable via a different hostname depending on your DNS configuration.
A https://webpack.github.io/[webpack]ed copy of the Centro client code is available in link:dist/centro.js[].
First we define our user interface in HTML. We'll have a section where you can publish messages and a section where you can see messages which have been published:
Next we need to write the script which connects to the Centro server, subscribes to messages and publishes them:
var publish = function () { event.preventDefault(); }; // <1>

function connect()
{
    var topic = document.getElementById('topic'),
        message = document.getElementById('message'),
        messages = document.getElementById('messages'),
        params = new URLSearchParams(window.location.search);

    function tag_text(cls, text)
    {
        var div = document.createElement('div');
        div.className = cls;
        div.appendChild(document.createTextNode(text));
        return div;
    }

    function add_message(div) // <2>
    {
        messages.appendChild(div);
        messages.scrollTop = messages.scrollHeight;
    }

    centro.separate_auth(
    {
        token: params.get('token')
    }, function (err, userpass, make_client)
    {
        if (err) { throw(err); }

        var primus = new Primus('http://' + userpass + '@localhost:8801',
                                { strategy: false }),
            duplex = new centro.PrimusDuplex(primus),
            client = make_client(duplex);

        client.on('ready', function ()
        {
            add_message(tag_text('status', 'open')); // <3>
            this.subscribe(params.get('subscribe'), function (s, info)
            {
                centro.read_all(s, function (v)
                {
                    var msg = document.createElement('div');
                    msg.className = 'message';
                    msg.appendChild(tag_text('topic', info.topic));
                    msg.appendChild(tag_text('data', v.toString()));
                    add_message(msg); // <4>
                });
            });

            publish = function ()
            {
                event.preventDefault();
                client.publish(topic.value).end(message.value); // <5>
            };
        });

        primus.on('close', function ()
        {
            add_message(tag_text('status', 'closed')); // <6>
        });
    });
}
<1> While the page loads, clicking the publish button does nothing.
<2> Function to display a message.
<3> Display a message to say the connection stream to the server is open.
<4> When we receive a message, display its topic and content.
<5> When the user clicks the publish button, publish a message.
<6> Display a message to say the connection stream to the server is closed.
==== HTTP
The Centro HTTP transport supports access using HTTP requests, without using the Centro client:
`/centro/v1/publish?authz_token=XXX&topic=YYY`:: Publish a message (POST request, message content in request body)
`/centro/v1/subscribe?authz_token=XXX&topic=YYY`:: Subscribe to messages (messages delivered using https://www.w3.org/TR/eventsource/[server-sent events])
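As a sketch of how these two URLs can be assembled safely, the helper below percent-encodes the token and topics using the standard `URL` API. `centro_http_url` is a hypothetical name rather than part of Centro, and the base URL assumes the example server's HTTP transport on port 8802. Passing an array produces one `topic` parameter per entry.

```javascript
// Hypothetical helper; builds a publish or subscribe URL for the HTTP transport.
function centro_http_url(base, action, token, topics)
{
    var url = new URL('/centro/v1/' + action, base);

    url.searchParams.set('authz_token', token);

    // Accept either a single topic or an array of topics
    [].concat(topics).forEach(function (topic)
    {
        url.searchParams.append('topic', topic);
    });

    return url.toString();
}

console.log(centro_http_url('http://localhost:8802', 'subscribe', 'XXX', 'foo.#'));
// → http://localhost:8802/centro/v1/subscribe?authz_token=XXX&topic=foo.%23
```

Encoding matters for wildcard topics in particular, because an unencoded `#` would be treated as the start of a URL fragment and never reach the server.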
The HTML for this example is the same as <<primus-html,Primus HTML>> except that we don't need the Primus client or the Centro client:
The script is also similar to the <<primus-js,Primus script>>. It uses an https://www.w3.org/TR/eventsource/#the-eventsource-interface[`EventSource`] to subscribe to messages and POST requests (via https://www.w3.org/TR/XMLHttpRequest/[`XMLHttpRequest`]) to publish messages:
var publish = function () { event.preventDefault(); };

function connect()
{
    var topic = document.getElementById('topic'),
        message = document.getElementById('message'),
        messages = document.getElementById('messages'),
        params = new URLSearchParams(window.location.search);

    function tag_text(cls, text)
    {
        var div = document.createElement('div');
        div.className = cls;
        div.appendChild(document.createTextNode(text));
        return div;
    }

    function add_message(div)
    {
        messages.appendChild(div);
        messages.scrollTop = messages.scrollHeight;
    }

    var base_url = 'http://localhost:8802/centro/v1/',
        source = new EventSource(base_url + // <1>
            'subscribe?authz_token=' + params.get('token') +
            '&topic=' + encodeURIComponent(params.get('subscribe')));

    source.onopen = function ()
    {
        publish = function ()
        {
            event.preventDefault();
            var r = new XMLHttpRequest();
            r.open('POST', base_url + // <2>
                'publish?authz_token=' + params.get('token') +
                '&topic=' + encodeURIComponent(topic.value));
            r.send(message.value); // <3>
        };

        add_message(tag_text('status', 'open'));
    };

    source.onerror = function (e)
    {
        if (e.target.readyState === EventSource.CONNECTING)
        {
            add_message(tag_text('status', 'connecting'));
        }
        else if (e.target.readyState === EventSource.CLOSED)
        {
            add_message(tag_text('status', 'closed'));
        }
    };

    var msgs = new Map();

    source.addEventListener('start', function (e)
    {
        var info = JSON.parse(e.data); // <4>
        info.data = ''; // <5>
        msgs.set(info.id, info); // <6>
    });

    source.addEventListener('data', function (e)
    {
        var info = JSON.parse(e.data);
        msgs.get(info.id).data += info.data; // <7>
    });

    source.addEventListener('end', function (e)
    {
        var info = msgs.get(JSON.parse(e.data).id); // <8>
        var msg = document.createElement('div');
        msg.className = 'message';
        msg.appendChild(tag_text('topic', info.topic));
        msg.appendChild(tag_text('data', info.data));
        add_message(msg);
        msgs.delete(info.id);
    });

    source.addEventListener('peer_error', function ()
    {
        add_message(tag_text('status', 'error'));
    });
}
<1> Create an `EventSource` which receives messages from the server. We pass the authorization token and the topic we want messages for as query parameters.
<2> POST message to the server using an `XMLHttpRequest`. We pass the authorization token and message topic as query parameters.
<3> Send the message content.
<4> Each message begins with a `start` event, which has JSON-encoded data containing the message's topic and unique ID.
<5> Message data can be delivered across multiple events. In this example we need a place to accumulate it.
<6> Messages can be interleaved so while we're accumulating data, we need to remember them by their unique IDs.
<7> Message data arrives in `data` events and we accumulate it here.
<8> When all of a message's data has been received, we get an `end` event. In this example, we display the message's topic and data.
Further details of how messages are delivered using server-sent events are available http://rawgit.davedoesdev.com/davedoesdev/centro/master/docs/index.html#centro-jslibserver_transportshttp[here].
=== In-memory client
The `in-mem` transport lets you connect from the server process itself without the overhead of a TCP connection. For example, to display every message published on every transport you could add the following to `server.js`:
var assert = require('assert');

server.on('ready', function ()
{
    this.transport_ops['in-mem'].connect(function (err, stream)
    {
        assert.ifError(err);

        centro.stream_auth(stream).subscribe('#', function (s, info)
        {
            console.log('topic:', info.topic);
            s.pipe(process.stdout);
        }, assert.ifError);
    });
});
=== Other clients (server-sent events)
You can also use the HTTP transport outside the browser and from languages other than Node. As long as you can make POST requests, you can publish messages. To subscribe to messages, you'll need to be able to receive server-sent events.
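If your environment has no server-sent events library, the wire format is simple enough to parse by hand. The sketch below is written in JavaScript for consistency with the other examples. It handles only the `event:` and `data:` fields of the event stream format and expects a complete piece of text rather than arbitrary network chunks. `parse_sse` is a hypothetical helper, and the sample input in the usage note assumes the `start`, `data` and `end` events described in the HTTP section above.

```javascript
// Hypothetical helper; parses complete text/event-stream text into events.
// Comments, 'id:' and 'retry:' fields are ignored in this sketch.
function parse_sse(text)
{
    var events = [];

    // Events are separated by a blank line
    text.split(/\r\n\r\n|\n\n|\r\r/).forEach(function (block)
    {
        var type = 'message', // default event type in the SSE format
            data = [];

        block.split(/\r\n|\n|\r/).forEach(function (line)
        {
            // A single space after the colon is not part of the value
            var m = /^(event|data): ?(.*)$/.exec(line);

            if (!m) { return; }

            if (m[1] === 'event')
            {
                type = m[2];
            }
            else
            {
                data.push(m[2]);
            }
        });

        // Blocks without any data lines do not produce an event
        if (data.length > 0)
        {
            events.push({ type: type, data: data.join('\n') });
        }
    });

    return events;
}
```

Feeding it `'event: start\ndata: {"id":0,"topic":"foo"}\n\n'` yields a single event of type `start` whose `data` can be passed to `JSON.parse`. A real client would also need to buffer partial input until a blank line arrives.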
==== Python
Here's an example Python 3 program which publishes a message, topic given on the command line and content read from standard input:
<1> Make POST request to publish message.
Subscribe to topics given on the command line and display the topic and content of each message received:
<1> Make a long-running GET request to subscribe to messages.
<2> Use the https://github.com/mpetazzoni/sseclient[sseclient-py] module to read messages.
<3> Display message ID and topic.
<4> Display message content. There may be many `data` events for each message (they will share the same ID).
<5> All Centro message data is a byte array. The HTTP transport encodes it in UTF-8 per the https://www.w3.org/TR/eventsource/#the-eventsource-interface[server-sent events spec]. It's encoded such that the UTF-8 data contains only characters that can also be represented in the latin1 (ISO-8859-1) 8-bit encoding. Therefore, to get the message bytes, encode the decoded string using latin1.
==== Node
You can also use the HTTP transport from Node, if you don't want to use Primus or TCP.
<1> Make POST request to publish message.
var EventSource = require('eventsource'), // <1>
    es = new EventSource('http://localhost:8802/centro/v1/subscribe?' +
                         require('querystring').stringify(
    {
        authz_token: process.env.CENTRO_TOKEN,
        topic: process.argv.slice(2)
    }));

es.addEventListener('start', function (e)
{
    var data = JSON.parse(e.data);
    console.log('id:', data.id, 'topic:', data.topic);
});
<1> https://github.com/EventSource/eventsource[`EventSource` for Node].
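The subscriber above only handles `start` events. One possible way to collect each message's content as bytes is sketched below: it keeps a list of `Buffer`s per message ID and converts the string in each `data` event with the `latin1` encoding, following the encoding note in the Python section. `MessageCollector` is a hypothetical helper, not part of Centro. It takes each event's `data` string so it can be fed from any EventSource implementation.

```javascript
// Hypothetical helper; reassembles interleaved messages from SSE event data.
function MessageCollector(on_message)
{
    this._msgs = new Map();        // message ID -> { topic, chunks }
    this._on_message = on_message; // called with (topic, Buffer)
}

// Handle the JSON data of a 'start' event
MessageCollector.prototype.start = function (json)
{
    var info = JSON.parse(json);
    this._msgs.set(info.id, { topic: info.topic, chunks: [] });
};

// Handle the JSON data of a 'data' event
MessageCollector.prototype.data = function (json)
{
    var info = JSON.parse(json),
        msg = this._msgs.get(info.id);

    if (msg)
    {
        // Each character stands for one byte, so latin1 recovers the bytes
        msg.chunks.push(Buffer.from(info.data, 'latin1'));
    }
};

// Handle the JSON data of an 'end' event
MessageCollector.prototype.end = function (json)
{
    var id = JSON.parse(json).id,
        msg = this._msgs.get(id);

    if (msg)
    {
        this._msgs.delete(id);
        this._on_message(msg.topic, Buffer.concat(msg.chunks));
    }
};
```

To wire it up, create `var collector = new MessageCollector(function (topic, buf) { ... })` and register one listener per event type, for example `es.addEventListener('data', function (e) { collector.data(e.data); })`.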
==== Rust
Here are the same example clients written in Rust. To run them, change directory to link:test/example/rust/publish[] or link:test/example/rust/subscribe[] and type `cargo run`.
extern crate reqwest;
use std::io::{self, Read};
use std::env;
use reqwest::{Url, Client};
#[macro_use]
extern crate log;
extern crate env_logger;

extern crate reqwest;
extern crate eventsource;
extern crate encoding;
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
use std::io::{self, Write};
use std::env;
use reqwest::Url;
use eventsource::event::Event;
use eventsource::reqwest::Client;
use encoding::{Encoding, EncoderTrap};
use encoding::all::ISO_8859_1;
#[macro_use]
extern crate log;
extern crate env_logger;

#[derive(Deserialize)]
struct Start {
    id: u64,
    topic: String
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct Data {
    id: u64,
    data: String
}

// Parse an event's JSON data, logging and returning None on failure
fn parse<'a, T>(data: &'a str) -> Option<T>
where T: serde::Deserialize<'a>
{
    match serde_json::from_str::<T>(data) {
        Ok(start) => {
            return Some(start);
        },
        Err(err) => {
            error!("Failed to parse JSON: {}", err);
            return None;
        }
    }
}

// Recover the message bytes by encoding the string as latin1
fn encode(data: &str) -> Option<Vec<u8>> {
    match ISO_8859_1.encode(data, EncoderTrap::Strict) {
        Ok(bytes) => {
            return Some(bytes);
        },
        Err(err) => {
            error!("Failed to convert data to bytes: {}", err);
            return None;
        }
    }
}

// Parse an event's data and pass the result to the handler
fn handle<'a, T>(ev: &'a Event, f: &dyn Fn(T))
where T: serde::Deserialize<'a>
{
    if let Some(v) = parse::<T>(&ev.data) {
        f(v);
    }
}
==== Clojure
Here are the same example clients written in Clojure. To run them, change directory to link:test/example/clojure/publish[] or link:test/example/clojure/subscribe[] and type `lein run`.
(ns publish.core
  (:gen-class)
  (:require [clj-http.client :as client]))

(ns subscribe.core
  (:gen-class)
  (:require [cheshire.core :as json])
  (:import [javax.ws.rs.client ClientBuilder]
           [org.glassfish.jersey.media.sse SseFeature EventSource EventListener]))

(deftype OnStart []
  EventListener
  (onEvent [_ e]
    (let [data (json/decode (.readData e) true)]
      (println "id:" (:id data) "topic:" (:topic data)))))

(deftype OnData []
  EventListener
  (onEvent [_ e]
    (let [data (json/decode (.readData e) true)]
      (.write System/out (.getBytes (:data data) "ISO-8859-1"))
      (flush))))
==== Elixir
Here are the same example clients written in Elixir. To build them, change directory to link:test/example/elixir/apps/publish[] or link:test/example/elixir/apps/subscribe[] and type `mix escript.build`.
== Installation
== Licence
link:LICENCE[MIT]
== Test
(make sure you do `grunt keys` at least once first)
== Lint
== Coverage
https://istanbul.js.org/[Istanbul] results are available http://rawgit.davedoesdev.com/davedoesdev/centro/master/coverage/lcov-report/index.html[here].
Coveralls page is https://coveralls.io/r/davedoesdev/centro[here].