Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

socket.io-stream

Package Overview
Dependencies
Maintainers
1
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

socket.io-stream - npm Package Compare versions

Comparing version 0.4.0 to 0.5.0

4

lib/index.js

@@ -30,6 +30,6 @@ var Socket = require('./socket')

/**
* Creates a new writable stream.
* Creates a new duplex stream.
*
* @param {Object} options
* @return {IOStream} writable stream
* @return {IOStream} duplex stream
* @api public

@@ -36,0 +36,0 @@ */

@@ -1,3 +0,4 @@

var util = require('util');
var Duplex = require('readable-stream').Duplex;
var util = require('util')
, Duplex = require('readable-stream').Duplex
, debug = require('debug')('socket.io-stream:iostream');

@@ -32,8 +33,37 @@

this._writable = false;
this.destroyed = false;
this.once('finish', this._onfinish.bind(this));
this.on('error', this._onerror.bind(this));
// default to *not* allowing half open sockets
this.allowHalfOpen = options && options.allowHalfOpen || false;
this.on('finish', this._onfinish);
this.on('end', this._onend);
this.on('error', this._onerror);
}
/**
* Ensures that no more I/O activity happens on this stream.
* Not necessary in the usual case.
*
* @api public
*/
IOStream.prototype.destroy = function() {
debug('destroy');
if (this.destroyed) {
debug('already destroyed');
return;
}
this.readable = this.writable = false;
if (this.socket) {
debug('clean up');
this.socket.cleanup(this.id);
this.socket = null;
}
this.destroyed = true;
};
/**

@@ -131,5 +161,5 @@ * Local read

// end after flushing buffer.
this.pushBuffer.push(this._onend.bind(this));
this.pushBuffer.push(this._done.bind(this));
} else {
this._onend();
this._done();
}

@@ -143,3 +173,3 @@ };

*/
IOStream.prototype._onend = function() {
IOStream.prototype._done = function() {
this._readable = false;

@@ -152,4 +182,8 @@

/**
* Local socket just finished
* send 'end' event to remote
* the user has called .end(), and all the bytes have been
* sent out to the other side.
* If allowHalfOpen is false, or if the readable side has
* ended already, then destroy.
* If allowHalfOpen is true, then we need to set writable false,
* so that only the writable side will be cleaned up.
*

@@ -159,6 +193,46 @@ * @api private

IOStream.prototype._onfinish = function() {
debug('_onfinish');
// Local socket just finished
// send 'end' event to remote
this.socket._end(this.id);
this.writable = false;
this._writableState.ended = true;
if (!this.readable || this._readableState.ended) {
debug('_onfinish: ended, destroy %s', this._readableState);
return this.destroy();
}
debug('_onfinish: not ended');
if (!this.allowHalfOpen) {
this.read(0);
this.push(null);
}
};
/**
* the EOF has been received, and no more bytes are coming.
* if the writable side has ended already, then clean everything
* up.
*
* @api private
*/
IOStream.prototype._onend = function() {
debug('_onend');
this.readable = false;
if (!this.writable || this._writableState.finished) {
debug('_onend: %s', this._writableState);
return this.destroy();
}
debug('_onend: not finished');
if (!this.allowHalfOpen) {
this.end();
}
};
/**
* When error in local stream

@@ -172,8 +246,9 @@ * notyify remote

IOStream.prototype._onerror = function(err) {
if (!this.socket) return;
// check if the error came from the corresponding stream.
if (err.remote) return;
// check if the error came from remote stream.
if (!err.remote && this.socket) {
// notify the error to the corresponding remote stream.
this.socket._error(this.id, err);
}
// notify the error to the corresponding write-stream.
this.socket._error(this.id, err);
this.destroy();
};
var util = require('util')
, EventEmitter = require('events').EventEmitter
, IOStream = require('./iostream')
, uuid = require('./uuid')
, debug = require('debug')('socket.io-stream:socket')
, emit = EventEmitter.prototype.emit
, slice = Array.prototype.slice
, uuid = require('./uuid');
, on = EventEmitter.prototype.on
, slice = Array.prototype.slice;

@@ -42,3 +43,3 @@

var eventName = exports.event;
sio.on(eventName, this._onstream.bind(this));
sio.on(eventName, emit.bind(this));
sio.on(eventName + '-read', this._onread.bind(this));

@@ -48,6 +49,14 @@ sio.on(eventName + '-write', this._onwrite.bind(this));

sio.on(eventName + '-error', this._onerror.bind(this));
sio.on('error', this.emit.bind(this, 'error'));
sio.on('error', emit.bind(this, 'error'));
sio.on('disconnect', this._ondisconnect.bind(this));
}
/**
* Original emit function.
*
* @api private
*/
Socket.prototype.$emit = emit;
/**
* Emits streams to this corresponding server/client.

@@ -66,14 +75,27 @@ *

Socket.prototype.on = function(type, options, listener) {
if (~exports.events.indexOf(type)) {
return on.apply(this, arguments);
}
if ('function' == typeof options) {
listener = options;
options = null;
}
this._onstream(type, options, listener);
return this;
};
/**
* Sends a new stream request.
*
* @param {String} event type
* @api private
*/
Socket.prototype._stream = function() {
Socket.prototype._stream = function(type) {
debug('sending new streams');
var args = slice.call(arguments)
, type = args.shift()
, sio = this.sio
, pos = [];
var args = slice.call(arguments, 1)
, pos = []
, sio = this.sio;

@@ -85,3 +107,3 @@ args = args.map(function(stream, i) {

if (stream.socket) {
if (stream.socket || stream.destroyed) {
throw new Error('stream has already been sent.');

@@ -99,9 +121,2 @@ }

// add listers to clean up.
['finish', 'error'].forEach(function(type) {
stream.on(type, function() {
delete this.streams[id];
}.bind(this));
}, this);
// represent a stream in an id.

@@ -111,4 +126,3 @@ return id;

args.unshift(exports.event, pos, type);
sio.emit.apply(sio, args);
sio.emit.apply(sio, [exports.event, type, pos].concat(args));
};

@@ -147,40 +161,41 @@

*
* @param {String} event type
* @param {Object} options for streams
* @param {Function} listener
*
* @api private
*/
Socket.prototype._onstream = function() {
debug('new streams');
Socket.prototype._onstream = function(type, options, listener) {
if ('function' != typeof listener) {
throw TypeError('listener must be a function');
}
var args = slice.call(arguments)
, pos = args.shift()
, type = args.shift()
, streams;
function onstream(pos) {
debug('new streams');
var args = slice.call(arguments, 1);
args = args.map(function(id, i) {
if (!~pos.indexOf(i)) {
// arg is not a stream.
return id;
}
args = args.map(function(id, i) {
if (!~pos.indexOf(i)) {
// arg is not a stream.
return id;
}
if (this.streams[id]) {
this._error(id, 'id already exists');
return;
}
if (this.streams[id]) {
this._error(id, 'id already exists');
return;
}
// TODO: enable to set options.
var stream = this.streams[id] = new IOStream(/*options*/);
stream.id = id;
stream.socket = this;
var stream = this.streams[id] = new IOStream(options);
stream.id = id;
stream.socket = this;
// add listeners to clean up.
['end', 'error'].forEach(function(type) {
stream.on(type, function() {
delete this.streams[id];
}.bind(this));
return stream;
}, this);
return stream;
}, this);
listener.apply(this, args);
}
// for removeListener
onstream.listener = listener;
args.unshift(type);
emit.apply(this, args);
on.call(this, type, onstream);
};

@@ -233,1 +248,19 @@

};
Socket.prototype._ondisconnect = function() {
var stream;
for (var id in this.streams) {
stream = this.streams[id];
stream.destroy();
// Close streams when the underlaying
// socket.io connection is closed (regardless why)
stream.emit('close');
stream.emit('error', new Error('Connection aborted'));
}
};
Socket.prototype.cleanup = function(id) {
delete this.streams[id];
};
{
"name": "socket.io-stream",
"version": "0.4.0",
"version": "0.5.0",
"description": "stream for socket.io",

@@ -5,0 +5,0 @@ "author": "Naoyuki Kanezawa <naoyuki.kanezawa@gmail.com>",

@@ -86,4 +86,4 @@ # Socket.IO stream

### socket.on(event, listener)
Add a `listener` for `event`. `listener` will take streams with any data as arguments.
### socket.on(event, [options], [listener])
Add a `listener` for `event`. `listener` will take streams with any data as arguments. `options` is an object for streams.

@@ -90,0 +90,0 @@ ### ss.createStream([options])

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc