centro-js

Pub-sub and work queue service on Primus or TCP. Supports wildcards, streams and back-pressure. Just Node and a filesystem required.
= centro{nbsp}{nbsp}{nbsp}image:https://circleci.com/gh/davedoesdev/centro.svg?style=svg[Build Status,link=https://circleci.com/gh/davedoesdev/centro] image:https://coveralls.io/repos/github/davedoesdev/centro/badge.svg[Coverage Status,link=https://coveralls.io/github/davedoesdev/centro] image:https://img.shields.io/npm/v/centro-js.svg[NPM version,link=https://www.npmjs.com/package/centro-js]
:prewrap!:
:toc:
:toclevels: 3
:toc-placement: preamble

Centro is a Node.js module for publishing and subscribing to messages over a network. It includes code for running a server and clients which connect to a server.

Messages are published to topics which support wildcard matching (single- and multi-level).

Each message has its own content stream, with full back-pressure support, and can be delivered to multiple interested clients or to exactly one.

All messages are stored in a directory on the server machine's filesystem so no external services are required. You can run multiple server instances on the same directory, for example one per CPU core.

Clients can connect to the server using https://github.com/primus/primus[Primus] (Websockets), TCP, HTTP or in-memory streams.

API documentation is available http://rawgit.davedoesdev.com/davedoesdev/centro/master/docs/index.html[here].

== Example

An example of running a server and clients using different transports is described below. All the example files are available in the link:test/example[] directory.

[[server]]
=== Server

Here's a server which listens on all transports.

[source,javascript]
.server.js

var centro = require('centro-js');

var config = {
    allowed_algs: ['PS256'], <1>
    transports: [{
        server: 'tcp',
        config: { port: 8800 }
    }, {
        server: 'primus',
        config: { port: 8801 }
    }, {
        server: 'http',
        config: { port: 8802 }
    }, {
        server: 'in-mem',
        authorize_config: { ANONYMOUS_MODE: true } <2>
    }]
};

var server = new centro.CentroServer(config);

server.on('ready', function () { console.log('READY.'); }); <3>

<1> <<authz-tokens,Authorization tokens>> signed with these algorithms are accepted.
<2> In-memory transport doesn't require authorization tokens.
<3> Connections may be refused until `ready` is emitted.

This is just an example -- you only need to list transports to which your applications will connect.

[[authz-tokens]]
=== Authorization tokens

Transports not configured for ANONYMOUS_MODE expect clients to present an authorization token when they connect.

Centro uses https://github.com/davedoesdev/authorize-jwt[authorize-jwt] to verify tokens. Your tokens must be http://self-issued.info/docs/draft-ietf-oauth-json-web-token.html[JSON Web Tokens]. The token format that Centro expects is described http://rawgit.davedoesdev.com/davedoesdev/centro/master/docs/schema/web/index.html[here].

You should generate a keypair, add the public key to authorize-jwt's keystore and sign your tokens with the private key.

Here's a program which generates a keypair using https://github.com/quartzjer/ursa[ursa], adds the public key to Centro's keystore with the identifier \http://davedoesdev.com, and writes the private key to priv_key.pem:

[source,javascript]
.add_key.js

var uri = 'http://davedoesdev.com', <1>
    authorize_jwt = require('centro-js').authorize_jwt,
    assert = require('assert'),
    path = require('path'),
    fs = require('fs'),
    ursa = require('ursa'),
    priv_key = ursa.generatePrivateKey(2048, 65537), <2>
    pub_key = priv_key.toPublicPem('utf8'); <3>

authorize_jwt(
{
    db_type: 'pouchdb', <4>
    db_for_update: true, <5>
    no_changes: true <6>
}, function (err, authz)
{
    assert.ifError(err);
    authz.keystore.add_pub_key(uri, pub_key, function (err) <7>
    {
        assert.ifError(err);
        fs.writeFile(path.join(__dirname, 'priv_key.pem'), <8>
                     priv_key.toPrivatePem(),
                     assert.ifError);
        authz.keystore.deploy(); <9>
    });
});

<1> Unique identifier for the keypair.
<2> Generate the keypair.
<3> Get the public key in PEM form.
<4> You can use `couchdb` but you'll have to set up your own http://couchdb.apache.org/[CouchDB] server.
<5> We're going to update the keystore.
<6> We're not interested in changes to the keystore -- we're just going to update the public key and exit.
<7> Associate the public key with \http://davedoesdev.com.
<8> The private key is not stored in the keystore but needs to be available when you want to sign authorization tokens. Here we write it to disk but this is just an example -- you probably want a more secure way of storing it.
<9> https://pouchdb.com/[PouchDB]-based keystores update a master database and then replicate changes to reader databases. Here we `deploy()` the master database to let any active reader databases know we're done updating.

Then you need to make a JWT, using the private key to sign it.

The iss claim in the token should be the unique issuer ID associated with \http://davedoesdev.com in Centro's keystore. You can use the https://github.com/davedoesdev/pub-keystore#pubkeystoreprototypeget_pub_key_by_uriuri-cb[`get_pub_key_by_uri`] method to retrieve the issuer ID. Clients which use tokens with different issuer IDs can't send messages to each other.

The `access_control` claim in the token should specify to which topics clients that present this token can publish and subscribe. Topics should be in AMQP format: `.` delimits words, `*` matches exactly one word and `#` matches zero or more words. See https://github.com/davedoesdev/mqlobber-access-control[mqlobber-access-control] for more details.

You can use any JWT module to generate your tokens. Here's an example using https://github.com/davedoesdev/node-jsjws[jsjws]:

[source,javascript]
.make_token.js

var uri = 'http://davedoesdev.com',
    authorize_jwt = require('centro-js').authorize_jwt,
    jsjws = require('jsjws'),
    assert = require('assert'),
    path = require('path'),
    fs = require('fs'),
    ursa = require('ursa');

fs.readFile(path.join(__dirname, 'priv_key.pem'), function (err, priv_key) <1>
{
assert.ifError(err);

var expiry = new Date();
expiry.setHours(expiry.getHours() + 24); <2>

authorize_jwt( <3>
{
    db_type: 'pouchdb',
    deploy_name: 'token',
    no_changes: true,
    silent: true
}, function (err, authz)
{
    assert.ifError(err);
    authz.keystore.get_pub_key_by_uri(uri, function (err, pub_key, issuer_id) <4>
    {
        assert.ifError(err);
        assert(pub_key);
        assert(issuer_id);
        console.log(new jsjws.JWT().generateJWTByKey({ alg: 'PS256' },
        {
            iss: issuer_id, <5>
            access_control: { <6>
                subscribe: { allow: ['#'], disallow: [] },
                publish: { allow: ['#'], disallow: [] }
            }
        }, expiry, ursa.createPrivateKey(priv_key))); <7>
    });
});

});

<1> Read the private key. This is just an example -- you should have a more secure way of storing private keys.
<2> Set token expiry to 24 hours.
<3> Open the keystore for reading.
<4> Retrieve the issuer ID for \http://davedoesdev.com.
<5> Use the issuer ID in the token.
<6> Allow clients using this token to subscribe and publish to any topic.
<7> Supply the expiry time and private key for signing.

The token is valid for 24 hours, allows clients which use it to publish and subscribe to any topic and is written to standard output. The client examples below expect it in an environment variable called CENTRO_TOKEN so you might do something like this to set it:

[source,bash]

export CENTRO_TOKEN=$(node make_token.js)

=== Node clients

==== TCP

Subscribe to topics given on the command line and display the topic and content of each message received:

[source,javascript]
.subscribe_tcp.js

var centro = require('centro-js'),
    net = require('net'),
    assert = require('assert');

function display_message(s, info)
{
    console.log('topic:', info.topic); <1>
    s.pipe(process.stdout); <2>
}

net.createConnection(8800, function () <3>
{
    centro.stream_auth(this, <4>
    {
        token: process.env.CENTRO_TOKEN <5>
    }).on('ready', function ()
    {
        for (var topic of process.argv.slice(2))
        {
            this.subscribe(topic, display_message, assert.ifError); <6>
        }
    });
});

<1> Display the message's topic.
<2> Pipe the message's content stream to standard output.
<3> Open a TCP connection to the server on port 8800.
<4> The TCP transport expects the token on the connection stream.
<5> Read the token from the environment.
<6> Subscribe to the topics given on the command line.

Publish a message, topic given on the command line and content read from standard input:

[source,javascript]
.publish_tcp.js

var centro = require('centro-js'),
    net = require('net'),
    assert = require('assert');

net.createConnection(8800, function ()
{
var conn = this;

centro.stream_auth(conn,
{
    token: process.env.CENTRO_TOKEN
}).on('ready', function ()
{
    process.stdin.pipe(this.publish(process.argv[2], function (err) <1><2>
    {
        assert.ifError(err);
        conn.end(); <3>
    }));
});

});

<1> Publish the message to the topic given on the command line.
<2> Pipe standard input to the message's content stream.
<3> Close the TCP connection, which will also cause the process to exit.

Here's a sample run:

[cols="a,a",frame="none",grid="none"]
|===

|[source,bash]

$ node subscribe_tcp.js 'foo.*'
topic: foo.bar
hello

|[source,bash]

$ echo hello | node publish_tcp.js foo.bar

|===

==== Primus

Here are similar clients which use the Primus transport.

[source,javascript]
.subscribe_primus.js

var centro = require('centro-js'),
    assert = require('assert'),
    Primus = require('primus'),
    Socket = Primus.createSocket(
    {
        pathname: '/centro/v' + centro.version + '/primus' <1>
    }),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex; <2>

function display_message(s, info)
{
    console.log('topic:', info.topic);
    s.pipe(process.stdout);
}

centro.separate_auth( <3>
{
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client)
{
assert.ifError(err);

var socket = new Socket('http://' + userpass + '@localhost:8801', <4>
                        { strategy: false }), <5>
    duplex = new PrimusDuplex(socket);

make_client(duplex).on('ready', function () <6>
{
    for (var topic of process.argv.slice(2))
    {
        this.subscribe(topic, display_message, assert.ifError);
    }
});

});

<1> The Primus transport uses a versioned path.
<2> The Primus transport uses https://github.com/davedoesdev/primus-backpressure[primus-backpressure].
<3> The Primus transport expects the token to be supplied in the HTTP request authorization, before the connection stream is established.
<4> Open a connection to the server.
<5> You should disable Primus's auto-reconnect feature because it doesn't work with Centro. Centro's connections are stateful (they have shared state between the client and server). The server deletes its state immediately upon disconnect. If you need auto-reconnect you should implement it in your application.
<6> Establish a connection stream to the server.

[source,javascript]
.publish_primus.js

var centro = require('centro-js'),
    assert = require('assert'),
    Primus = require('primus'),
    Socket = Primus.createSocket(
    {
        pathname: '/centro/v' + centro.version + '/primus'
    }),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex;

centro.separate_auth(
{
    token: process.env.CENTRO_TOKEN
}, function (err, userpass, make_client)
{
assert.ifError(err);

var socket = new Socket('http://' + userpass + '@localhost:8801',
                        { strategy: false }),
    duplex = new PrimusDuplex(socket);

make_client(duplex).on('ready', function ()
{
    process.stdin.pipe(this.publish(process.argv[2], function (err)
    {
        assert.ifError(err);
        duplex.end();
    }));
});

});

=== Browser clients

==== Primus

When you run a Centro server with a Primus transport, Primus itself is made available over HTTP at the following path:

`/centro/v1/primus/primus.js`

So on <<server,the example server>>, it's available at the following URL:

`\http://localhost:8801/centro/v1/primus/primus.js`

Of course, the version number may change and the machine may be reachable via a different hostname depending on your DNS configuration.

A https://webpack.github.io/[webpack]ed copy of the Centro client code is available in link:dist/centro.js[].

First we define our user interface in HTML. We'll have a section where you can publish messages and a section where you can see messages which have been published:

[[primus-html]]
[source,html]
.example_primus.html

Centro Example <1> <2> <3> <4> <5> <6>
topic: <7> message: <8>
<9>
<10>
----
<1> Some CSS is required to lay this out nicely. It's available in link:test/example/browser/example.css[].
<2> Load Primus.
<3> Load the Centro client code.
<4> Load script to make the example work (see below).
<5> When the page loads, initialize the script by calling `connect()`.
<6> When the user clicks on the publish button, call `publish()`.
<7> Input field for message topic.
<8> Input field for message content.
<9> Publish button.
<10> Displays messages received.

Next we need to write the script which connects to the Centro server, subscribes to messages and publishes them:

[[primus-js]]
[source,javascript]
.example_primus.js

var publish = function () { event.preventDefault(); }; <1>

function connect()
{
var topic = document.getElementById('topic'),
    message = document.getElementById('message'),
    messages = document.getElementById('messages'),
    params = new URLSearchParams(window.location.search);

function tag_text(cls, text)
{
    var div = document.createElement('div');
    div.className = cls;
    div.appendChild(document.createTextNode(text));
    return div;
}

function add_message(div) <2>
{
    messages.appendChild(div);
    messages.scrollTop = messages.scrollHeight;
}

centro.separate_auth(
{
    token: params.get('token')
}, function (err, userpass, make_client)
{
    if (err) { throw(err); }

    var primus = new Primus('http://' + userpass + '@localhost:8801',
                            { strategy: false }),
        duplex = new centro.PrimusDuplex(primus),
        client = make_client(duplex);

    client.on('ready', function ()
    {
        add_message(tag_text('status', 'open')); <3>
        this.subscribe(params.get('subscribe'), function (s, info)
        {
            centro.read_all(s, function (v)
            {
                var msg = document.createElement('div');
                msg.className = 'message';
                msg.appendChild(tag_text('topic', info.topic));
                msg.appendChild(tag_text('data', v.toString()));
                add_message(msg); <4>
            });
        });

        publish = function ()
        {
            event.preventDefault();
            client.publish(topic.value).end(message.value); <5>
        };
    });

    primus.on('close', function ()
    {
        add_message(tag_text('status', 'closed')); <6>
    });
});

}

<1> While the page loads, clicking the publish button does nothing.
<2> Function to display a message.
<3> Display a message to say the connection stream to the server is open.
<4> When we receive a message, display its topic and content.
<5> When the user clicks the publish button, publish a message.
<6> Display a message to say the connection stream to the server is closed.

==== HTTP

The Centro HTTP transport supports access using HTTP requests, without using the Centro client:

/centro/v1/publish?authz_token=XXX&topic=YYY:: Publish a message (POST request, message content in request body)
/centro/v1/subscribe?authz_token=XXX&topic=YYY:: Subscribe to messages (messages delivered using https://www.w3.org/TR/eventsource/[server-sent events])

The HTML for this example is the same as <<primus-html,Primus HTML>> except that we don't need the Primus client or the Centro client:

[source,html]
.example_sse.html

Centro Example
topic: message:
----

The script is also similar to the <<primus-js,Primus script>>. It uses an https://www.w3.org/TR/eventsource/#the-eventsource-interface[`EventSource`] to subscribe to messages and POST requests (via https://www.w3.org/TR/XMLHttpRequest/[`XMLHttpRequest`]) to publish messages:

[source,javascript]
.example_sse.js

var publish = function () { event.preventDefault(); };

function connect()
{
var topic = document.getElementById('topic'),
    message = document.getElementById('message'),
    messages = document.getElementById('messages'),
    params = new URLSearchParams(window.location.search);

function tag_text(cls, text)
{
    var div = document.createElement('div');
    div.className = cls;
    div.appendChild(document.createTextNode(text));
    return div;
}

function add_message(div)
{
    messages.appendChild(div);
    messages.scrollTop = messages.scrollHeight;
}

var base_url = 'http://localhost:8802/centro/v1/',
    source = new EventSource(base_url + <1>
                             'subscribe?authz_token=' + params.get('token') +
                             '&topic=' + encodeURIComponent(params.get('subscribe')));

source.onopen = function ()
{
    publish = function ()
    {
        event.preventDefault();
        var r = new XMLHttpRequest();
        r.open('POST', base_url + <2>
                       'publish?authz_token=' + params.get('token') +
                       '&topic=' + encodeURIComponent(topic.value));
        r.send(message.value); <3>
    };

    add_message(tag_text('status', 'open'));
};

source.onerror = function (e)
{
    if (e.target.readyState === EventSource.CONNECTING)
    {
        add_message(tag_text('status', 'connecting'));
    }
    else if (e.target.readyState === EventSource.CLOSED)
    {
        add_message(tag_text('status', 'closed'));
    }
};

var msgs = new Map();

source.addEventListener('start', function (e)
{
    var info = JSON.parse(e.data); <4>
    info.data = ''; <5>
    msgs.set(info.id, info); <6>
});

source.addEventListener('data', function (e)
{
    var info = JSON.parse(e.data);
    msgs.get(info.id).data += info.data; <7>
});

source.addEventListener('end', function (e)
{
    var info = msgs.get(JSON.parse(e.data).id); <8>

    var msg = document.createElement('div');
    msg.className = 'message';
    msg.appendChild(tag_text('topic', info.topic));
    msg.appendChild(tag_text('data', info.data));
    add_message(msg);

    msgs.delete(info.id);
});

source.addEventListener('peer_error', function ()
{
    add_message(tag_text('status', 'error'));
});

}

<1> Create an `EventSource` which receives messages from the server. We pass the authorization token and the topic we want messages for as query parameters.
<2> POST message to the server using an `XMLHttpRequest`. We pass the authorization token and message topic as query parameters.
<3> Send the message content.
<4> Each message begins with a `start` event, which has JSON-encoded data containing the message's topic and unique ID.
<5> Message data can be delivered across multiple events. In this example we need a place to accumulate it.
<6> Messages can be interleaved so while we're accumulating data, we need to remember them by their unique IDs.
<7> Message data arrives in `data` events and we accumulate it here.
<8> When all a message's data has been received, we get an `end` event. In this example, we display the message's topic and data.

Further details of how messages are delivered using server-sent events are available http://rawgit.davedoesdev.com/davedoesdev/centro/master/docs/index.html#centro-jslibserver_transportshttp[here].

=== In-memory client

The in-mem transport lets you connect from the server process itself without the overhead of a TCP connection. For example, to display every message published on every transport you could add the following to server.js:

[source,javascript]
.server.js

var assert = require('assert');

server.on('ready', function ()
{
this.transport_ops['in-mem'].connect(function (err, stream)
{
    assert.ifError(err);

    centro.stream_auth(stream).subscribe('#', function (s, info)
    {
        console.log('topic:', info.topic);
        s.pipe(process.stdout);
    }, assert.ifError);
});

});

=== Other clients (server-sent events)

You can also use the HTTP transport outside the browser and from languages other than Node. As long as you can make POST requests, you can publish messages. To subscribe to messages, you'll need to be able to receive server-sent events.

==== Python

Here's an example Python 3 program which publishes a message, topic given on the command line and content read from standard input:

[source,python]
.publish.py

import requests, os, sys
params = {
    'authz_token': os.environ['CENTRO_TOKEN'],
    'topic': sys.argv[1]
}
requests.post('http://localhost:8802/centro/v1/publish', <1>
              params=params,
              data=sys.stdin.buffer).raise_for_status()

<1> Make POST request to publish message.

Subscribe to topics given on the command line and display the topic and content of each message received:

[source,python]
.subscribe.py

import requests, sseclient, os, sys, json
params = {
    'authz_token': os.environ['CENTRO_TOKEN'],
    'topic': sys.argv[1:]
}
response = requests.get('http://localhost:8802/centro/v1/subscribe', <1>
                        params=params, stream=True)
response.raise_for_status()
client = sseclient.SSEClient(response) <2>
for event in client.events():
    if (event.event == 'start'):
        data = json.loads(event.data)
        print('id:', data['id'], 'topic:', data['topic']) <3>
    elif (event.event == 'data'):
        sys.stdout.buffer.write(json.loads(event.data)['data'].encode('latin1')) <4> <5>
        sys.stdout.flush()

<1> Make a long-running GET request to subscribe to messages.
<2> Use the https://github.com/mpetazzoni/sseclient[sseclient-py] module to read messages.
<3> Display message ID and topic.
<4> Display message content. There may be many `data` events for each message (they will share the same ID). The bytes are written to `sys.stdout.buffer` because Python 3's text-mode `sys.stdout` doesn't accept `bytes`.
<5> All Centro message data is a byte array. The HTTP transport encodes it in UTF-8 per the https://www.w3.org/TR/eventsource/#the-eventsource-interface[server-sent events spec]. It's encoded such that the UTF-8 data contains only characters that can also be represented in the latin1 (ISO-8859-1) 8-bit encoding. Therefore, to get the message bytes, take the string decoded from the UTF-8 data and encode it using latin1.

==== Node

You can also use the HTTP transport from Node, if you don't want to use Primus or TCP.

[source,javascript]
.publish_http.js

process.stdin.pipe(require('http').request( <1>
{
    method: 'POST',
    hostname: 'localhost',
    port: 8802,
    path: '/centro/v1/publish?' + require('querystring').stringify(
    {
        authz_token: process.env.CENTRO_TOKEN,
        topic: process.argv[2]
    })
}));

<1> Make POST request to publish message.

[source,javascript]
.subscribe_http.js

var EventSource = require('eventsource'), <1>
    es = new EventSource('http://localhost:8802/centro/v1/subscribe?' +
                         require('querystring').stringify(
    {
        authz_token: process.env.CENTRO_TOKEN,
        topic: process.argv.slice(2)
    }));

es.addEventListener('start', function (e)
{
    var data = JSON.parse(e.data);
    console.log('id:', data.id, 'topic:', data.topic);
});

es.addEventListener('data', function (e)
{
    process.stdout.write(JSON.parse(e.data).data, 'binary');
});

<1> https://github.com/EventSource/eventsource[`EventSource` for Node].

==== Rust

Here are the same example clients written in Rust. To run them, change directory to link:test/example/rust/publish[] or link:test/example/rust/subscribe[] and type cargo run.

[source,rust]
.publish.rs

extern crate reqwest;
use std::io::{self, Read};
use std::env;
use reqwest::{Url, Client};
#[macro_use]
extern crate log;
extern crate env_logger;

fn main() {
    env_logger::init().expect("Failed to init logger");
    let url_str = "http://localhost:8802/centro/v1/publish";
    let token = env::var("CENTRO_TOKEN").expect("no token");
    let topic = env::args().nth(1).expect("no topic");
    let url = Url::parse_with_params(url_str, &[
            ("authz_token", token),
            ("topic", topic)])
        .expect("Failed to parse url");
    let response = Client::new().expect("Couldn't create client")
        .post(url)
        .body(reqwest::Body::new(io::stdin()))
        .send()
        .expect("Failed to send request");
    if !response.status().is_success() {
        error!("HTTP request failed: {}", response.status());
        let mut buffer = String::new();
        response.take(10000).read_to_string(&mut buffer).expect("Failed to read response");
        error!("{}", buffer);
    }
}

[source,rust]
.subscribe.rs

extern crate reqwest;
extern crate eventsource;
extern crate encoding;
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
use std::io::{self, Write};
use std::env;
use reqwest::Url;
use eventsource::event::Event;
use eventsource::reqwest::Client;
use encoding::{Encoding, EncoderTrap};
use encoding::all::ISO_8859_1;
#[macro_use]
extern crate log;
extern crate env_logger;

#[derive(Deserialize)]
struct Start {
    id: u64,
    topic: String
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct Data {
    id: u64,
    data: String
}

fn parse<'a, T>(data: &'a str) -> Option<T>
where T: serde::Deserialize<'a>
{
    match serde_json::from_str::<T>(data) {
        Ok(start) => {
            return Some(start);
        },
        Err(err) => {
            error!("Failed to parse JSON: {}", err);
            return None;
        }
    }
}

fn encode(data: &str) -> Option<Vec<u8>> {
    match ISO_8859_1.encode(data, EncoderTrap::Strict) {
        Ok(bytes) => {
            return Some(bytes);
        },
        Err(err) => {
            error!("Failed to convert data to bytes: {}", err);
            return None;
        }
    }
}

fn handle<'a, T>(ev: &'a Event, f: &Fn(T) -> ())
where T: serde::Deserialize<'a>
{
    if let Some(v) = parse::<T>(&ev.data) {
        f(v);
    }
}

fn main() {
    env_logger::init().expect("Failed to init logger");
    let url_str = "http://localhost:8802/centro/v1/subscribe";
    let token = env::var("CENTRO_TOKEN").expect("no token");
    let token_params = vec![("authz_token", token)];
    let topic_params = env::args().skip(1).map(|topic| ("topic", topic));
    let url = Url::parse_with_params(url_str,
            token_params.into_iter().chain(topic_params))
        .expect("Failed to parse url");
    let client = Client::new(url).expect("Failed to start EventSource");
    for event in client {
        let ev = event.expect("Failed to read event");
        if let Some(ref evtype) = ev.event_type {
            match evtype.as_str() {
                "start" =>
                    handle::<Start>(&ev, &|start|
                        println!("id: {} topic: {}", start.id, start.topic)),
                "data" =>
                    handle::<Data>(&ev, &|data|
                        if let Some(bytes) = encode(&data.data) {
                            let _ = io::stdout().write(bytes.as_slice());
                            let _ = io::stdout().flush();
                        }),
                _ => {}
            }
        }
    }
}

==== Clojure

Here are the same example clients written in Clojure. To run them, change directory to link:test/example/clojure/publish[] or link:test/example/clojure/subscribe[] and type lein run.

[source,clojure]
.publish.clj

(ns publish.core
  (:gen-class)
  (:require [clj-http.client :as client]))

(defn -main
  "Publish message to example Centro server"
  [topic]
  (client/post "http://localhost:8802/centro/v1/publish"
               {:query-params {"authz_token" (System/getenv "CENTRO_TOKEN")
                               "topic" topic}
                :body System/in}))

[source,clojure]
.subscribe.clj

(ns subscribe.core
  (:gen-class)
  (:require [cheshire.core :as json])
  (:import [javax.ws.rs.client ClientBuilder]
           [org.glassfish.jersey.media.sse SseFeature EventSource EventListener]))

(deftype OnStart [] EventListener
  (onEvent [_ e]
    (let [data (json/decode (.readData e) true)]
      (println "id:" (:id data) "topic:" (:topic data)))))

(deftype OnData [] EventListener
  (onEvent [_ e]
    (let [data (json/decode (.readData e) true)]
      (.write System/out (.getBytes (:data data) "ISO-8859-1"))
      (flush))))

(defn -main
  "Subscribe to messages from example Centro server"
  [& topics]
  (let [token (System/getenv "CENTRO_TOKEN")
        builder (.register (ClientBuilder/newBuilder) SseFeature)
        client (.build builder)
        target (-> (.target client "http://localhost:8802/centro/v1/subscribe")
                   (.queryParam "authz_token" (into-array Object [token]))
                   (.queryParam "topic" (into-array Object topics)))
        event-source (.build (EventSource/target target))]
    (.register event-source (OnStart.) "start" (into-array String []))
    (.register event-source (OnData.) "data" (into-array String []))
    (.open event-source)
    (println "READY.")
    (loop []
      (Thread/sleep 1000)
      (recur))))

==== Elixir

Here are the same example clients written in Elixir. To build them, change directory to link:test/example/elixir/apps/publish[] or link:test/example/elixir/apps/subscribe[] and type mix escript.build.

[source,elixir]
.publish.ex

defmodule Publish do
  def main([topic | _]) do
    HTTPoison.post!("http://localhost:8802/centro/v1/publish",
                    {:stream, IO.stream(:stdio, 100)},
                    [],
                    params: %{authz_token: System.get_env("CENTRO_TOKEN"),
                              topic: topic})
  end
end

[source,elixir]
.subscribe.ex

defmodule Subscribe do
  def main(topics) do
    {:ok, _} = EventsourceEx.new(
      "http://localhost:8802/centro/v1/subscribe?" <>
      URI.encode_query([{"authz_token", System.get_env("CENTRO_TOKEN")} |
                        (for topic <- topics, do: {"topic", topic})]),
      stream_to: self())
    loop()
  end
  defmodule Start do
    defstruct [:id, :topic]
  end
  defmodule Data do
    defstruct [:id, :data]
  end
  def loop do
    receive do
      %EventsourceEx.Message{event: "start", data: data} ->
        start = Poison.decode!(data, as: %Start{})
        :io.format("id: ~B topic: ~s~n", [start.id, start.topic])
      %EventsourceEx.Message{event: "data", data: data} ->
        data = Poison.decode!(data, as: %Data{})
        IO.write(:unicode.characters_to_binary(data.data, :utf8, :latin1))
    end
    loop()
  end
end

== Installation

[source,bash]

npm install centro-js

== Licence

link:LICENCE[MIT]

== Test

[source,bash]

grunt test

(make sure you do grunt keys at least once first)

== Lint

[source,bash]

grunt lint

== Coverage

[source,bash]

grunt coverage

https://istanbul.js.org/[Istanbul] results are available http://rawgit.davedoesdev.com/davedoesdev/centro/master/coverage/lcov-report/index.html[here].

Coveralls page is https://coveralls.io/r/davedoesdev/centro[here].

Package last updated on 02 Jul 2017