Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

primus

Package Overview
Dependencies
Maintainers
1
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

primus - npm Package Compare versions

Comparing version 0.0.0 to 0.1.0

.npmignore

144

index.js
'use strict';
var Spark = require('./spark');
var Transformer = require('./transformer')
, Spark = require('./spark');
/**
* Primus is a unversal wrapper for real-time frameworks that provides a common
* Primus is a universal wrapper for real-time frameworks that provides a common
* interface for server and client interaction.

@@ -15,18 +16,22 @@ *

function Primus(server, options) {
if (!(this instanceof Primus)) return new Primus(server, options);
options = options || {};
this.transporter = null;
this.encoder = null;
this.decoder = null;
this.transformer = null; // Reference to the real-time engine instance.
this.encoder = null; // Shorthand to the parser's encoder.
this.decoder = null; // Shorthand to the parser's decoder.
this.sparks = 0; // Increment id for connection ids
this.connected = 0; // Connection counter;
this.connections = Object.create(null); // Connection storage.
this.server = server;
this.pathname = options.pathname || '/primus';
this.parsers(options.parser);
this.pathname = options.pathname || '/primus';
this.Spark = Spark.bind(Spark, this);
this.connections = Object.create(null);
this.initialiase(options.transport);
this.initialise(options.transformer || options.transport);
}
Primus.prototype.__proto__ = require('events').EventEmitter;
Primus.prototype.__proto__ = require('events').EventEmitter.prototype;

@@ -38,3 +43,3 @@ //

get: function read() {
return require('fs').readFileSync('./primus.js', 'utf-8');
return require('fs').readFileSync(__dirname + '/primus.js', 'utf-8');
}

@@ -44,22 +49,44 @@ });

//
// Lazy compile the primus.js JavaScript client for Node.js
//
Object.defineProperty(Primus.prototype, 'Socket', {
get: function () {
return require('load').compiler(this.library(true), 'primus.js', {
__filename: 'primus.js',
__dirname: __dirname
});
}
});
//
// Expose the current version number.
//
Primus.prototype.version = require('./package.json');
Primus.prototype.version = require('./package.json').version;
/**
* Initialise the real-time transport that was choosen.
* Initialise the real-time transport that was chosen.
*
* @param {String} transformer The name of the transformer
* @param {Mixed} Transformer The name of the transformer or a constructor;
* @api private
*/
Primus.prototype.initialise = function initialise(transformer) {
var Transformer = require('.//'+ (transformer || 'ws').toLowerCase());
Primus.prototype.initialise = function initialise(Transformer) {
Transformer = Transformer || 'websockets';
if ('string' === typeof Transformer) {
Transformer = require('./transformers/'+ Transformer.toLowerCase());
}
if ('function' !== typeof Transformer) {
throw new Error('The given transformer is not a constructor');
}
this.transformer = new Transformer(this);
this.on('connection', function connection(stream) {
this.connected++;
this.connections[stream.id] = stream;
});
this.on('disconnected', function disconnected(stream) {
this.on('disconnection', function disconnected(stream) {
this.connected--;
delete this.connections[stream.id];

@@ -72,3 +99,3 @@ });

*
* @param {Function} fn
* @param {Function} fn The function that is called every iteration.
* @api public

@@ -78,3 +105,3 @@ */

for (var stream in this.connections) {
fn(this.connections[stream], stream);
fn(this.connections[stream], stream, this.connections);
}

@@ -88,42 +115,89 @@

*
* @param {String} type Parse name.
* @param {Mixed} parser Parse name or parser Object.
* @api private
*/
Primus.prototype.parsers = function parsers(type) {
var parser = require('./parsers/'+ (type || 'json').toLowerCase());
Primus.prototype.parsers = function parsers(parser) {
parser = parser || 'json';
if ('string' === typeof parser) {
parser = require('./parsers/'+ parser.toLowerCase());
}
if ('object' !== typeof parser) {
throw new Error('The given parser is not an Object');
}
this.encoder = parser.encoder;
this.decoder = parser.decoder;
this.parser = parser;
return this;
};
/**
* Generate a front-end library.
* Generate a client library.
*
* @returns {String} The client side library.
* @param {Boolean} noframework Don't include the library.
* @returns {String} The client library.
* @api public
*/
Primus.prototype.library = function compile() {
Primus.prototype.library = function compile(noframework) {
var encoder = this.encoder.client || this.encoder
, decoder = this.decoder.client || this.decoder
, library = this.transporter.library || ''
, transport = this.transporter.client
, client = this.client;
, library = this.transformer.library || ''
, transport = this.transformer.client
, parser = this.parser.library || '';
//
// Add a simple export wrapper so it can be used as Node.js, amd or browser
// client.
//
var client = [
'(function (name, context, definition) {',
' if (typeof module !== "undefined" && module.exports) {',
' module.exports = definition();',
' } else if (typeof define == "function" && define.amd) {',
' define(definition);',
' } else {',
' context[name] = definition();',
' }',
'})("Primus", this, function PRIMUS() {',
this.client
].join('\n');
//
// Replace some basic content.
//
client = client
.replace('= null; // @import {primus::version}', '"'+ this.version +'"')
.replace('= null; // @import {primus::transport}', transport.toString())
.replace('= null; // @import {primus::encoder}', encoder.toString())
.replace('= null; // @import {primus::decoder}', decoder.toString())
.replace('= null; // @import {primus::pathname}', this.pathname)
.replace('/* {primus::library} */', library);
.replace('null; // @import {primus::pathname}', '"'+ this.pathname.toString() +'"')
.replace('null; // @import {primus::version}', '"'+ this.version +'"')
.replace('null; // @import {primus::transport}', transport.toString())
.replace('null; // @import {primus::encoder}', encoder.toString())
.replace('null; // @import {primus::decoder}', decoder.toString());
return client;
//
// Add the parser inside the closure, to prevent global leaking.
//
if (parser && parser.length) client += parser;
//
// Close the export wrapper and return the client. If we need to add
// a library, we should add them after we've created our closure and module
// exports. Some libraries seem to fail hard once they are wrapped in our
// closure so I'll rather expose a global variable instead of having to monkey
// patch to much code.
//
return client + ' return Primus; });' + (noframework ? '' : library);
};
//
// Expose the constructors of our Spark and Transformer so it can be extended by
// a third party if needed.
//
Primus.Transformer = Transformer;
Primus.Spark = Spark;
//
// Expose the module.
//
module.exports = Primus;
{
"name": "primus",
"version": "0.0.0",
"version": "0.1.0",
"description": "Primus is a simple abstraction around real-time frameworks. It allows you to easily switch between different frameworks without any code changes.",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "NODE_ENV=testing ./node_modules/.bin/mocha $(find test -name '*.test.js')",
"browser": "./node_modules/.bin/testling ."
},
"repository": "",
"repository": {
"type": "git",
"url": "git://github.com/3rd-Eden/primus.git"
},
"keywords": [

@@ -15,2 +19,3 @@ "primus",

"websocket",
"ws",
"engine.io",

@@ -23,4 +28,35 @@ "socket.io",

"dependencies": {
"extendable": "0.0.3"
"extendable": "0.0.x",
"load": "0.1.x"
},
"testling": {
"harness": "mocha-bdd",
"files": "test/*.browser.js",
"browsers": [
"ie/6..latest",
"chrome/22..latest",
"firefox/16..latest",
"safari/latest",
"opera/11.0..latest",
"iphone/6",
"ipad/6",
"android-browser/latest"
]
},
"devDependencies": {
"browserchannel": "1.0.x",
"chai": "1.7.x",
"engine.io": "git://github.com/3rd-Eden/engine.io.git#issue/178",
"engine.io-client": "0.6.x",
"jsonh": "0.0.x",
"mocha": "1.11.x",
"pre-commit": "0.0.x",
"socket.io": "0.9.x",
"socket.io-client": "0.9.x",
"sockjs": "0.3.x",
"testling": "*",
"ws": "0.4.x",
"sockjs-client-node": "0.0.0",
"request": "~2.21.0"
}
}

@@ -1,219 +0,373 @@

(function primus() {
'use strict';
'use strict';
/* {primus::library} */
/**
* Primus in a real-time library agnostic framework for establishing real-time
* connections with servers.
*
* @constructor
* @param {String} url The url of your server.
* @param {Object} options The configuration.
* @api private
*/
function Primus(url, options) {
if (!(this instanceof Primus)) return new Primus(url);
options = options || {};
/**
* Primus in a real-time library agnostic framework for establishing real-time
* connections with servers.
*
* @param {String} url The url of your server.
*/
function Primus(url) {
if (!(this instanceof Primus)) return new Primus(url);
this.buffer = []; // Stores premature send data.
this._events = {}; // Stores the events.
this.writable = true; // Silly stream compatibility.
this.readable = true; // Silly stream compatibility.
this.url = this.parse(url); // Parse the url to a readable format.
this.backoff = options.reconnect || {}; // Stores the backoff configuration.
this.readyState = Primus.CLOSED; // The readyState of the connection.
this.events = {}; // Stores the events.
this.backoff = {}; // Stores the backoff configuration.
this.url = this.parse(url);
if (Stream) Stream.call(this); // Initialize a stream interface.
this.initialise().connect();
}
this.initialise().open();
}
/**
* Initialise the Primus and setup all parsers and internal listeners.
*
* @api private
*/
Primus.prototype.initialise = function initalise() {
var primus = this;
Primus.OPENING = 0; // We're opening the connection.
Primus.CLOSED = 1; // No active connection.
Primus.OPEN = 2; // The connection is open.
this.on('primus::data', function message(data) {
primus.decoder(data, function decoding(err, packet) {
//
// Do a "save" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
//
if (err) return primus.listeners('error').length && socket.emit('error', err);
primus.emit('data', packet);
});
});
//
// It's possible that we're running in Node.js or in a Node.js compatible
// environment such as browserify. In these cases we want to use some build in
// libraries to minimize our dependence on the DOM.
//
var Stream, parse;
this.on('primus::end', function end() {
this.reconnect(function (fail, backoff) {
primus.backoff = backoff; // Save the opts again of this backoff.
if (fail) return self.emit('end', fail);
try {
parse = require('url').parse;
Stream = require('stream');
// Try to re-open the connection again.
primus.emit('primus::reconnect');
}, primus.backoff);
});
Primus.prototype = new Stream();
} catch (e) {
parse = function parse(url) {
var a = document.createElement('a');
a.href = url;
return this;
return a;
};
}
/**
* Establish a connection with the server.
*
* @api private
*/
Primus.prototype.connect = function connect() {
this.emit('primus::connect', this.uri());
};
/**
* Initialise the Primus and setup all parsers and internal listeners.
*
* @api private
*/
Primus.prototype.initialise = function initalise() {
var primus = this;
/**
* Close the connection.
*
* @api public
*/
Primus.prototype.end = function end() {
this.on('outgoing::open', function opening() {
primus.readyState = Primus.OPENING;
});
};
this.on('incoming::open', function opened() {
primus.readyState = Primus.OPEN;
primus.emit('open');
/**
* Exponential backoff algorithm for retry aperations. It uses an randomized
* retry so we don't DDOS our server when it goes down under presure.
*
* @param {Function} callback Callback to be called after the timeout.
* @param {Object} opts Options for configuring the timeout.
* @api private
*/
Primus.prototype.backoff = function backoff(callback, opts) {
opts = opts || {};
if (primus.buffer.length) {
for (var i = 0, length = primus.buffer.length; i < length; i++) {
primus.write(primus.buffer[i]);
}
opts.maxDelay = opts.maxDelay || Infinity; // Maximum delay
opts.minDelay = opts.minDelay || 500; // Minimum delay
opts.retries = opts.retries || 25; // Amount of allowed retries
opts.attempt = (+opts.attempt || 0) + 1; // Current attempt
opts.factor = opts.factor || 2; // Backoff factor
// Bailout if we are about to make to much attempts. Please note that we use
// `>` because we already incremented the value above.
if (opts.attempt > opts.retries || opts.backoff) {
return callback(new Error('Unable to retry'), opts);
primus.buffer.length = 0;
}
});
// Prevent duplicate backoff attempts.
opts.backoff = true;
this.on('incoming::data', function message(raw) {
primus.decoder(raw, function decoding(err, packet) {
//
// Do a "save" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
//
if (err) return primus.listeners('error').length && primus.emit('error', err);
if ('primus::server::close' === packet) return primus.emit('incoming::end', packet);
primus.emit('data', packet, raw);
});
});
// Calculate the timeout, but make it randomly so we don't retry connections
// at the same interval and defeat the purpose. This exponential backoff is
// based on the work of:
this.on('incoming::end', function end(intentional) {
if (primus.readyState === Primus.CLOSED) return;
primus.readyState = Primus.CLOSED;
//
// http://dthain.blogspot.nl/2009/02/exponential-backoff-in-distributed.html
opts.timeout = opts.attempt !== 1
? Math.min(Math.round(
(Math.random() * 1) * opts.minDelay * Math.pow(opts.factor, opts.attempt)
), opts.maxDelay)
: opts.minDelay;
// Some transformers emit garbage when they close the connection. Like the
// reason why it closed etc, we should explicitly check if WE send an
// intentional message.
//
if ('primus::server::close' === intentional) return primus.emit('end');
setTimeout(function delay() {
opts.backoff = false;
callback(undefined, opts);
}, opts.timeout);
this.reconnect(function reconnect(fail, backoff) {
primus.backoff = backoff; // Save the opts again of this backoff.
if (fail) return primus.emit('end');
return this;
};
//
// Try to re-open the connection again.
//
primus.emit('reconnect', backoff);
primus.emit('outgoing::reconnect');
}, primus.backoff);
});
/**
* Parse the connection string.
*
* @param {String} url Connection url
* @returns {Object} Parsed connection.
* @api public
*/
Primus.prototype.parse = function parse(url) {
var a = document.createElement('a');
a.href = url;
//
// Setup the real-time client.
//
this.client();
return a;
};
return this;
};
/**
* Generates a connection url.
*
* @returns {String} The url.
* @api private
*/
Primus.prototype.uri = function uri() {
var server = [];
/**
* Establish a connection with the server.
*
* @api private
*/
Primus.prototype.open = function open() {
this.emit('outgoing::open');
server.push(this.url.protocol === 'https:' ? 'wss:' : 'ws:', '');
server.push(this.url.host, this.pathname.slice(1));
return this;
};
//
// Optionally add a search query
//
if (this.url.search) server.push(this.url.search);
return server.join('/');
};
/**
* Send a new message.
*
* @param {Mixed} data The data that needs to be written.
* @returns {Boolean} Always returns true.
* @api public
*/
Primus.prototype.write = function write(data) {
var primus = this;
/**
* Emit an event to all registered event listeners.
*
* @param {String} event The name of the event.
* @returns {Boolean} Indication if we've emitted an event.
* @api public
*/
Primus.prototype.emit = function emit(event) {
if (!(event in this.events)) return false;
if (this.readyState === Primus.OPEN) {
this.encoder(data, function encoded(err, packet) {
//
// Do a "save" emit('error') when we fail to parse a message. We don't
// want to throw here as listening to errors should be optional.
//
if (err) return primus.listeners('error').length && primus.emit('error', err);
primus.emit('outgoing::data', packet);
});
} else {
primus.buffer.push(data);
}
var args = Array.prototype.slice.call(arguments, 1)
, length = this.events[event].length
, i = 0;
return true;
};
for (; i < length; i++) {
this.events[event][i].apply(this, args);
}
/**
* Close the connection.
*
* @param {Mixed} data last packet of data.
* @api public
*/
Primus.prototype.end = function end(data) {
if (this.readyState === Primus.CLOSED) return this;
if (data) this.write(data);
return true;
};
this.writable = false;
this.readyState = Primus.CLOSED;
/**
* Register a new EventListener for the given event.
*
* @param {String} event Name of the event.
* @param {Functon} fn Callback function.
* @api public
*/
Primus.prototype.on = function on(event, fn) {
if (!(event in this.events)) this.events[event] = [];
this.events[event].push(fn);
this.emit('outgoing::end');
this.emit('end');
return this;
};
return this;
};
/**
* Simple emit wrapper that returns a function that emits an event once it's
* called. This makes it easier for transports to emit specific events. The
* scope of this function is limited as it will only emit one single argument.
*
* @param {String} event Name of the event that we should emit.
* @param {Function} parser Argument parser.
* @api public
*/
Primus.prototype.emits = function emits(event, parser) {
var primus = this;
/**
* Exponential backoff algorithm for retry operations. It uses an randomized
* retry so we don't DDOS our server when it goes down under pressure.
*
* @param {Function} callback Callback to be called after the timeout.
* @param {Object} opts Options for configuring the timeout.
* @api private
*/
Primus.prototype.reconnect = function reconnect(callback, opts) {
opts = opts || {};
return function emit(arg) {
var data = parser ? parser.apply(primus, arguments) : arg;
opts.maxDelay = opts.maxDelay || Infinity; // Maximum delay.
opts.minDelay = opts.minDelay || 500; // Minimum delay.
opts.retries = opts.retries || 10; // Amount of allowed retries.
opts.attempt = (+opts.attempt || 0) + 1; // Current attempt.
opts.factor = opts.factor || 2; // Backoff factor.
//
// Timeout is required to prevent crashes on WebSockets connections on
// mobile devices.
//
setTimeout(function timeout() {
primus.emit('primus::'+ event, data);
}, 0);
};
};
// Bailout if we are about to make to much attempts. Please note that we use
// `>` because we already incremented the value above.
if (opts.attempt > opts.retries || opts.backoff) {
return callback(new Error('Unable to retry'), opts);
}
// Prevent duplicate backoff attempts.
opts.backoff = true;
// Calculate the timeout, but make it randomly so we don't retry connections
// at the same interval and defeat the purpose. This exponential backoff is
// based on the work of:
//
// These libraries are automatically are automatically inserted at the
// serverside using the Primus#library method.
// http://dthain.blogspot.nl/2009/02/exponential-backoff-in-distributed.html
opts.timeout = opts.attempt !== 1
? Math.min(Math.round(
(Math.random() * 1) * opts.minDelay * Math.pow(opts.factor, opts.attempt)
), opts.maxDelay)
: opts.minDelay;
setTimeout(function delay() {
opts.backoff = false;
callback(undefined, opts);
}, opts.timeout);
return this;
};
/**
* Parse the connection string.
*
* @param {String} url Connection url.
* @returns {Object} Parsed connection.
* @api public
*/
Primus.prototype.parse = parse;
/**
* Generates a connection uri.
*
* @param {String} protocol The protocol that should used to crate the uri.
* @param {Boolean} querystring Do we need to include a querystring.
* @returns {String} The url.
* @api private
*/
Primus.prototype.uri = function uri(protocol, querystring) {
var server = [];
server.push(this.url.protocol === 'https:' ? protocol +'s:' : protocol +':', '');
server.push(this.url.host, this.pathname.slice(1));
//
Primus.prototype.client = null; // @import {primus::transport};
Primus.prototype.pathname = null; // @import {primus::pathname};
Primus.prototype.encoder = null; // @import {primus::encoder};
Primus.prototype.decoder = null; // @import {primus::decoder};
Primus.prototype.version = null; // @import {primus::version};
})(this);
// Optionally add a search query.
//
if (this.url.search && querystring) server.push(this.url.search);
return server.join('/');
};
/**
* Return a list of assigned event listeners.
*
* @param {String} event The events that should be listed.
* @returns {Array}
* @api public
*/
Primus.prototype.listeners = function listeners(event) {
return (this._events[event] || []).slice(0);
};
/**
* Emit an event to all registered event listeners.
*
* @param {String} event The name of the event.
* @returns {Boolean} Indication if we've emitted an event.
* @api public
*/
Primus.prototype.emit = function emit(event) {
if (!(event in this._events)) return false;
var args = Array.prototype.slice.call(arguments, 1)
, length = this._events[event].length
, i = 0;
for (; i < length; i++) {
this._events[event][i].apply(this, args);
}
return true;
};
/**
* Register a new EventListener for the given event.
*
* @param {String} event Name of the event.
* @param {Functon} fn Callback function.
* @api public
*/
Primus.prototype.on = function on(event, fn) {
if (!(event in this._events)) this._events[event] = [];
this._events[event].push(fn);
return this;
};
/**
* Remove event listeners.
*
* @param {String} event The event we want to remove.
* @param {Function} fn The listener that we need to find.
* @api public
*/
Primus.prototype.removeListener = function removeListener(event, fn) {
if (!this._events || !(event in this._events)) return this;
var listeners = this._events[event]
, events = [];
for (var i = 0, length = listeners.length; i < length; i++) {
if (!fn || listeners[i] === fn) continue;
events.push(listeners[i]);
}
//
// Reset the array, or remove it completely if we have no more listeners.
//
if (events.length) this._events[event] = events;
else delete this._events[event];
return this;
};
/**
* Simple emit wrapper that returns a function that emits an event once it's
* called. This makes it easier for transports to emit specific events. The
* scope of this function is limited as it will only emit one single argument.
*
* @param {String} event Name of the event that we should emit.
* @param {Function} parser Argument parser.
* @api public
*/
Primus.prototype.emits = function emits(event, parser) {
var primus = this;
return function emit(arg) {
var data = parser ? parser.apply(primus, arguments) : arg;
//
// Timeout is required to prevent crashes on WebSockets connections on
// mobile devices. We need to handle these edge cases in our own library
// as we cannot be certain that all frameworks fix these issues.
//
setTimeout(function timeout() {
primus.emit('incoming::'+ event, data);
}, 0);
};
};
/**
* Syntax sugar, adopt a Socket.IO like API.
*
* @param {String} url The url we want to connect to.
* @param {Object} options Connection options.
* @returns {Primus}
* @api public
*/
Primus.connect = function connect(url, options) {
return new Primus(url, options);
};
//
// These libraries are automatically are automatically inserted at the
// serverside using the Primus#library method.
//
Primus.prototype.pathname = null; // @import {primus::pathname};
Primus.prototype.client = null; // @import {primus::transport};
Primus.prototype.encoder = null; // @import {primus::encoder};
Primus.prototype.decoder = null; // @import {primus::decoder};
Primus.prototype.version = null; // @import {primus::version};

@@ -0,3 +1,9 @@

```
FYI: Consider this module broken, dead until 1.0 is released.
```
# Primus
[![Build Status](https://travis-ci.org/3rd-Eden/primus.png)](https://travis-ci.org/3rd-Eden/primus)
Primus, the creator god of transformers but now also known as universal wrapper

@@ -9,2 +15,10 @@ for real-time frameworks. There are a lot of real-time frameworks available for

### Highlights
1. Effortless switching between real-time frameworks and message parsers.
2. Clean and stream compatible interface for client and server.
3. Fixes bugs in frameworks and real-time communication where needed.
4. Build with love and passion for real-time.
5. Reconnect that actually works.
### Installation

@@ -18,4 +32,512 @@

### Getting started
Primus doesn't ship with real-time frameworks as dependencies, it assumes that
you as user adds them your self as a dependency. This is done to keep the module
as light weight as possible. This works because `require` in will walk through
your directories searching for `node_module` folders that have these matching
dependencies.
Primus needs to be "attached" to a HTTP compatible server. These includes the
build in `http` and `https` servers but also the `spdy` module as it has the
same API as node servers. Creating a new Primus instance is relatively straight
forward:
```js
'use strict';
var Primus = require('primus')
, http = require('http');
var server = http.createServer(/* request handler */)
, primus = new Primus(server, {/* options */});
```
In addition to support different frameworks we've also made it possible to use
custom encoding and decoding libraries. We're using `JSON` by default but you
could also use `msgpack` or `JSONH` for example (but these parsers need to be
supported by Primus, so check out the parser folder for examples). To set parser
you can supply a `parser` configuration option:
```js
var primus = new Primus(server, { parser: 'JSON' });
```
All parsers have an `async` interface for error handling.
As most libraries come with their own client-side framework for making the
connection we've also created a small wrapper for this. The library can be
retrieved using:
```js
primus.library();
```
Which returns the client-side library. It's not minified as that is out of the
scope of this project. You can store this on a CDN or on your static server. Do
what ever you want with it, but I would advice you to regenerate that file every
time you redeploy so it always contains a client side library that is compatible
with your back-end.
Once you're all set up you can start listening for connections. These
connections are announced through the `connection` event.
```js
primus.on('connection', function (spark) {
// spark is the new connection.
});
```
Disconnects are announced using a `disconnection` event:
```js
primus.on('disconnected', funciton (spark) {
// the spark that disconnected
});
```
The `spark` the actual real-time socket/connection. Sparks have a really low
level interface and only expose a couple properties that are cross engine
supported. The interface is modeled towards a Node.js stream compatible
interface.
#### spark.headers
The `spark.headers` property contains contains the headers of either the request
that started a handshake with the server or the headers of the actual real-time
connection. This depends on the module you are using.
#### spark.address
The `spark.address` property contains the `remoteAddress` and `remotePort` of the
connection. If you're running your server behind a reverse proxy it will be
useless to you and you should probably be checking the `spark.headers` for
`x-fowarded-xxx` headers instead.
#### spark.query
The `spark.query` contains the query string you used to connect to server. It's
parsed to a object. Please note that this is not available for all supported
transformers, but it's proven to be to useful to not implement it because one
silly tranformer refuses to support it. Yes.. I'm looking at you,
browserchannel.
#### spark.id
This is the connection id we use to identify the connection. This should not be
seen as a "session id" and can change between disconnects and reconnects.
#### spark.write(data)
You can use the `spark.write` method to send data over the socket. The data is
automatically encoded for you using the `parser` that you've set while creating
the Primus instance. This method always returns `true` so back pressure isn't
handled.
```js
spark.write({ foo: 'bar' });
```
#### spark.end()
The `spark.end()` closes the connection.
#### spark.emits(event, parser)
This method is mostly used internally. It returns a function that emits assigned
`event` every time it's called. It only emits the first received argument or the
result of the optional `parser` call. The `parser` function receives all
arguments and can parse it down to a single value or just extracts the useful
information from the data. Please note that the data that is received here isn't
decoded yet.
```js
spark.emits('event', function parser(structure) {
return structure.data;
});
```
#### spark.on('data')
The `data` event is emitted when a message is received from the client. It's
automatically decoded by the specified decoder.
```js
spark.on('data', function message(data) {
// the message we've received.
});
```
#### spark.on('end')
The `end` event is emitted when the client has disconnected.
```js
primus.on('connection', function (spark) {
console.log('connection has the following headers', spark.headers);
console.log('connection was made from', spark.address);
console.log('connection id', spark.id);
spark.on('data', function (data) {
console.log('recieved data from the client', data);
if ('foo' !== data.secrethandshake) spark.end();
spark.write({ foo: 'bar' });
spark.write('banana');
});
spark.write('Hello world');
})
```
### Connecting from the browser.
Primus comes with it's client framework which can be compiled using
`primus.library()` as mentioned above. To create a connection you can simply
create a new Primus instance:
```js
var primus = new Primus(url, { options });
//
// But it can be easier, with some syntax sugar.
//
var primus = Primus.connect(url, { options });
```
#### primus.write(message)
Once you've created your primus instance you're ready to go. When you want to
write data to your server you can just call the `.write` method:
```js
primus.write('message');
```
It automatically encodes your messages using the parser that you've specified on
the server. So sending objects back and forth between the server is nothing
different then just writing:
```js
primus.write({ foo: 'bar' });
```
When you are sending messages to the server, you don't have to wait for the
`open` event to happen, the client will automatically buffer all the data you've
send and automatically write it to the server once it's connected. The client
supports a couple of different events.
#### primus.on('data')
The `data` event is the most important event of the whole library. It's emitted
when we receive data from the server. The data that is received is already
decoded by the specified parser.
```js
primus.on('data', function message(data) {
console.log('Received a new message from the server', data);
});
```
#### primus.on('open')
The `open` event is emitted when we've successfully created a connection with
the server. It will also be emitted when we've successfully reconnected when the
connection goes down unintentionally.
```js
primus.on('open', function open() {
console.log('Connection is alive and kicking');
});
```
#### primus.on('error')
The `error` event is emitted when something breaks that is out of our control.
Unlike Node.js, we do not throw an error if no error event listener is
specified. The cause of an error could be that we've failed to encode or decode
a message or we failed to create a connection.
```js
primus.on('error', function error(err) {
console.error('Something horrible has happend', err, err.message);
});
```
#### primus.on('reconnect')
The `reconnect` event is emitted when we're attempting to reconnect to the
server. This all happens transparently and it's just a way for you to know when
these reconnects are actually happening.
```js
primus.on('reconnecting', function () {
console.log('reconnecting');
})
```
#### primus.on('end')
The `end` event is emitted when we've closed the connection. When this event is
emitted you should consider your connection to be fully dead with no way of
reconnecting. But it's also emitted when the server closes the connection.
```js
primus.on('end', function () {
console.log('connection closed');
});
```
#### primus.end()
When you want to close the connection you can call the `primus.end()` method.
After this the connection should be considered dead and a new connection needs
to be made using `Primus.connect(url)` or `primus = new Primus(url)` if you want
to talk with the server again.
```js
primus.end();
```
#### Reconnecting
When the connection goes down unexpectedly a automatic reconnect process is
started. It's using a randomized exponential backoff algorithm to prevent
clients to DDOS your server when you reboot as they will all be re-connecting at
different times. The reconnection can be configured using the `options` argument
in `Primus` and you should add these options to the `backoff` property:
```js
primus = Primus.connect(url, {
backoff: {
maxDelay: Infinity // Number: The max delay for a reconnect retry.
, minDelay: 500 // Number: The minimum delay before we reconnect.
, retries: 10 // Number: How many times should we attempt to reconnect.
, factor: 2 // Number The backoff factor.
}
});
```
Please do note when we reconnect, you will receive a new `connection` event on
the server. As the previous connection was completely dead and should there for
be considered a new connection.
If you are interested in learning more about the backoff algorithm you might
want to read http://dthain.blogspot.nl/2009/02/exponential-backoff-in-distributed.html
```js
var primus = Primus.connect(url);
primus.on('data', function (message) {
console.log('recieved a message', message);
primus.write({ echo: message });
});
primus.write('hello world');
```
### Supported real-time frameworks
The following transformers/transports are supported in Primus:
#### engine.io
Engine.io is the low level transport functionality of Socket.io 1.0. It supports
multiple transports for creating a real-time connection. It uses transport
upgrading instead of downgrading which makes it more resilient to blocking
proxies and firewalls. To enable `engine.io` you need to install the `engine.io`
module:
```
npm install engine.io --save
```
And tell `Primus` that you want to us `engine.io` as transformer:
```js
var primus = new Primus(server, { transformer: 'engine.io' });
```
If you want to use the client interface inside of Node.js you also need to
install the `engine.io-client`:
```
npm install engine.io-client --save
```
And then you can access it from your server instance:
```js
var Socket = primus.Socket;
, socket = new Socket('url');
```
#### WebSockets
If you are targeting a high end audience or maybe just something for internal
uses you can use a pure WebSocket server. This uses the `ws` WebSocket module
which is known to be one if not the fastest WebSocket server available in
Node.js and supports all protocol specifications. To use pure WebSockets you
need to install the `ws` module:
```
npm install ws --save
```
And tell `Primus` that you want to use `WebSockets` as transformer:
```js
var primus = new Primus(server, { transformer: 'websockets' });
```
The `WebSockets` transformer comes with build in client support and can be
accessed using:
```js
var Socket = primus.Socket;
, socket = new Socket('url');
```
#### Browserchannel
Browserchannel was the original technology that GMail used for their real-time
communication. It's designed for same domain communication and does not use
WebSockets. To use browserchannel you need to install the `browserchannel`
module:
```
npm install browserchannel --save
```
And tell `Primus` that you want to use `browserchannel` as transformer:
```js
var primus = new Primus(server, { transformer: 'browserchannel' });
```
The `browserchannel` transformer comes with build in client support and can be
accessed using:
```js
var Socket = primus.Socket;
, socket = new Socket('url');
```
#### SockJS
SockJS is a real-time server that focuses on cross-domain connections and does
this by using multiple transports. To use SockJS you need to install the
`sockjs` module:
```
npm install sockjs --save
```
And tell `Primus` that you want to use `sockjs` as transformer:
```js
var primus = new Primus(server, { transformer: 'sockjs' });
```
If yo want to use the client interface inside of Node.js you also need to
install the `sockjs-client-node` module:
```
npm install socket.io-client --save
```
And then you can access it from your server instance:
```js
var Socket = primus.Socket;
, socket = new Socket('url');
```
#### Socket.IO
The Socket.IO transport was written against Socket.IO 0.9.x. It was one of the
first real-time servers written on Node.js and is one of the most used modules
in Node.js. It uses multiple transports to connect the server. To use Socket.IO
you need to install the `socket.io` module:
```
npm install socket.io --save
```
And tell `Primus` that you want to use `socket.io` as transformer:
```js
var primus = new Primus(server, { transformer: 'socket.io' });
```
If you want to use the client interface inside of Node.js you also need to
install the `socket.io-client`:
```
npm install socket.io-client --save
```
And then you can access it from your server instance:
```js
var Socket = primus.Socket;
, socket = new Socket('url');
```
As you can see from the examples above, it doesn't matter how you write the name
of the transformer, we just `toLowerCase()` everything.
### Transformer inconsistencies
- Browserchannel does not give you access to the `remotePort` of the incoming
connection. So when you access `spark.address` the `port` property will be set
to `1337` by default.
- Browserchannel and SockJS do not support connections with query strings. You
can still supply a query string in the `new Primus('http://localhost:80?q=s')`
but it will not be accessible in the `spark.query` property.
- Browserchannel is the only transformer that does not support cross domain
connections.
- SockJS and Browserchannel are originally written in CoffeeScript which can
make it harder to debug when their internals are failing.
- Engine.IO and SockJS do not ship their client-side library with their server
side component. We're bundling a snapshot of these libraries inside of Primus.
We will always be targeting the latest version of these transformers when we
bundle the library.
- There are small bugs in Engine.IO that are causing our tests to fail. I've
submitted patches for these bugs, but they have been reject for silly reasons.
The bug causes closed connections to say open. If you're experiencing this you
can apply this [patch](/3rd-Eden/engine.io/commit/0cf81270e9d5700).
### Versioning
All `0.x.x` releases should be considered unstable and not ready for production.
The version number is layed out as: `major.minor.patch` and tries to follow
semver as closely as possible but this is how we use our version numbering:
<dl>
<dt>major</dt>
<dd>
<p>
A major and possible breaking change has been made in the primus core.
These changes are not backwards compatible with older versions.
</p>
</dd>
<dt>minor</dt>
<dd>
<p>
New features are added or a big change has happend with one of the
real-time libraries that we've supporting.
</p>
</dd>
<dt>patch</dt>
<dd>
<p>
A bug has been fixed, without any major internal and breaking changes.
</p>
</dd>
</dl>
### License
MIT
'use strict';
var parse = require('querystring').parse
, forwarded = require('./forwarded');
/**

@@ -11,18 +14,37 @@ * The Spark is an indefinable, indescribable energy or soul of a transformer

* @param {Object} headers The request headers for this connection.
* @param {Object} address The remoteAddress and port.
* @param {Object} address The object that holds the remoteAddress and port.
* @param {Object} query The query string of request.
* @param {String} id An optional id of the socket, or we will generate one.
* @api public
*/
function Spark(primus, headers, address) {
this.primus = primus; // References to the primus.
this.headers = headers; // The request headers.
this.address = address; // The remote address.
function Spark(primus, headers, address, query, id) {
this.primus = primus; // References to the primus.
this.headers = headers || {}; // The request headers.
this.remote = address || {}; // The remote address location.
this.query = query || {}; // The query string.
this.id = id || this.uuid(); // Unique id for socket.
this.writable = true; // Silly stream compatiblity.
this.readable = true; // Silly stream compatiblity.
this.writable = true; // Silly stream compatiblity.
this.readable = true; // Silly stream compatiblity.
//
// Parse our query string.
//
if ('string' === typeof this.query) this.query = parse(this.query);
this.initialise();
}
Spark.prototype.__proto__ = require('events').EventEmitter.prototype;
Spark.prototype.__proto__ = require('stream').prototype;
//
// Lazy parse interface for ip address information. As nobody is always
// interested in this, we're going to defer parsing until it's actually needed.
//
Object.defineProperty(Spark.prototype, 'address', {
get: function address() {
return forwarded(this.remote, this.headers);
}
});
/**

@@ -35,3 +57,3 @@ * Attach hooks and automatically announce a new connection.

var primus = this.primus
, socket = this;
, spark = this;

@@ -41,4 +63,4 @@ //

//
this.on('primus::data', function message(data) {
primus.decoder(data, function decoding(err, packet) {
spark.on('incoming::data', function message(raw) {
primus.decoder(raw, function decoding(err, packet) {
//

@@ -48,4 +70,4 @@ // Do a "save" emit('error') when we fail to parse a message. We don't

//
if (err) return socket.listeners('error').length && socket.emit('error', err);
socket.emit('data', packet);
if (err) return spark.listeners('error').length && spark.emit('error', err);
spark.emit('data', packet, raw);
});

@@ -57,12 +79,19 @@ });

//
this.on('primus::end', function disconnect() {
socket.emit('end');
socket.removeAllListeners();
spark.on('incoming::end', function disconnect() {
spark.emit('end');
});
//
// End is triggered by both incoming and outgoing events.
//
spark.on('end', function () {
spark.removeAllListeners();
primus.emit('disconnection', spark);
});
//
// Announce a new connection.
//
process.nextTick(function tick() {
primus.emit('connection', this);
primus.emit('connection', spark);
});

@@ -72,2 +101,12 @@ };

/**
* Generate a unique uuid.
*
* @returns {String} uuid.
* @api private
*/
Spark.prototype.uuid = function uuid() {
return Date.now() +'$'+ this.primus.sparks++;
};
/**
* Simple emit wrapper that returns a function that emits an event once it's

@@ -82,8 +121,8 @@ * called. This makes it easier for transports to emit specific events. The

Spark.prototype.emits = function emits(event, parser) {
var socket = this;
var spark = this;
return function emit(arg) {
var data = parser ? parser.apply(socket, arguments) : arg;
var data = parser ? parser.apply(spark, arguments) : arg;
socket.emit('primus::'+ event, data);
spark.emit('incoming::'+ event, data);
};

@@ -93,3 +132,3 @@ };

/**
* Send a new message to a given socket.
* Send a new message to a given spark.
*

@@ -101,5 +140,5 @@ * @param {Mixed} data The data that needs to be written.

Spark.prototype.write = function write(data) {
var socket = this;
var spark = this;
this.primus.encoder(data, function encoded(err, packet) {
spark.primus.encoder(data, function encoded(err, packet) {
//

@@ -109,4 +148,4 @@ // Do a "save" emit('error') when we fail to parse a message. We don't

//
if (err) return socket.listeners('error').length && socket.emit('error', err);
socket.emit('data', packet);
if (err) return spark.listeners('error').length && spark.emit('error', err);
spark.emit('outgoing::data', packet);
});

@@ -120,7 +159,20 @@

*
* @api private
* @param {Mixed} data Optional closing data.
* @api public
*/
Spark.prototype.end = function end() {
this.emit('end');
this.removeAllListeners();
Spark.prototype.end = function end(data) {
if (data) this.write(data);
//
// Tell our connection that this is a intended close and that is shouldn't do
// any reconnect operations.
//
this.write('primus::server::close');
var spark = this;
process.nextTick(function tick() {
spark.emit('outgoing::end');
spark.emit('end');
});
};

@@ -127,0 +179,0 @@

'use strict';
var querystring = require('querystring').parse
, url = require('url').parse;
//
// Used to fake middleware's
//
function noop() {}
/**

@@ -7,6 +15,7 @@ * Transformer skeletons

* @constructor
* @param {Primus} primus Reference to the primus
* @param {Primus} primus Reference to the primus.
* @api public
*/
function Transformer(primus) {
this.Spark = primus.Spark;
this.primus = primus;

@@ -20,3 +29,27 @@ this.service = null;

//
// Simple logger shortcut.
//
Object.defineProperty(Transformer.prototype, 'logger', {
get: function logger() {
return {
error: this.log.bind(this.primus, 'log', 'error'), // Log error <line>.
warn: this.log.bind(this.primus, 'log', 'warn'), // Log warn <line>.
info: this.log.bind(this.primus, 'log', 'info'), // Log info <line>.
debug: this.log.bind(this.primus, 'log', 'debug'), // Log debug <line>.
plain: this.log.bind(this.primus, 'log') // Log x <line>.
};
}
});
/**
* Simple log handler that will emit log messages under the given `type`.
*
* @api private
*/
Transformer.prototype.log = function log(type) {
this.emit.apply(this, arguments);
};
/**
* Create the server and attach the apropriate event listeners.

@@ -29,11 +62,23 @@ *

var server = this.primus.server;
server.listeners('request').map(this.on.bind(this, 'previous::request'));
server.listeners('upgrade').map(this.on.bind(this, 'previous::upgrade'));
//
// Remove the old listeners as we want to be the first request handler for all
// events.
//
server.removeAllListeners('request');
server.removeAllListeners('upgrade');
//
// Start listening for incoming requests if we have a listener assigned to us.
//
if (this.listeners('request').length) {
this.primus.server.on('request', this.request.bind(this));
if (this.listeners('request').length || this.listeners('previous::request').length) {
server.on('request', this.request.bind(this));
}
if (this.listeners('upgrade').length) {
this.primus.server.on('upgrade', this.upgrade.bind(this));
if (this.listeners('upgrade').length || this.listeners('previous::upgrade').length) {
server.on('upgrade', this.upgrade.bind(this));
}

@@ -51,3 +96,5 @@ };

Transformer.prototype.request = function request(req, res) {
this.emit('request', req, res);
if (!this.test(req)) return this.emit('previous::request', req, res);
this.emit('request', req, res, noop);
};

@@ -64,6 +111,35 @@

*/
Transformer.prototype.upgrade = function upgrade(req, res, head) {
this.emit('upgrade');
Transformer.prototype.upgrade = function upgrade(req, socket, head) {
//
// Copy buffer to prevent large buffer retention in Node core.
// @see jmatthewsr-ms/node-slab-memory-issues
//
var buffy = new Buffer(head.length);
head.copy(upgrade);
if (!this.test(req)) return this.emit('previous::upgrade', req, socket, buffy);
this.emit('upgrade', req, socket, buffy, noop);
};
/**
* Check if we should accept this request.
*
* @param {Request} req HTTP Request.
* @returns {Boolean} Do we need to accept this request.
* @api private
*/
Transformer.prototype.test = function test(req) {
req.uri = url(req.url);
var route = this.primus.pathname
, accepted = req.uri.pathname.slice(0, route.length) === route;
if (!accepted) this.emit('unknown', req);
//
// Make sure that the first part of the path matches.
//
return accepted;
};
//

@@ -70,0 +146,0 @@ // Make the transporter extendable.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc