Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

penseur

Package Overview
Dependencies
Maintainers
1
Versions
103
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

penseur - npm Package Compare versions

Comparing version 7.14.0 to 8.0.0

410

lib/db.js

@@ -7,5 +7,5 @@ 'use strict';

const Hoek = require('hoek');
const Items = require('items');
const Joi = require('joi');
const RethinkDB = require('rethinkdb');
const Id = require('./id');

@@ -126,42 +126,25 @@ const Table = require('./table');

connect(callback) {
async connect() {
this._connect((err) => {
await this._connect();
const exists = await this._exists();
if (err) {
return callback(err);
}
if (!exists) {
this.close();
throw Boom.internal(`Missing database: ${this.name}`);
}
this._exists((err, exists) => {
await this._verify();
if (err) {
return callback(err);
}
// Reconnect changes feeds
if (!exists) {
this.close();
return callback(Boom.internal(`Missing database: ${this.name}`));
}
this._verify((err) => {
if (err) {
return callback(err);
}
// Reconnect changes feeds
const each = (feedId, next) => {
const feed = this._feeds[feedId];
return feed.table.changes(feed.criteria, feed.options, next);
};
Items.serial(Object.keys(this._feeds), each, callback);
});
});
});
const feeds = Object.keys(this._feeds);
for (let i = 0; i < feeds.length; ++i) {
const feedId = feeds[i];
const feed = this._feeds[feedId];
await feed.table.changes(feed.criteria, feed.options);
}
}
_connect(callback) {
async _connect() {

@@ -178,39 +161,41 @@ const settings = this._connectionOptions || {};

RethinkDB.connect(settings, (err, connection) => {
const connection = await RethinkDB.connect(settings);
this._connection = connection;
this._connectionOptions = settings;
if (err) {
return callback(err);
}
this._connection.on('error', (err) => this._settings.onError(err));
this._connection.on('timeout', () => this._settings.onError(Boom.internal('Database connection timeout')));
this._connection.once('close', async () => {
this._connection = connection;
this._connectionOptions = settings;
const reconnect = this._willReconnect();
this._settings.onDisconnect(reconnect);
this._connection.on('error', (err) => this._settings.onError(err));
this._connection.on('timeout', () => this._settings.onError(Boom.internal('Database connection timeout')));
this._connection.once('close', () => {
if (!reconnect) {
return;
}
this._settings.onDisconnect(this._willReconnect());
if (this._willReconnect()) {
let first = true;
const loop = (err) => {
let first = true;
const loop = async (err) => {
if (!err) {
return;
}
first = false;
await Hoek.wait(this._settings.reconnectTimeout && !first ? this._settings.reconnectTimeout : 0);
first = false;
setTimeout(() => { // Prevents stack overflow if connect() fails on same tick
this._settings.onError(err);
this.connect(loop);
}, this._settings.reconnectTimeout && !first ? this._settings.reconnectTimeout : 0);
};
this.connect(loop);
this._settings.onError(err);
try {
await this.connect();
}
});
catch (err) {
await loop(err);
}
};
this._settings.onConnect();
return callback();
try {
await this.connect();
}
catch (err) {
await loop(err);
}
});
this._settings.onConnect();
}

@@ -223,10 +208,8 @@

close(next) {
async close() {
next = next || Hoek.ignore;
this._connectionOptions = null; // Stop reconnections
if (!this._connection) {
return next();
return;
}

@@ -245,11 +228,7 @@

this._connection.close((err) => { // Explicit callback to avoid generating a promise
if (this._connection) {
this._connection.removeAllListeners();
this._connection = null;
}
return next(err);
});
await this._connection.close();
if (this._connection) {
this._connection.removeAllListeners();
this._connection = null;
}
}

@@ -339,15 +318,7 @@

establish(tables, callback) {
async establish(tables) {
if (!this._connection) {
this._connect((err) => {
if (err) {
return callback(err);
}
return this.establish(tables, callback);
});
return;
await this._connect();
return this.establish(tables);
}

@@ -358,155 +329,97 @@

const finalize = (err) => {
const exists = await this._exists();
if (!exists) {
await RethinkDB.dbCreate(this.name).run(this._connection);
}
if (err) {
return callback(err);
}
await this._createTable(byName);
return this._verify();
}
return this._createTable(byName, (err) => {
async _exists() {
if (err) {
return callback(err);
}
const names = await RethinkDB.dbList().run(this._connection);
return (names.indexOf(this.name) !== -1);
}
return this._verify(callback);
});
};
async _createTable(tables) {
this._exists((err, exists) => {
const configs = await RethinkDB.db(this.name).tableList().map((table) => RethinkDB.db(this.name).table(table).config()).run(this._connection);
if (err ||
exists) {
const existing = {};
configs.forEach((config) => {
return finalize(err);
}
RethinkDB.dbCreate(this.name).run(this._connection, finalize);
existing[config.name] = config;
});
}
_exists(callback) {
const names = Object.keys(tables);
for (let i = 0; i < names.length; ++i) {
const name = names[i];
RethinkDB.dbList().run(this._connection, (err, names) => {
let tableOptions = tables[name];
if (tableOptions === false) {
continue;
}
if (err) {
return callback(err);
if (tableOptions === true) {
tableOptions = {};
}
return callback(null, names.indexOf(this.name) !== -1);
});
}
// Check primary key
_createTable(tables, callback) {
const primaryKey = tableOptions.primary || 'id';
const existingConfig = existing[name];
let drop = false;
if (existingConfig &&
existingConfig.primary_key !== primaryKey) {
RethinkDB.db(this.name).tableList().map((table) => RethinkDB.db(this.name).table(table).config()).run(this._connection, (err, configs) => {
if (err) {
return callback(err);
drop = RethinkDB.db(this.name).tableDrop(name);
}
const existing = {};
configs.forEach((config) => {
// Create new table
existing[config.name] = config;
});
if (!existingConfig ||
drop) {
const each = (name, next) => {
const create = RethinkDB.db(this.name).tableCreate(name, { primaryKey });
const change = (drop ? RethinkDB.and(drop, create) : create);
await change.run(this._connection);
}
else {
let tableOptions = tables[name];
if (tableOptions === false) {
return next();
}
// Reuse existing table
if (tableOptions === true) {
tableOptions = {};
if (tableOptions.purge !== false) { // Defaults to true
await this.tables[name].empty();
}
const finalize = (err) => {
if (err) {
return next(err);
if (tableOptions.secondary !== false) { // false means leave as-is (vs null or empty array which drops existing)
for (let j = 0; j < existingConfig.indexes.length; ++j) {
const index = existingConfig.indexes[j];
await RethinkDB.db(this.name).table(name).indexDrop(index).run(this._connection);
}
if (!tableOptions.secondary) {
return next();
}
return this.tables[name].index(tableOptions.secondary, next);
};
// Check primary key
const primaryKey = tableOptions.primary || 'id';
const existingConfig = existing[name];
let drop = false;
if (existingConfig &&
existingConfig.primary_key !== primaryKey) {
drop = RethinkDB.db(this.name).tableDrop(name);
}
}
// Create new table
if (!tableOptions.secondary) {
continue;
}
if (!existingConfig ||
drop) {
const create = RethinkDB.db(this.name).tableCreate(name, { primaryKey });
const change = (drop ? RethinkDB.and(drop, create) : create);
return change.run(this._connection, (err) => finalize(err));
}
// Reuse existing table
const recreate = () => {
if (tableOptions.secondary === false) { // false means leave as-is (vs null or empty array which drops existing)
return finalize();
}
const eachIndex = (index, nextIndex) => RethinkDB.db(this.name).table(name).indexDrop(index).run(this._connection, nextIndex);
Items.parallel(existingConfig.indexes, eachIndex, (err) => finalize(err));
};
if (tableOptions.purge === false) { // Defaults to true
return recreate();
}
this.tables[name].empty((err) => {
if (err) {
return next(err);
}
return recreate();
});
};
const names = Object.keys(tables);
Items.parallel(names, each, callback);
});
await this.tables[name].index(tableOptions.secondary);
}
}
_verify(callback) {
async _verify() {
const each = (name, next) => {
const names = Object.keys(this.tables);
for (let i = 0; i < names.length; ++i) {
const name = names[i];
const table = this.tables[name];
Id.verify(table, { allocate: false }, (err) => {
if (err) {
return next(err);
}
return Unique.verify(table, next);
});
};
Items.serial(Object.keys(this.tables), each, callback);
await Id.verify(table, { allocate: false });
await Unique.verify(table);
}
}
run(request /* options, callback */) {
run(request, options = {}) {
const options = arguments.length === 3 ? arguments[1] : {};
const callback = arguments.length === 3 ? arguments[2] : arguments[1];
// Extract table name from ReQL object

@@ -524,9 +437,9 @@

return this._run(request, options, table, 'run', null, callback);
return this._run(request, options, table, 'run');
}
_run(request, options, table, action, inputs, callback) {
async _run(request, options, table, action, inputs = null) {
if (!this._connection) {
return Table._error(table, action, 'Database disconnected', inputs, Hoek.nextTick(callback));
throw Table._error(table, action, 'Database disconnected', inputs);
}

@@ -546,27 +459,12 @@

request.run(this._connection, options, (err, result) => {
if (err) {
return Table._error(table, action, err, inputs, callback);
}
try {
const result = await request.run(this._connection, options);
if (result === null) {
return callback(null, null);
return null;
}
if (result.errors) {
return Table._error(table, action, result.first_error, inputs, callback);
throw Table._error(table, action, result.first_error, inputs);
}
const empty = (results) => {
if (!results ||
!Array.isArray(results)) {
return results;
}
return (results.length ? results : null);
};
// Single item

@@ -577,3 +475,3 @@

return callback(null, empty(result));
return internals.empty(result);
}

@@ -583,28 +481,9 @@

const cursor = result;
const finalize = (err, results) => {
if (err) {
return Table._error(table, action, err, inputs, callback);
}
cursor.close();
return callback(null, empty(results));
};
if (!callback.each) {
return cursor.toArray(finalize);
}
const each = (err, item) => {
if (err) {
return finalize(err);
}
callback.each(item);
};
return cursor.each(each, finalize);
});
const results = await result.toArray();
result.close();
return internals.empty(results);
}
catch (err) {
throw Table._error(table, action, err, inputs);
}
}

@@ -706,3 +585,3 @@

this.tables[table].changes = function (criteria, each, callback) {
this.tables[table].changes = function (criteria, each) {

@@ -712,3 +591,3 @@ const error = Boom.internal('Simulated database error');

process.nextTick(() => each(error));
return callback(null, { close: Hoek.ignore });
return { close: Hoek.ignore };
};

@@ -729,12 +608,11 @@

const callback = arguments[arguments.length - 1];
if (value !== undefined) {
if (value instanceof Error) {
return callback(value);
return Promise.reject(value);
}
return callback(null, value);
return value;
}
return callback(Boom.internal('Simulated database error'));
return Promise.reject(Boom.internal('Simulated database error'));
};

@@ -767,1 +645,13 @@ };

};
internals.empty = function (results) {
if (!results ||
!Array.isArray(results)) {
return results;
}
return (results.length ? results : null);
};

@@ -8,3 +8,2 @@ 'use strict';

const Hoek = require('hoek');
const Items = require('items');
const Radix62 = require('radix62');

@@ -38,7 +37,7 @@

if (!allowArray) {
return new Error('Array of ids not supported');
throw new Error('Array of ids not supported');
}
if (!ids.length) {
return new Error('Empty array of ids not supported');
throw new Error('Empty array of ids not supported');
}

@@ -49,6 +48,2 @@

const id = internals.validate(ids[i]);
if (id instanceof Error) {
return id;
}
normalized.push(id);

@@ -67,3 +62,3 @@ }

if (id.id === undefined) {
return new Error('Invalid object id');
throw new Error('Invalid object id');
}

@@ -77,3 +72,3 @@

return new Error('Invalid null or undefined id');
throw new Error('Invalid null or undefined id');
}

@@ -84,3 +79,3 @@

return new Error(`Invalid id length: ${id}`);
throw new Error(`Invalid id length: ${id}`);
}

@@ -115,6 +110,6 @@

exports.wrap = function (table, items, callback) {
exports.wrap = async function (table, items) {
if (!table._id) {
return Hoek.nextTick(callback)(null, items);
return items;
}

@@ -135,26 +130,11 @@

if (!identifiers.length) {
return Hoek.nextTick(callback)(null, items);
return items;
}
const each = (item, next) => {
for (let i = 0; i < identifiers.length; ++i) {
const identifier = identifiers[i];
identifier[table.primary] = await internals[table._id.type](table);
}
internals[table._id.type](table, (err, id) => {
if (err) {
return next(err);
}
item[table.primary] = id;
return next();
});
};
Items.serial(identifiers, each, (err) => {
if (err) {
return callback(err);
}
return callback(null, Array.isArray(items) ? result : result[0]);
});
return (Array.isArray(items) ? result : result[0]);
};

@@ -181,30 +161,24 @@

internals.uuid = function (table, callback) {
internals.uuid = function (table) {
return Hoek.nextTick(callback)(null, exports.uuid());
return exports.uuid();
};
internals.increment = function (table, callback) {
internals.increment = async function (table) {
exports.verify(table, { allocate: true }, (err, allocated) => {
const allocated = await exports.verify(table, { allocate: true });
if (err) {
return callback(err);
}
if (allocated) {
return internals.radix(allocated, table._id.radix);
}
if (allocated) {
return callback(null, internals.radix(allocated, table._id.radix));
}
table._id.table.next(table._id.record, table._id.key, 1, (err, value) => {
if (err) {
err.message = `Failed allocating increment id: ${table.name}`;
return callback(err);
}
return callback(null, internals.radix(value, table._id.radix));
});
});
try {
const value = await table._id.table.next(table._id.record, table._id.key, 1);
return internals.radix(value, table._id.radix);
}
catch (err) {
err.message = `Failed allocating increment id: ${table.name}`;
throw err;
}
};

@@ -223,3 +197,3 @@

exports.verify = function (table, options, callback) {
exports.verify = async function (table, options) {

@@ -229,3 +203,3 @@ if (!table._id ||

return Hoek.nextTick(callback)();
return;
}

@@ -235,70 +209,69 @@

create[table._id.table.name] = { purge: false, secondary: false };
table._db._createTable(create, (err) => {
try {
await table._db._createTable(create);
}
catch (err) {
err.message = `Failed creating increment id table: ${table.name}`;
throw err;
}
if (err) {
err.message = `Failed creating increment id table: ${table.name}`;
return callback(err);
}
let record;
try {
record = await table._id.table.get(table._id.record);
}
catch (err) {
err.message = `Failed verifying increment id record: ${table.name}`;
throw err;
}
table._id.table.get(table._id.record, (err, record) => {
// Record found
if (err) {
err.message = `Failed verifying increment id record: ${table.name}`;
return callback(err);
}
let initialId = table._id.initial - 1;
let allocatedId = null;
// Record found
if (options.allocate) {
++initialId;
allocatedId = initialId;
}
let initialId = table._id.initial - 1;
let allocatedId = null;
if (record) {
if (record[table._id.key] === undefined) {
if (options.allocate) {
++initialId;
allocatedId = initialId;
// Set key
const changes = {};
changes[table._id.key] = initialId;
try {
await table._id.table.update(table._id.record, changes);
}
catch (err) {
err.message = `Failed initializing key-value pair to increment id record: ${table.name}`;
throw err;
}
if (record) {
if (record[table._id.key] === undefined) {
table._id.verified = true;
return allocatedId;
}
// Set key
if (!Hoek.isInteger(record[table._id.key])) {
throw Boom.internal(`Increment id record contains non-integer value: ${table.name}`);
}
const changes = {};
changes[table._id.key] = initialId;
table._id.table.update(table._id.record, changes, (err) => {
table._id.verified = true;
return;
}
if (err) {
err.message = `Failed initializing key-value pair to increment id record: ${table.name}`;
return callback(err);
}
// Insert record
table._id.verified = true;
return callback(null, allocatedId);
});
return;
}
else if (!Hoek.isInteger(record[table._id.key])) {
return callback(Boom.internal(`Increment id record contains non-integer value: ${table.name}`));
}
table._id.verified = true;
return callback();
}
// Insert record
const item = { id: table._id.record };
item[table._id.key] = initialId;
table._id.table.insert(item, (err, key) => {
if (err) {
err.message = `Failed inserting increment id record: ${table.name}`;
return callback(err);
}
table._id.verified = true;
return callback(null, allocatedId);
});
});
});
const item = { id: table._id.record };
item[table._id.key] = initialId;
try {
await table._id.table.insert(item);
table._id.verified = true;
return allocatedId;
}
catch (err) {
err.message = `Failed inserting increment id record: ${table.name}`;
throw err;
}
};

@@ -7,4 +7,4 @@ 'use strict';

const Hoek = require('hoek');
const Items = require('items');
const RethinkDB = require('rethinkdb');
const Cursor = require('./cursor');

@@ -44,22 +44,16 @@ const Criteria = require('./criteria');

get(ids, options, callback) {
get(ids, options = {}) {
if (!callback) {
callback = options;
options = {};
}
const diag = { ids, options };
const batch = Array.isArray(ids);
ids = Id.normalize(ids, true);
if (ids instanceof Error) {
return this._error('get', ids.message, diag, Hoek.nextTick(callback));
try {
ids = Id.normalize(ids, true);
}
if (batch) {
return this._run(this._refine(this.raw.getAll(RethinkDB.args(ids)), options), 'get', diag, callback);
catch (err) {
return Promise.reject(this._error('get', err.message, diag));
}
return this._run(this._refine(this.raw.get(ids), options), 'get', diag, callback);
const query = (batch ? this.raw.getAll(RethinkDB.args(ids)) : this.raw.get(ids));
return this._run(this._refine(query, options), 'get', diag);
}

@@ -95,33 +89,23 @@

all(options, callback) {
all(options = {}) {
if (!callback) {
callback = options;
options = {};
}
const selection = this.raw;
if (options.chunks) {
return this._chunks(selection, null, 'all', options, callback);
}
this._run(this._refine(selection, options, true), 'all', null, callback);
return this._run(this._refine(this.raw, options, true), 'all');
}
exist(id, callback) {
exist(id) {
const diag = { id };
id = Id.normalize(id, false);
if (id instanceof Error) {
return this._error('exist', id.message, diag, Hoek.nextTick(callback));
try {
id = Id.normalize(id, false);
}
catch (err) {
return Promise.reject(this._error('exist', err.message, diag));
}
this._run(this.raw.get(id).ne(null), 'exist', diag, callback);
return this._run(this.raw.get(id).ne(null), 'exist', diag);
}
distinct(criteria, fields, callback) {
async distinct(criteria, fields) {
if (!callback) {
callback = fields;
if (!fields) {
fields = criteria;

@@ -134,120 +118,43 @@ criteria = null;

const selection = Criteria.select(criteria, this).pluck(fields).distinct();
this._run(selection, 'distinct', { criteria, fields }, (err, result) => {
const result = await this._run(selection, 'distinct', { criteria, fields });
if (!result) {
return null;
}
if (err) {
return callback(err);
}
if (fields.length === 1) {
return result.map((item) => item[fields[0]]);
}
if (!result) {
return callback(null, null);
}
if (fields.length === 1) {
return callback(null, result.map((item) => item[fields[0]]));
}
return callback(null, result);
});
return result;
}
query(criteria, options, callback) {
query(criteria, options = {}) {
if (!callback) {
callback = options;
options = {};
}
const diag = { criteria, options };
const selection = Criteria.select(criteria, this);
if (options.chunks) {
return this._chunks(selection, diag, 'query', options, callback);
}
this._run(this._refine(selection, options), 'query', diag, callback);
return this._run(this._refine(selection, options), 'query', diag);
}
_chunks(selection, diag, action, options, callback) {
async single(criteria) {
Hoek.assert(options.from === undefined && options.count === undefined, 'Cannot use chunks option with from or count');
Hoek.assert(callback.each, 'Must use apiece callback with each handler when specifying chunks option');
const diag = { criteria };
const result = await this._run(Criteria.select(criteria, this), 'single', diag);
if (!result) {
return null;
}
const settings = Hoek.clone(options);
delete settings.chunks;
settings.count = options.chunks;
if (!options.sort) {
settings.sort = { key: this.primary };
if (result.length !== 1) {
return Promise.reject(this._error('single', 'Found multiple items', diag));
}
let count = null;
const step = (err, results) => {
if (err) {
return callback(err);
}
if (results) {
results.forEach(step.each);
}
if (count !== null &&
count < options.chunks) {
return callback();
}
count = 0;
if (settings.from === undefined) {
settings.from = 0;
}
else {
settings.from += options.chunks;
}
this._run(this._refine(selection, settings, action === 'all'), action, diag, step);
};
step.each = (item) => {
++count;
return callback.each(item);
};
return step();
return result[0];
}
single(criteria, callback) {
count(criteria) {
const diag = { criteria };
this._run(Criteria.select(criteria, this), 'single', diag, (err, result) => {
if (err) {
return callback(err);
}
if (!result) {
return callback(null, null);
}
if (result.length !== 1) {
return this._error('single', 'Found multiple items', diag, callback);
}
return callback(null, result[0]);
});
return this._run(Criteria.select(criteria, this).count(), 'count', { criteria });
}
count(criteria, callback) {
async insert(items, options = {}) {
this._run(Criteria.select(criteria, this).count(), 'count', { criteria }, callback);
}
insert(items, options, callback) {
if (!callback) {
callback = options;
options = {};
}
if (!options.chunks ||

@@ -257,3 +164,3 @@ !Array.isArray(items) ||

return this._insert(items, options, callback);
return this._insert(items, options);
}

@@ -263,16 +170,2 @@

let result = [];
const each = (batch, next) => {
this._insert(batch, options, (err, ids) => {
if (err) {
return next(err);
}
result = result.concat(ids);
return next();
});
};
const batches = [];

@@ -285,78 +178,70 @@ let left = items;

Items.serial(batches, each, (err) => callback(err, result));
}
let result = [];
_insert(items, options, callback) {
for (let i = 0; i < batches.length; ++i) {
const batch = batches[i];
const diag = { items, options };
const ids = await this._insert(batch, options);
result = result.concat(ids);
}
Id.wrap(this, items, (err, wrapped) => {
return result;
}
if (err) {
return this._error('insert', err, diag, callback);
}
async _insert(items, options) {
Unique.reserve(this, wrapped, options.merge === true, (err, postUnique) => {
const diag = { items, options };
if (err) {
return this._error('insert', err, diag, callback);
}
let wrapped;
let postUnique;
const opt = {
conflict: options.merge ? 'update' : 'error',
returnChanges: !!postUnique
};
try {
wrapped = await Id.wrap(this, items);
postUnique = await Unique.reserve(this, wrapped, options.merge === true);
}
catch (err) {
throw this._error('insert', err, diag);
}
this._run(this.raw.insert(wrapped, opt), 'insert', diag, (err, result) => {
const opt = {
conflict: options.merge ? 'update' : 'error',
returnChanges: !!postUnique
};
if (err) {
return callback(err);
}
const result = await this._run(this.raw.insert(wrapped, opt), 'insert', diag);
if (postUnique) {
await postUnique(result.changes);
}
const allocated = (err) => {
// Single item
if (err) {
return callback(err);
}
if (!Array.isArray(wrapped)) {
return (wrapped[this.primary] !== undefined ? wrapped[this.primary] : result.generated_keys[0]);
}
// Single item
// Items array
if (!Array.isArray(wrapped)) {
return callback(null, wrapped[this.primary] !== undefined ? wrapped[this.primary] : result.generated_keys[0]);
}
const generated = result.generated_keys || [];
if (generated.length === wrapped.length) {
return result.generated_keys;
}
// Items array
// Mixed array
const generated = result.generated_keys || [];
if (generated.length === wrapped.length) {
return callback(null, result.generated_keys);
}
const ids = [];
let g = 0;
for (let i = 0; i < wrapped.length; ++i) {
if (wrapped[i][this.primary] !== undefined) {
ids.push(wrapped[i][this.primary]);
}
else {
ids.push(result.generated_keys[g++]);
}
}
// Mixed array
return ids;
const ids = [];
let g = 0;
for (let i = 0; i < wrapped.length; ++i) {
if (wrapped[i][this.primary] !== undefined) {
ids.push(wrapped[i][this.primary]);
}
else {
ids.push(result.generated_keys[g++]);
}
}
return callback(null, ids);
};
if (!postUnique) {
return allocated();
}
return postUnique(result.changes, allocated);
});
});
});
}
update(ids, changes, callback) {
async update(ids, changes) {

@@ -368,4 +253,3 @@ const diag = { ids, changes };

if (!callback) { // (updates, callback)
callback = changes;
if (!changes) { // (updates)
changes = ids;

@@ -375,3 +259,3 @@ ids = changes.map((item) => item.id);

else {
if (typeof ids[0] === 'object') { // (updates, options, callback)
if (typeof ids[0] === 'object') { // (updates, options)
const options = changes;

@@ -392,4 +276,8 @@ changes = ids;

const each = (batch, next) => this._update(batch.map((item) => item.id), batch, diag, next);
return Items.serial(batches, each, (err) => callback(err));
for (let i = 0; i < batches.length; ++i) {
const batch = batches[i];
await this._update(batch.map((item) => item.id), batch, diag);
}
return;
}

@@ -399,3 +287,3 @@

}
else { // (ids, changes, callback)
else { // (ids, changes)
Hoek.assert(!Array.isArray(changes), 'Changes cannot be an array when ids is an array');

@@ -405,43 +293,39 @@ }

}
else { // (id, changes, callback)
else { // (id, changes)
Hoek.assert(changes && typeof changes === 'object', 'Invalid changes object');
}
return this._update(ids, changes, diag, callback);
return this._update(ids, changes, diag);
}
_update(ids, changes, diag, callback) {
async _update(ids, changes, diag) {
const batch = Array.isArray(ids);
ids = Id.normalize(ids, true);
if (ids instanceof Error) {
return this._error('update', ids.message, diag, Hoek.nextTick(callback));
try {
ids = Id.normalize(ids, true);
}
catch (err) {
throw this._error('update', err.message, diag);
}
Unique.reserve(this, changes, (batch ? true : ids), (err, postUnique) => {
let postUnique;
try {
postUnique = await Unique.reserve(this, changes, (batch ? true : ids));
}
catch (err) {
throw this._error('update', err, diag);
}
if (err) {
return this._error('update', err, diag, callback);
}
const wrapped = Modifier.wrap(changes, this);
const opts = { returnChanges: !!postUnique };
const query = (batch ? this.raw.getAll(RethinkDB.args(ids)) : this.raw.get(ids));
const result = await this._run(query.replace(wrapped, opts), 'update', diag);
const wrapped = Modifier.wrap(changes, this);
const opts = { returnChanges: !!postUnique };
const query = (batch ? this.raw.getAll(RethinkDB.args(ids)) : this.raw.get(ids));
this._run(query.replace(wrapped, opts), 'update', diag, (err, result) => {
if (err) {
return callback(err);
}
if (!postUnique) {
return callback(null);
}
return postUnique(result.changes, callback);
});
});
if (postUnique) {
return postUnique(result.changes);
}
}
next(id, field, value, callback) {
async next(id, field, value) {

@@ -452,23 +336,19 @@ const changes = {};

const diag = { id, field, value };
id = Id.normalize(id, false);
if (id instanceof Error) {
return this._error('next', id.message, diag, Hoek.nextTick(callback));
try {
id = Id.normalize(id, false);
}
catch (err) {
throw this._error('next', err.message, diag);
}
this._run(this.raw.get(id).update(changes, { returnChanges: true }), 'next', diag, (err, result) => {
const result = await this._run(this.raw.get(id).update(changes, { returnChanges: true }), 'next', diag);
if (!result.replaced) {
throw this._error('next', 'No item found to update', diag);
}
if (err) {
return callback(err);
}
if (!result.replaced) {
return this._error('next', 'No item found to update', diag, callback);
}
const inc = result.changes[0].new_val[field];
return callback(null, inc);
});
const inc = result.changes[0].new_val[field];
return inc;
}
remove(criteria, callback) {
async remove(criteria) {

@@ -480,6 +360,8 @@ const diag = { criteria };

if (isIds) {
criteria = Id.normalize(criteria, true);
if (criteria instanceof Error) {
return this._error('remove', criteria.message, diag, Hoek.nextTick(callback));
try {
criteria = Id.normalize(criteria, true);
}
catch (err) {
throw this._error('remove', err.message, diag);
}
}

@@ -491,48 +373,38 @@

this._run(selection.delete(), 'remove', diag, (err, result) => {
const result = await this._run(selection.delete(), 'remove', diag);
if (isIds &&
!isBatch &&
!result.deleted) {
if (err) {
return callback(err);
}
if (isIds &&
!isBatch &&
!result.deleted) {
return this._error('remove', 'No item found to remove', diag, callback);
}
return callback(null);
});
throw this._error('remove', 'No item found to remove', diag);
}
}
empty(callback) {
async empty() {
this._run(this.raw.delete(), 'empty', null, (err, result) => {
return callback(err, result ? result.deleted : 0);
});
const result = await this._run(this.raw.delete(), 'empty');
return result.deleted;
}
sync(callback) {
async sync() {
if (!this._db._connection) {
return this._error('sync', 'Database disconnected', null, Hoek.nextTick(callback));
throw this._error('sync', 'Database disconnected');
}
this.raw.sync().run(this._db._connection, (err, result) => {
if (err) {
return this._error('sync', err, null, callback);
}
return callback(null);
});
try {
await this.raw.sync().run(this._db._connection);
}
catch (err) {
throw this._error('sync', err);
}
}
index(indexes, callback) {
async index(indexes) {
const pending = [];
const names = [];
const each = (index, next) => {
indexes = [].concat(indexes);
for (let i = 0; i < indexes.length; ++i) {
let index = indexes[i];
if (typeof index === 'string') {

@@ -551,19 +423,13 @@ index = { name: index };

args.push(options);
this._run(this.raw.indexCreate.apply(this.raw, args), 'index', null, next);
};
pending.push(this._run(this.raw.indexCreate.apply(this.raw, args), 'index'));
}
Items.parallel([].concat(indexes), each, (err) => {
if (err) {
return callback(err);
}
this._run(this.raw.indexWait(RethinkDB.args(names)), 'indexWait', null, callback);
});
await Promise.all(pending);
return this._run(this.raw.indexWait(RethinkDB.args(names)), 'indexWait');
}
changes(criteria, options, callback) {
async changes(criteria, options) {
if (!this._db._connection) {
return this._error('changes', 'Database disconnected', criteria, Hoek.nextTick(callback));
throw this._error('changes', 'Database disconnected', criteria);
}

@@ -601,67 +467,68 @@

request.changes().run(this._db._connection, settings, (err, dbCursor) => {
let dbCursor;
try {
dbCursor = await request.changes().run(this._db._connection, settings);
}
catch (err) {
throw this._error('changes', err, criteria);
}
if (err) {
return this._error('changes', err, criteria, callback);
}
const cursor = new Cursor(dbCursor, this, feedId);
const each = (item, next) => {
const cursor = new Cursor(dbCursor, this, feedId);
const each = (item, next) => {
const type = internals.changeTypes[item.type];
if (type === 'initial' &&
item.new_val === null) {
const type = internals.changeTypes[item.type];
if (type === 'initial' &&
item.new_val === null) {
return next(); // Initial result for missing id
}
return next(); // Initial result for missing id
}
const update = {
id: item.old_val ? item.old_val[this.primary] : item.new_val[this.primary],
type,
before: item.old_val || null,
after: item.new_val || null
};
options.handler(null, update);
return next();
const update = {
id: item.old_val ? item.old_val[this.primary] : item.new_val[this.primary],
type,
before: item.old_val || null,
after: item.new_val || null
};
dbCursor.eachAsync(each, (err) => {
options.handler(null, update);
return next();
};
// Changes cursor ends only with an error
dbCursor.eachAsync(each, (err) => {
if (err.msg === 'Cursor is closed.') {
return;
}
// Changes cursor ends only with an error
const disconnected = (err.msg === 'Connection is closed.');
const willReconnect = (disconnected && feedId && this._db._willReconnect()) || false;
cursor.close(false);
if (err.msg === 'Cursor is closed.') {
return;
}
if (feedId &&
!willReconnect) {
const disconnected = (err.msg === 'Connection is closed.');
const willReconnect = (disconnected && feedId && this._db._willReconnect()) || false;
cursor.close(false);
delete this._db._feeds[feedId];
}
if (feedId &&
!willReconnect) {
return this._error('changes', err, criteria, options.handler, { disconnected, willReconnect });
});
delete this._db._feeds[feedId];
}
return callback(null, cursor);
return options.handler(this._error('changes', err, criteria, { disconnected, willReconnect }));
});
return cursor;
}
_run(request, action, inputs, callback) {
_run(request, action, inputs) {
return this._db._run(request, {}, this.name, action, inputs, callback);
return this._db._run(request, {}, this.name, action, inputs);
}
_error(action, err, inputs, callback, flags) {
_error(action, err, inputs, flags) {
return internals.error(this.name, action, err, inputs, callback, flags);
return internals.error(this.name, action, err, inputs, flags);
}
static _error(table, action, err, inputs, callback, flags) {
static _error(table, action, err, inputs, flags) {
return internals.error(table, action, err, inputs, callback, flags);
return internals.error(table, action, err, inputs, flags);
}

@@ -671,3 +538,3 @@ };

internals.error = function (table, action, err, inputs, callback, flags) {
internals.error = function (table, action, err, inputs, flags) {

@@ -688,3 +555,3 @@ const message = (typeof err === 'string' ? err : err.message);

return callback(error);
return error;
};

@@ -7,3 +7,2 @@ 'use strict';

const Hoek = require('hoek');
const Items = require('items');

@@ -47,6 +46,6 @@

exports.reserve = function (table, items, updateId, callback) {
exports.reserve = async function (table, items, updateId, callback) {
if (!table._unique) {
return Hoek.nextTick(callback)();
return;
}

@@ -68,6 +67,2 @@

if (values !== undefined) {
if (values.isBoom) {
return callback(values);
}
if (values.length) {

@@ -88,83 +83,76 @@ reserve.push({ rule, values, id: typeof updateId !== 'boolean' ? updateId : item[table.primary] });

const cleanup = (!release.length ? null : (changes, finalize) => {
let cleanup = null;
if (release.length) {
cleanup = async (changes) => {
const change = changes[0]; // Always includes one change
const eachRule = (rule, next) => {
const change = changes[0]; // Always includes one change
for (let i = 0; i < release.length; ++i) {
const rule = release[i];
if (!change.old_val) {
return next();
}
if (!change.old_val) {
continue;
}
let released = internals.reach(change.old_val, rule.path);
if (!released) {
return next();
}
let released = internals.reach(change.old_val, rule.path);
if (!released) {
continue;
}
const taken = internals.reach(change.new_val, rule.path);
if (taken) {
released = released.filter((value) => taken.indexOf(value) === -1);
}
const taken = internals.reach(change.new_val, rule.path);
if (taken) {
released = released.filter((value) => taken.indexOf(value) === -1);
}
if (!released.length) {
return next();
if (!released.length) {
continue;
}
await rule.table.remove(released);
}
return rule.table.remove(released, next);
};
}
return Items.serial(release, eachRule, finalize);
});
// Reserve new values
if (!reserve.length) {
return Hoek.nextTick(callback)(null, cleanup);
return cleanup;
}
exports.verify(table, (err) => {
await exports.verify(table);
if (err) {
return callback(err);
}
for (let i = 0; i < reserve.length; ++i) {
const field = reserve[i];
const each = (field, next) => {
// Try to get existing reservations
// Try to get existing reservations
let values = field.values;
const existing = await field.rule.table.get(values);
let values = field.values;
field.rule.table.get(values, (err, existing) => {
if (err) {
return next(err);
if (existing) {
const existingIds = [];
for (let j = 0; j < existing.length; ++j) {
const item = existing[j];
if (item[field.rule.key] !== field.id) {
throw Boom.internal(`Action will violate unique restriction on ${item.id} in table ${field.rule.table.name}`);
}
if (existing) {
const existingIds = [];
for (let i = 0; i < existing.length; ++i) {
const item = existing[i];
if (item[field.rule.key] !== field.id) {
return next(Boom.internal(`Action will violate unique restriction on ${item.id} in table ${field.rule.table.name}`));
}
existingIds.push(item.id);
}
existingIds.push(item.id);
}
values = values.filter((value) => existingIds.indexOf(value) === -1);
}
values = values.filter((value) => existingIds.indexOf(value) === -1);
}
const reservations = [];
const now = Date.now();
values.forEach((value) => {
const reservations = [];
const now = Date.now();
values.forEach((value) => {
const rsv = { id: value, created: now };
rsv[field.rule.key] = field.id;
reservations.push(rsv);
});
const rsv = { id: value, created: now };
rsv[field.rule.key] = field.id;
reservations.push(rsv);
});
await field.rule.table.insert(reservations);
}
field.rule.table.insert(reservations, next);
});
};
Items.serial(reserve, each, (err) => callback(err, cleanup));
});
return cleanup;
};

@@ -239,3 +227,3 @@

return Boom.internal('Cannot add an array as single value to unique index value');
throw Boom.internal('Cannot add an array as single value to unique index value');
}

@@ -248,3 +236,3 @@

return Boom.internal('Cannot increment unique index value'); // type: increment
throw Boom.internal('Cannot increment unique index value'); // type: increment
}

@@ -256,3 +244,3 @@

exports.verify = function (table, callback) {
exports.verify = async function (table) {

@@ -262,29 +250,19 @@ if (!table._unique ||

return Hoek.nextTick(callback)();
return;
}
const each = (name, next) => {
for (let i = 0; i < table._unique.tables.length; ++i) {
const name = table._unique.tables[i];
const create = {};
create[name] = { purge: false, secondary: false };
table._db._createTable(create, (err) => {
if (err) {
err.message = `Failed creating unique table: ${name}`;
return next(err);
}
return next();
});
};
Items.serial(table._unique.tables, each, (err) => {
if (err) {
return callback(err);
try {
await table._db._createTable(create);
}
catch (err) {
err.message = `Failed creating unique table: ${name}`;
throw err;
}
}
table._unique.verified = true;
return callback();
});
table._unique.verified = true;
};
{
"name": "penseur",
"description": "Lightweight RethinkDB wrapper",
"version": "7.14.0",
"version": "8.0.0",
"author": "Eran Hammer <eran@hammer.io> (http://hueniverse.com)",

@@ -12,9 +12,8 @@ "repository": "git://github.com/hueniverse/penseur",

"engines": {
"node": ">=6.3.0"
"node": ">=8.7.0"
},
"dependencies": {
"boom": "5.x.x",
"hoek": "4.x.x",
"items": "2.x.x",
"joi": "11.x.x",
"boom": "6.x.x",
"hoek": "5.x.x",
"joi": "13.x.x",
"radix62": "1.x.x",

@@ -24,5 +23,5 @@ "rethinkdb": "2.3.x"

"devDependencies": {
"apiece": "1.x.x",
"code": "4.x.x",
"lab": "14.x.x"
"code": "5.x.x",
"lab": "15.x.x",
"teamwork": "2.x.x"
},

@@ -29,0 +28,0 @@ "scripts": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc