New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rethinkdbdash

Package Overview
Dependencies
Maintainers
1
Versions
153
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rethinkdbdash - npm Package Compare versions

Comparing version 1.16.3 to 1.16.4

143

lib/stream.js

@@ -7,4 +7,7 @@ var Readable = require('stream').Readable;

function Stream(cursor) {
this._cursor = cursor;
if (cursor) this._cursor = cursor;
this._pending = 0; // How many time we called _read while no cursor was available
this._index = 0;
this._maxRecursion = 1000; // Hardcoded
Readable.call(this, {

@@ -17,47 +20,115 @@ objectMode: true

Stream.prototype._setCursor = function(cursor) {
if (cursor instanceof Cursor === false) {
this.emit('error', new Error("Cannot create a stream on a single value."));
return this;
}
this._cursor = cursor;
this._fetchAndDecrement();
}
Stream.prototype._read = function(size) {
if (this._cursor === undefined) {
this._pending++;
return;
}
this._recursion = 0;
this._fetch();
}
//TODO: Refactor with _fetch?
Stream.prototype._fetchAndDecrement = function() {
var self = this;
//Avoid maximum call stack errors
var maxRecursion = 1000; // Hardcoded
var recursion = 0;
var fetch = function() {
if (self._cursor._closed === true) {
self.push(null);
}
else {
self._cursor._next().then(function(data) {
// Silently drop null values for now
if (data === null) {
if (recursion++ === maxRecursion) {
process.nextTick(fetch);
self._pending--;
if (self._pending < 0) {
return;
}
if (self._cursor._closed === true) {
self.push(null);
}
else {
self._cursor._next().then(function(data) {
// Silently drop null values for now
if (data === null) {
if (self._recursion++ === self._maxRecursion) {
//Avoid maximum call stack errors
process.nextTick(function() {
self._fetchAndDecrement();
});
}
else {
self._fetchAndDecrement();
}
}
else {
if (self.push(data) !== false) {
if (self._recursion++ === self._maxRecursion) {
process.nextTick(function() {
self._fetchAndDecrement();
});
}
else {
fetch();
self._fetchAndDecrement();
}
}
}
}).error(function(error) {
if (error.message.match(/No more rows in the/)) {
self.push(null);
}
else if (error.message === "You cannot retrieve data from a cursor that is closed.") {
// if the user call `close`, the cursor may reject pending requests. We just
// ignore them here.
}
else {
self.emit('error', error);
}
});
}
}
Stream.prototype._fetch = function() {
var self = this;
if (self._cursor._closed === true) {
self.push(null);
}
else {
self._cursor._next().then(function(data) {
// Silently drop null values for now
if (data === null) {
if (self._recursion++ === self._maxRecursion) {
process.nextTick(function() {
self._fetchAndDecrement();
});
}
else {
if (self.push(data) !== false) {
if (recursion++ === maxRecursion) {
process.nextTick(fetch);
}
else {
fetch();
}
self._fetch();
}
}
else {
if (self.push(data) !== false) {
if (self._recursion++ === self._maxRecursion) {
process.nextTick(function() {
self._fetchAndDecrement();
});
}
else {
self._fetch();
}
}
}).error(function(error) {
if (error.message.match(/No more rows in the/)) {
self.push(null);
}
else if (error.message === "You cannot retrieve data from a cursor that is closed.") {
// if the user call `close`, the cursor may reject pending requests. We just
// ignore them here.
}
else {
self.emit('error', error);
}
});
}
}
}).error(function(error) {
if (error.message.match(/No more rows in the/)) {
self.push(null);
}
else if (error.message === "You cannot retrieve data from a cursor that is closed.") {
// if the user call `close`, the cursor may reject pending requests. We just
// ignore them here.
}
else {
self.emit('error', error);
}
});
}
fetch();
}

@@ -64,0 +135,0 @@

{
"name": "rethinkdbdash",
"version": "1.16.3",
"version": "1.16.4",
"description": "A Node.js driver for RethinkDB with promises and a connection pool",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -122,7 +122,8 @@ rethinkdbdash

cursor: <boolean> // if you want a cursor by default instead of an array or feed, default false
stream: <boolean> // if you want a stream by default instead of an array or feed, default false
}
```
_Note_: The option `{stream: true}` that asynchronously returns a stream is deprecated. Use `toStream` instead.
#### Promises ####

@@ -211,13 +212,17 @@

Rethinkdbdash automatically coerce cursor to arrays. If you want a stream, you can call the
`run` command with the option `{stream: true}` or import the driver with `{stream: true}`.
If you prefer streams over cursors and arrays, you can use `toStream`. This synchronous method
returns a stream that you can pipe.
```js
var stream = yield r.expr([1, 2, 3]).run({stream: true});
stream.pipe(stringifier).pipe(writableStream);
r.expr([1, 2, 3]).toStream()
.on('error', handleError)
.pipe(stringifier)
.on('error', handleError)
.pipe(writableStream);
```
_Note_: Make sure to not pass the option `cursor: true` or a cursor will be returned.
_Note:_ The stream will emit an error if you provide it with a single value (arrays and grouped data
work fine).
_Note_: `null` values are currently dropped from streams.
_Note:_ `null` values are currently dropped from streams.

@@ -224,0 +229,0 @@

@@ -293,1 +293,46 @@ var config = require('./config.js');

It("toStream", function* (done) {
try {
stream = r.db(dbName).table(tableName).toStream();
stream.once('readable', function() {
var doc = stream.read();
if (doc === null) {
done(new Error("stream.read() should not return null when readable was emitted"));
}
var count = 1;
stream.on('data', function(data) {
count++;
if (count === numDocs) {
done();
}
});
});
}
catch(e) {
done(e);
}
})
It("toStream - with grouped data", function* (done) {
try {
stream = r.db(dbName).table(tableName).group({index: 'id'}).toStream();
stream.once('readable', function() {
var doc = stream.read();
if (doc === null) {
done(new Error("stream.read() should not return null when readable was emitted"));
}
var count = 1;
stream.on('data', function(data) {
count++;
if (count === numDocs) {
done();
}
});
});
}
catch(e) {
done(e);
}
})

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc