rethinkdbdash
Advanced tools
Comparing version 1.16.3 to 1.16.4
@@ -7,4 +7,7 @@ var Readable = require('stream').Readable; | ||
function Stream(cursor) { | ||
this._cursor = cursor; | ||
if (cursor) this._cursor = cursor; | ||
this._pending = 0; // How many time we called _read while no cursor was available | ||
this._index = 0; | ||
this._maxRecursion = 1000; // Hardcoded | ||
Readable.call(this, { | ||
@@ -17,47 +20,115 @@ objectMode: true | ||
Stream.prototype._setCursor = function(cursor) { | ||
if (cursor instanceof Cursor === false) { | ||
this.emit('error', new Error("Cannot create a stream on a single value.")); | ||
return this; | ||
} | ||
this._cursor = cursor; | ||
this._fetchAndDecrement(); | ||
} | ||
Stream.prototype._read = function(size) { | ||
if (this._cursor === undefined) { | ||
this._pending++; | ||
return; | ||
} | ||
this._recursion = 0; | ||
this._fetch(); | ||
} | ||
//TODO: Refactor with _fetch? | ||
Stream.prototype._fetchAndDecrement = function() { | ||
var self = this; | ||
//Avoid maximum call stack errors | ||
var maxRecursion = 1000; // Hardcoded | ||
var recursion = 0; | ||
var fetch = function() { | ||
if (self._cursor._closed === true) { | ||
self.push(null); | ||
} | ||
else { | ||
self._cursor._next().then(function(data) { | ||
// Silently drop null values for now | ||
if (data === null) { | ||
if (recursion++ === maxRecursion) { | ||
process.nextTick(fetch); | ||
self._pending--; | ||
if (self._pending < 0) { | ||
return; | ||
} | ||
if (self._cursor._closed === true) { | ||
self.push(null); | ||
} | ||
else { | ||
self._cursor._next().then(function(data) { | ||
// Silently drop null values for now | ||
if (data === null) { | ||
if (self._recursion++ === self._maxRecursion) { | ||
//Avoid maximum call stack errors | ||
process.nextTick(function() { | ||
self._fetchAndDecrement(); | ||
}); | ||
} | ||
else { | ||
self._fetchAndDecrement(); | ||
} | ||
} | ||
else { | ||
if (self.push(data) !== false) { | ||
if (self._recursion++ === self._maxRecursion) { | ||
process.nextTick(function() { | ||
self._fetchAndDecrement(); | ||
}); | ||
} | ||
else { | ||
fetch(); | ||
self._fetchAndDecrement(); | ||
} | ||
} | ||
} | ||
}).error(function(error) { | ||
if (error.message.match(/No more rows in the/)) { | ||
self.push(null); | ||
} | ||
else if (error.message === "You cannot retrieve data from a cursor that is closed.") { | ||
// if the user call `close`, the cursor may reject pending requests. We just | ||
// ignore them here. | ||
} | ||
else { | ||
self.emit('error', error); | ||
} | ||
}); | ||
} | ||
} | ||
Stream.prototype._fetch = function() { | ||
var self = this; | ||
if (self._cursor._closed === true) { | ||
self.push(null); | ||
} | ||
else { | ||
self._cursor._next().then(function(data) { | ||
// Silently drop null values for now | ||
if (data === null) { | ||
if (self._recursion++ === self._maxRecursion) { | ||
process.nextTick(function() { | ||
self._fetchAndDecrement(); | ||
}); | ||
} | ||
else { | ||
if (self.push(data) !== false) { | ||
if (recursion++ === maxRecursion) { | ||
process.nextTick(fetch); | ||
} | ||
else { | ||
fetch(); | ||
} | ||
self._fetch(); | ||
} | ||
} | ||
else { | ||
if (self.push(data) !== false) { | ||
if (self._recursion++ === self._maxRecursion) { | ||
process.nextTick(function() { | ||
self._fetchAndDecrement(); | ||
}); | ||
} | ||
else { | ||
self._fetch(); | ||
} | ||
} | ||
}).error(function(error) { | ||
if (error.message.match(/No more rows in the/)) { | ||
self.push(null); | ||
} | ||
else if (error.message === "You cannot retrieve data from a cursor that is closed.") { | ||
// if the user call `close`, the cursor may reject pending requests. We just | ||
// ignore them here. | ||
} | ||
else { | ||
self.emit('error', error); | ||
} | ||
}); | ||
} | ||
} | ||
}).error(function(error) { | ||
if (error.message.match(/No more rows in the/)) { | ||
self.push(null); | ||
} | ||
else if (error.message === "You cannot retrieve data from a cursor that is closed.") { | ||
// if the user call `close`, the cursor may reject pending requests. We just | ||
// ignore them here. | ||
} | ||
else { | ||
self.emit('error', error); | ||
} | ||
}); | ||
} | ||
fetch(); | ||
} | ||
@@ -64,0 +135,0 @@ |
{ | ||
"name": "rethinkdbdash", | ||
"version": "1.16.3", | ||
"version": "1.16.4", | ||
"description": "A Node.js driver for RethinkDB with promises and a connection pool", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -122,7 +122,8 @@ rethinkdbdash | ||
cursor: <boolean> // if you want a cursor by default instead of an array or feed, default false | ||
stream: <boolean> // if you want a stream by default instead of an array or feed, default false | ||
} | ||
``` | ||
_Note_: The option `{stream: true}` that asynchronously returns a stream is deprecated. Use `toStream` instead. | ||
#### Promises #### | ||
@@ -211,13 +212,17 @@ | ||
Rethinkdbdash automatically coerce cursor to arrays. If you want a stream, you can call the | ||
`run` command with the option `{stream: true}` or import the driver with `{stream: true}`. | ||
If you prefer streams over cursors and arrays, you can use `toStream`. This synchronous method | ||
returns a stream that you can pipe. | ||
```js | ||
var stream = yield r.expr([1, 2, 3]).run({stream: true}); | ||
stream.pipe(stringifier).pipe(writableStream); | ||
r.expr([1, 2, 3]).toStream() | ||
.on('error', handleError) | ||
.pipe(stringifier) | ||
.on('error', handleError) | ||
.pipe(writableStream); | ||
``` | ||
_Note_: Make sure to not pass the option `cursor: true` or a cursor will be returned. | ||
_Note:_ The stream will emit an error if you provide it with a single value (arrays and grouped data | ||
work fine). | ||
_Note_: `null` values are currently dropped from streams. | ||
_Note:_ `null` values are currently dropped from streams. | ||
@@ -224,0 +229,0 @@ |
@@ -293,1 +293,46 @@ var config = require('./config.js'); | ||
It("toStream", function* (done) { | ||
try { | ||
stream = r.db(dbName).table(tableName).toStream(); | ||
stream.once('readable', function() { | ||
var doc = stream.read(); | ||
if (doc === null) { | ||
done(new Error("stream.read() should not return null when readable was emitted")); | ||
} | ||
var count = 1; | ||
stream.on('data', function(data) { | ||
count++; | ||
if (count === numDocs) { | ||
done(); | ||
} | ||
}); | ||
}); | ||
} | ||
catch(e) { | ||
done(e); | ||
} | ||
}) | ||
It("toStream - with grouped data", function* (done) { | ||
try { | ||
stream = r.db(dbName).table(tableName).group({index: 'id'}).toStream(); | ||
stream.once('readable', function() { | ||
var doc = stream.read(); | ||
if (doc === null) { | ||
done(new Error("stream.read() should not return null when readable was emitted")); | ||
} | ||
var count = 1; | ||
stream.on('data', function(data) { | ||
count++; | ||
if (count === numDocs) { | ||
done(); | ||
} | ||
}); | ||
}); | ||
} | ||
catch(e) { | ||
done(e); | ||
} | ||
}) | ||
Sorry, the diff of this file is too big to display
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
569855
16867
300
15
102