socket.io-stream
Advanced tools
Comparing version 0.4.0 to 0.5.0
@@ -30,6 +30,6 @@ var Socket = require('./socket') | ||
/** | ||
* Creates a new writable stream. | ||
* Creates a new duplex stream. | ||
* | ||
* @param {Object} options | ||
* @return {IOStream} writable stream | ||
* @return {IOStream} duplex stream | ||
* @api public | ||
@@ -36,0 +36,0 @@ */ |
@@ -1,3 +0,4 @@ | ||
var util = require('util'); | ||
var Duplex = require('readable-stream').Duplex; | ||
var util = require('util') | ||
, Duplex = require('readable-stream').Duplex | ||
, debug = require('debug')('socket.io-stream:iostream'); | ||
@@ -32,8 +33,37 @@ | ||
this._writable = false; | ||
this.destroyed = false; | ||
this.once('finish', this._onfinish.bind(this)); | ||
this.on('error', this._onerror.bind(this)); | ||
// default to *not* allowing half open sockets | ||
this.allowHalfOpen = options && options.allowHalfOpen || false; | ||
this.on('finish', this._onfinish); | ||
this.on('end', this._onend); | ||
this.on('error', this._onerror); | ||
} | ||
/** | ||
* Ensures that no more I/O activity happens on this stream. | ||
* Not necessary in the usual case. | ||
* | ||
* @api public | ||
*/ | ||
IOStream.prototype.destroy = function() { | ||
debug('destroy'); | ||
if (this.destroyed) { | ||
debug('already destroyed'); | ||
return; | ||
} | ||
this.readable = this.writable = false; | ||
if (this.socket) { | ||
debug('clean up'); | ||
this.socket.cleanup(this.id); | ||
this.socket = null; | ||
} | ||
this.destroyed = true; | ||
}; | ||
/** | ||
@@ -131,5 +161,5 @@ * Local read | ||
// end after flushing buffer. | ||
this.pushBuffer.push(this._onend.bind(this)); | ||
this.pushBuffer.push(this._done.bind(this)); | ||
} else { | ||
this._onend(); | ||
this._done(); | ||
} | ||
@@ -143,3 +173,3 @@ }; | ||
*/ | ||
IOStream.prototype._onend = function() { | ||
IOStream.prototype._done = function() { | ||
this._readable = false; | ||
@@ -152,4 +182,8 @@ | ||
/** | ||
* Local socket just finished | ||
* send 'end' event to remote | ||
* the user has called .end(), and all the bytes have been | ||
* sent out to the other side. | ||
* If allowHalfOpen is false, or if the readable side has | ||
* ended already, then destroy. | ||
* If allowHalfOpen is true, then we need to set writable false, | ||
* so that only the writable side will be cleaned up. | ||
* | ||
@@ -159,6 +193,46 @@ * @api private | ||
IOStream.prototype._onfinish = function() { | ||
debug('_onfinish'); | ||
// Local socket just finished | ||
// send 'end' event to remote | ||
this.socket._end(this.id); | ||
this.writable = false; | ||
this._writableState.ended = true; | ||
if (!this.readable || this._readableState.ended) { | ||
debug('_onfinish: ended, destroy %s', this._readableState); | ||
return this.destroy(); | ||
} | ||
debug('_onfinish: not ended'); | ||
if (!this.allowHalfOpen) { | ||
this.read(0); | ||
this.push(null); | ||
} | ||
}; | ||
/** | ||
* the EOF has been received, and no more bytes are coming. | ||
* if the writable side has ended already, then clean everything | ||
* up. | ||
* | ||
* @api private | ||
*/ | ||
IOStream.prototype._onend = function() { | ||
debug('_onend'); | ||
this.readable = false; | ||
if (!this.writable || this._writableState.finished) { | ||
debug('_onend: %s', this._writableState); | ||
return this.destroy(); | ||
} | ||
debug('_onend: not finished'); | ||
if (!this.allowHalfOpen) { | ||
this.end(); | ||
} | ||
}; | ||
/** | ||
* When error in local stream | ||
@@ -172,8 +246,9 @@ * notyify remote | ||
IOStream.prototype._onerror = function(err) { | ||
if (!this.socket) return; | ||
// check if the error came from the corresponding stream. | ||
if (err.remote) return; | ||
// check if the error came from remote stream. | ||
if (!err.remote && this.socket) { | ||
// notify the error to the corresponding remote stream. | ||
this.socket._error(this.id, err); | ||
} | ||
// notify the error to the corresponding write-stream. | ||
this.socket._error(this.id, err); | ||
this.destroy(); | ||
}; |
var util = require('util') | ||
, EventEmitter = require('events').EventEmitter | ||
, IOStream = require('./iostream') | ||
, uuid = require('./uuid') | ||
, debug = require('debug')('socket.io-stream:socket') | ||
, emit = EventEmitter.prototype.emit | ||
, slice = Array.prototype.slice | ||
, uuid = require('./uuid'); | ||
, on = EventEmitter.prototype.on | ||
, slice = Array.prototype.slice; | ||
@@ -42,3 +43,3 @@ | ||
var eventName = exports.event; | ||
sio.on(eventName, this._onstream.bind(this)); | ||
sio.on(eventName, emit.bind(this)); | ||
sio.on(eventName + '-read', this._onread.bind(this)); | ||
@@ -48,6 +49,14 @@ sio.on(eventName + '-write', this._onwrite.bind(this)); | ||
sio.on(eventName + '-error', this._onerror.bind(this)); | ||
sio.on('error', this.emit.bind(this, 'error')); | ||
sio.on('error', emit.bind(this, 'error')); | ||
sio.on('disconnect', this._ondisconnect.bind(this)); | ||
} | ||
/** | ||
* Original emit function. | ||
* | ||
* @api private | ||
*/ | ||
Socket.prototype.$emit = emit; | ||
/** | ||
* Emits streams to this corresponding server/client. | ||
@@ -66,14 +75,27 @@ * | ||
Socket.prototype.on = function(type, options, listener) { | ||
if (~exports.events.indexOf(type)) { | ||
return on.apply(this, arguments); | ||
} | ||
if ('function' == typeof options) { | ||
listener = options; | ||
options = null; | ||
} | ||
this._onstream(type, options, listener); | ||
return this; | ||
}; | ||
/** | ||
* Sends a new stream request. | ||
* | ||
* @param {String} event type | ||
* @api private | ||
*/ | ||
Socket.prototype._stream = function() { | ||
Socket.prototype._stream = function(type) { | ||
debug('sending new streams'); | ||
var args = slice.call(arguments) | ||
, type = args.shift() | ||
, sio = this.sio | ||
, pos = []; | ||
var args = slice.call(arguments, 1) | ||
, pos = [] | ||
, sio = this.sio; | ||
@@ -85,3 +107,3 @@ args = args.map(function(stream, i) { | ||
if (stream.socket) { | ||
if (stream.socket || stream.destroyed) { | ||
throw new Error('stream has already been sent.'); | ||
@@ -99,9 +121,2 @@ } | ||
// add listers to clean up. | ||
['finish', 'error'].forEach(function(type) { | ||
stream.on(type, function() { | ||
delete this.streams[id]; | ||
}.bind(this)); | ||
}, this); | ||
// represent a stream in an id. | ||
@@ -111,4 +126,3 @@ return id; | ||
args.unshift(exports.event, pos, type); | ||
sio.emit.apply(sio, args); | ||
sio.emit.apply(sio, [exports.event, type, pos].concat(args)); | ||
}; | ||
@@ -147,40 +161,41 @@ | ||
* | ||
* @param {String} event type | ||
* @param {Object} options for streams | ||
* @param {Function} listener | ||
* | ||
* @api private | ||
*/ | ||
Socket.prototype._onstream = function() { | ||
debug('new streams'); | ||
Socket.prototype._onstream = function(type, options, listener) { | ||
if ('function' != typeof listener) { | ||
throw TypeError('listener must be a function'); | ||
} | ||
var args = slice.call(arguments) | ||
, pos = args.shift() | ||
, type = args.shift() | ||
, streams; | ||
function onstream(pos) { | ||
debug('new streams'); | ||
var args = slice.call(arguments, 1); | ||
args = args.map(function(id, i) { | ||
if (!~pos.indexOf(i)) { | ||
// arg is not a stream. | ||
return id; | ||
} | ||
args = args.map(function(id, i) { | ||
if (!~pos.indexOf(i)) { | ||
// arg is not a stream. | ||
return id; | ||
} | ||
if (this.streams[id]) { | ||
this._error(id, 'id already exists'); | ||
return; | ||
} | ||
if (this.streams[id]) { | ||
this._error(id, 'id already exists'); | ||
return; | ||
} | ||
// TODO: enable to set options. | ||
var stream = this.streams[id] = new IOStream(/*options*/); | ||
stream.id = id; | ||
stream.socket = this; | ||
var stream = this.streams[id] = new IOStream(options); | ||
stream.id = id; | ||
stream.socket = this; | ||
// add listeners to clean up. | ||
['end', 'error'].forEach(function(type) { | ||
stream.on(type, function() { | ||
delete this.streams[id]; | ||
}.bind(this)); | ||
return stream; | ||
}, this); | ||
return stream; | ||
}, this); | ||
listener.apply(this, args); | ||
} | ||
// for removeListener | ||
onstream.listener = listener; | ||
args.unshift(type); | ||
emit.apply(this, args); | ||
on.call(this, type, onstream); | ||
}; | ||
@@ -233,1 +248,19 @@ | ||
}; | ||
Socket.prototype._ondisconnect = function() { | ||
var stream; | ||
for (var id in this.streams) { | ||
stream = this.streams[id]; | ||
stream.destroy(); | ||
// Close streams when the underlaying | ||
// socket.io connection is closed (regardless why) | ||
stream.emit('close'); | ||
stream.emit('error', new Error('Connection aborted')); | ||
} | ||
}; | ||
Socket.prototype.cleanup = function(id) { | ||
delete this.streams[id]; | ||
}; | ||
{ | ||
"name": "socket.io-stream", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"description": "stream for socket.io", | ||
@@ -5,0 +5,0 @@ "author": "Naoyuki Kanezawa <naoyuki.kanezawa@gmail.com>", |
@@ -86,4 +86,4 @@ # Socket.IO stream | ||
### socket.on(event, listener) | ||
Add a `listener` for `event`. `listener` will take streams with any data as arguments. | ||
### socket.on(event, [options], [listener]) | ||
Add a `listener` for `event`. `listener` will take streams with any data as arguments. `options` is an object for streams. | ||
@@ -90,0 +90,0 @@ ### ss.createStream([options]) |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
160135
1022