Comparing version 0.1.0 to 0.2.0
@@ -31,6 +31,6 @@ /* | ||
socket.once('connect', function() { | ||
socket.on('connect', function() { | ||
notConnected = false; | ||
_reqQueue.forEach(function(req) { | ||
client.get.apply(null, req); | ||
streamable.get.apply(null, req); | ||
}); | ||
@@ -40,3 +40,7 @@ _reqQueue = []; | ||
socket.on('disconnect', function() { | ||
notConnected = true; | ||
}); | ||
var handleDone = function(events) { | ||
@@ -49,10 +53,27 @@ return function ajaxDone(data) { | ||
socket.on(data.streamId, function onData(payload) { | ||
if (payload == '\n\n\n\n') { | ||
// this is a completion. | ||
if (payload[0] === '\n\n\n\n') { | ||
socket.removeAllListeners(data.streamId); | ||
events.onEnd(); | ||
// this is a terminating error. | ||
} else if (payload[0] === 'err.') { | ||
events.onError(payload[1]); | ||
socket.removeAllListeners(data.streamId); | ||
events.onEnd(); | ||
// this is a non-terminating error. | ||
} else if (payload[0] === 'err') { | ||
events.onError(payload[1]); | ||
// this is a data payload. | ||
} else { | ||
events.onData(payload); | ||
events.onData.apply(null, payload); | ||
} | ||
}); | ||
// notify server that we're ready to receive | ||
// our response stream. | ||
socket.emit(data.streamId+'ack'); | ||
}; | ||
@@ -65,4 +86,5 @@ }; | ||
events.onError(errorThrown); | ||
events.onEnd(); | ||
}; | ||
socket.removeAllListeners(data.streamId); | ||
events.onEnd(); | ||
}; | ||
}; | ||
@@ -69,0 +91,0 @@ |
@@ -14,5 +14,6 @@ // 1. setup a basic express server | ||
app.get('/foobar', streamable, function(req, res) { | ||
var inter, counter = 31; | ||
var inter, counter = 30; | ||
inter = setInterval(function() { | ||
res.write("foobar: "+counter); | ||
res.write("foobar:", counter, {foo: 'bar'}); | ||
if (counter == 20) { res.error(new Error('send a non-fatal error;')); } | ||
if (--counter == 0) { | ||
@@ -22,3 +23,3 @@ clearInterval(inter); | ||
} | ||
}, 100); | ||
}, 50); | ||
}); | ||
@@ -25,0 +26,0 @@ |
{ | ||
"name" : "streamable", | ||
"version" : "0.1.0", | ||
"version" : "0.2.0", | ||
"description" : "Super simple streaming responses for Connect/Express.", | ||
@@ -5,0 +5,0 @@ "author": { |
@@ -30,3 +30,3 @@ # Streamable: Super simple streaming responses for Connect/Express. | ||
```js | ||
var streamable = require('streamable').streamable(io) | ||
var streamable = require('streamable').streamable(io); | ||
``` | ||
@@ -33,0 +33,0 @@ |
@@ -17,3 +17,3 @@ var randBytes = require('crypto').randomBytes; | ||
/* generates a stream object to be used within | ||
a REST request. basic API: write/end. */ | ||
a REST request. basic API: write/error/end. */ | ||
function genStream(sessionId, streamId) { | ||
@@ -23,14 +23,31 @@ var stream; | ||
var _write = function(data) { | ||
socketSession.emit(streamId, data); | ||
}; | ||
if (socketSession) { | ||
stream = { | ||
sessionId : sessionId, | ||
streamId : streamId, | ||
socket : socketSession, | ||
write: function() { | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
args.unshift(streamId); | ||
socketSession.emit.apply(socketSession, args); | ||
_write(Array.prototype.slice.call(arguments, 0)); | ||
}, | ||
error: function(err) { | ||
if (err instanceof Error && err.message) { err = err.message; } | ||
_write(['err', err]); | ||
}, | ||
fatal: function(err) { | ||
if (err instanceof Error && err.message) { err = err.message; } | ||
_write(['err.', err]); | ||
}, | ||
end: function() { | ||
socketSession.emit(streamId, '\n\n\n\n'); | ||
_write(['\n\n\n\n']); | ||
} | ||
} | ||
@@ -84,7 +101,34 @@ } | ||
streamResponse(req, res, function(stream) { | ||
res.write = stream.write; | ||
res.end = stream.end; | ||
next(); | ||
var ackTimeout, ackEvent = stream.streamId+'ack'; | ||
// if the response stream is not acknowledged | ||
// in a timely fashion, we give up on it. | ||
ackTimeout = setTimeout(function() { | ||
stream.socket.removeAllListeners(ackEvent); | ||
stream.fatal(new Error("stream acknowledgement timed out")); | ||
}, 5000); | ||
// once our response stream has been | ||
// acknowledged, we allow our middleware | ||
// to progress. | ||
stream.socket.once(ackEvent, function() { | ||
clearTimeout(ackTimeout); | ||
// writes a buffer to the stream. | ||
res.write = stream.write; | ||
// writes a non-fatal error to the stream. | ||
res.error = stream.error; | ||
// writes a fatal error to the stream, with intent to close the stream. | ||
res.fatal = stream.fatal; | ||
// closes the stream. | ||
res.end = stream.end; | ||
next(); | ||
}); | ||
}); | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
12686
232