Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ejoy-oplog

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ejoy-oplog - npm Package Compare versions

Comparing version 2.0.3 to 3.0.0

.eslintrc

4

index.js

@@ -1,3 +0,1 @@

exports.default = require('./lib').default
module.exports = exports.default
module.exports = require('./lib')
'use strict';
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.regex = regex;
const Emitter = require('eventemitter3');
const dbg = require('debug');
var _eventemitter = require('eventemitter3');
const events = {
i: 'insert',
u: 'update',
d: 'delete'
};
var _eventemitter2 = _interopRequireDefault(_eventemitter);
var _debug = require('debug');
var _debug2 = _interopRequireDefault(_debug);
var _ = require('./');
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function regex(pattern) {
pattern = pattern || '*';
pattern = pattern.replace(/[*]/g, '(.*?)');
return new RegExp('^' + pattern + '$', 'i');
function regex(pattern = '*') {
pattern = pattern.replace(/\*/g, '(.*?)');
return new RegExp(`^${pattern}$`, 'i');
}
exports.default = function (ns, oplog) {
var debug = (0, _debug2.default)('mongo-oplog:filter');
var filter = new _eventemitter2.default();
var re = regex(ns);
module.exports = (ns, oplog) => {
const debug = dbg('mongo-oplog:filter');
const filter = new Emitter();
const re = regex(ns);

@@ -37,3 +28,5 @@ debug('initializing filter with re %s', ns);

filter.emit('op', doc);
filter.emit(_.events[doc.op], doc);
if (events[doc.op]) {
filter.emit(events[doc.op], doc);
}
}

@@ -49,3 +42,5 @@

return Object.assign(filter, { destroy: destroy });
};
return Object.assign(filter, { destroy });
};
module.exports.regex = regex;
'use strict';
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.events = undefined;
var _extends = Object.assign || function (target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i]; for (var key in source) { if (Object.prototype.hasOwnProperty.call(source, key)) { target[key] = source[key]; } } } return target; };
require('babel-polyfill');
function _objectWithoutProperties(obj, keys) { var target = {}; for (var i in obj) { if (keys.indexOf(i) >= 0) continue; if (!Object.prototype.hasOwnProperty.call(obj, i)) continue; target[i] = obj[i]; } return target; }
var _eventemitter = require('eventemitter3');
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
var _eventemitter2 = _interopRequireDefault(_eventemitter);
const Emitter = require('eventemitter3');
var _mongodb = require('mongodb');
var _require = require('mongodb');
var _debug = require('debug');
const MongoClient = _require.MongoClient;
var _debug2 = _interopRequireDefault(_debug);
const createDebug = require('debug');
const createFilter = require('./filter');
const createStream = require('./stream');
var _filter = require('./filter');
const MONGO_URI = 'mongodb://127.0.0.1:27017/local';
const debug = createDebug('mongo-oplog');
var _filter2 = _interopRequireDefault(_filter);
var _stream = require('./stream');
var _stream2 = _interopRequireDefault(_stream);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
function _objectWithoutProperties(obj, keys) { var target = {}; for (var i in obj) { if (keys.indexOf(i) >= 0) continue; if (!Object.prototype.hasOwnProperty.call(obj, i)) continue; target[i] = obj[i]; } return target; }
var MONGO_URI = 'mongodb://127.0.0.1:27017/local';
var debug = (0, _debug2.default)('mongo-oplog');
var events = exports.events = {
const events = {
i: 'insert',
u: 'update',
d: 'delete'
};
// Add callback support to promise
var toCb = function toCb(fn) {
return function (cb) {
try {
var val = fn(cb);
if (!cb) return val;else if (val && typeof val.then === 'function') {
return val.then(function (val) {
return cb(null, val);
}).catch(cb);
}
cb(null, val);
} catch (err) {
cb(err);
// Add callback support to promise
};const toCb = fn => cb => {
try {
const value = fn(cb);
if (!cb) return value;
if (value && typeof value.then === 'function') {
return value.then(value_ => cb(null, value_)).catch(cb);
}
};
cb(null, value);
} catch (error) {
cb(error);
}
};
exports.default = function (uri) {
var connect = function () {
var _ref = _asyncToGenerator(regeneratorRuntime.mark(function _callee() {
return regeneratorRuntime.wrap(function _callee$(_context) {
while (1) {
switch (_context.prev = _context.next) {
case 0:
if (!connected) {
_context.next = 2;
break;
}
module.exports = (() => {
var _ref = _asyncToGenerator(function* (uri, options = {}, onReconnect) {
let connect = (() => {
var _ref2 = _asyncToGenerator(function* () {
if (connected) return db;
client = yield new MongoClient(uri, _extends({}, options_, {
loggerLevel: 'info'
})).connect();
return _context.abrupt('return', db);
patchEmitter(client.topology);
case 2:
_context.next = 4;
return _mongodb.MongoClient.connect(uri, opts);
client.topology.on('connectionClosed', function () {
client.close().then(function () {
connected = false;
setTimeout(function () {
connect().then(function () {
return tail();
}).then(function () {
onReconnect(Object.assign(oplog, {
db,
filter,
tail: toCb(tail),
stop: toCb(stop),
destroy: toCb(destroy)
}));
}).catch(console.error);
}, 5000);
});
});
case 4:
db = _context.sent;
db = client.db();
connected = true;
});
connected = true;
return function connect() {
return _ref2.apply(this, arguments);
};
})();
case 6:
case 'end':
return _context.stop();
}
let tail = (() => {
var _ref3 = _asyncToGenerator(function* () {
try {
debug('Connected to oplog database');
stream = yield createStream({ ns, coll, ts, db });
stream.on('end', onend);
stream.on('data', ondata);
stream.on('error', onerror);
return stream;
} catch (error) {
onerror(error);
}
}, _callee, this);
}));
});
return function connect() {
return _ref.apply(this, arguments);
};
}();
return function tail() {
return _ref3.apply(this, arguments);
};
})();
var tail = function () {
var _ref2 = _asyncToGenerator(regeneratorRuntime.mark(function _callee2() {
return regeneratorRuntime.wrap(function _callee2$(_context2) {
while (1) {
switch (_context2.prev = _context2.next) {
case 0:
_context2.prev = 0;
let stop = (() => {
var _ref4 = _asyncToGenerator(function* () {
if (stream) stream.destroy();
debug('streaming stopped');
return oplog;
});
debug('Connected to oplog database');
_context2.next = 4;
return connect();
return function stop() {
return _ref4.apply(this, arguments);
};
})();
case 4:
_context2.next = 6;
return (0, _stream2.default)({ ns: ns, coll: coll, ts: ts, db: db });
let destroy = (() => {
var _ref5 = _asyncToGenerator(function* () {
yield stop();
if (!connected) return oplog;
yield client.close(true);
connected = false;
return oplog;
});
case 6:
stream = _context2.sent;
return function destroy() {
return _ref5.apply(this, arguments);
};
})();
debug('Connected to oplog with NS : %s', ns);
stream.on('end', onend);
stream.on('data', ondata);
stream.on('error', onerror);
return _context2.abrupt('return', stream);
let db;
let client;
let stream;
let connected = false;
case 14:
_context2.prev = 14;
_context2.t0 = _context2['catch'](0);
const ns = options.ns,
since = options.since,
coll = options.coll,
options_ = _objectWithoutProperties(options, ['ns', 'since', 'coll']);
onerror(_context2.t0);
const oplog = new Emitter();
case 17:
case 'end':
return _context2.stop();
}
}
}, _callee2, this, [[0, 14]]);
}));
let ts = since || 0;
uri = uri || MONGO_URI;
return function tail() {
return _ref2.apply(this, arguments);
};
}();
if (typeof uri !== 'string') {
if (uri) {
db = uri.db();
client = uri;
connected = true;
} else {
throw new Error('Invalid mongo db.');
}
} else {
yield connect();
}
var stop = function () {
var _ref3 = _asyncToGenerator(regeneratorRuntime.mark(function _callee3() {
return regeneratorRuntime.wrap(function _callee3$(_context3) {
while (1) {
switch (_context3.prev = _context3.next) {
case 0:
if (stream) stream.destroy();
debug('streaming stopped');
return _context3.abrupt('return', oplog);
function filter(ns) {
return createFilter(ns, oplog);
}
case 3:
case 'end':
return _context3.stop();
}
}
}, _callee3, this);
}));
function ondata(doc) {
if (oplog.ignore) return oplog;
debug('incoming data %j', doc);
ts = doc.ts;
oplog.emit('op', doc);
oplog.emit(events[doc.op], doc);
return oplog;
}
return function stop() {
return _ref3.apply(this, arguments);
};
}();
function onend() {
debug('stream ended');
oplog.emit('end');
return oplog;
}
var destroy = function () {
var _ref4 = _asyncToGenerator(regeneratorRuntime.mark(function _callee4() {
return regeneratorRuntime.wrap(function _callee4$(_context4) {
while (1) {
switch (_context4.prev = _context4.next) {
case 0:
_context4.next = 2;
return stop();
function onerror(error) {
if (/cursor (killed or )?timed out/.test(error.message)) {
debug('cursor timeout - re-tailing %j', error);
tail();
} else {
debug('oplog error %j', error);
oplog.emit('error', error);
}
}
case 2:
_context4.next = 4;
return db.close(true);
function patchEmitter(emitter) {
var oldEmit = emitter.emit;
case 4:
connected = false;
return _context4.abrupt('return', oplog);
emitter.emit = function () {
var emitArgs = arguments;
case 6:
case 'end':
return _context4.stop();
}
if (!/connectionClosed/.test(emitArgs[0])) {
oldEmit.apply(emitter, arguments);
}
}, _callee4, this);
}));
return function destroy() {
return _ref4.apply(this, arguments);
};
}();
var options = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
var db = void 0;
var stream = void 0;
var connected = false;
var ns = options.ns,
since = options.since,
coll = options.coll,
opts = _objectWithoutProperties(options, ['ns', 'since', 'coll']);
var oplog = new _eventemitter2.default();
var ts = since || 0;
uri = uri || MONGO_URI;
if (typeof uri !== 'string') {
if (uri && uri.collection) {
db = uri;
connected = true;
} else {
throw new Error('Invalid mongo db.');
};
}
}
function filter(ns) {
return (0, _filter2.default)(ns, oplog);
}
return Object.assign(oplog, {
db,
filter,
tail: toCb(tail),
stop: toCb(stop),
destroy: toCb(destroy)
});
});
function ondata(doc) {
if (oplog.ignore) return oplog;
debug('incoming data %j', doc);
ts = doc.ts;
oplog.emit('op', doc);
oplog.emit(events[doc.op], doc);
return oplog;
}
return function (_x) {
return _ref.apply(this, arguments);
};
})();
function onend() {
debug('stream ended');
oplog.emit('end');
return oplog;
}
function onerror(err) {
if (/cursor (killed or )?timed out/.test(err.message)) {
debug('cursor timeout - re-tailing %j', err);
tail();
} else {
debug('oplog error %j', err);
oplog.emit('error', err);
throw err;
}
}
return Object.assign(oplog, {
db: db,
filter: filter,
tail: toCb(tail),
stop: toCb(stop),
destroy: toCb(destroy)
});
};
module.exports.events = events;
module.exports.default = module.exports;
'use strict';
Object.defineProperty(exports, "__esModule", {
value: true
});
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
var _mongodb = require('mongodb');
var _require = require('mongodb');
var _filter = require('./filter');
const Timestamp = _require.Timestamp;
function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
var _require2 = require('./filter');
exports.default = function () {
var _ref = _asyncToGenerator(regeneratorRuntime.mark(function _callee2(_ref2) {
var time = function () {
var _ref3 = _asyncToGenerator(regeneratorRuntime.mark(function _callee() {
var doc;
return regeneratorRuntime.wrap(function _callee$(_context) {
while (1) {
switch (_context.prev = _context.next) {
case 0:
if (!ts) {
_context.next = 2;
break;
}
const regex = _require2.regex;
return _context.abrupt('return', typeof ts !== 'number' ? ts : (0, _mongodb.Timestamp)(0, ts));
case 2:
_context.next = 4;
return coll.find({}, { ts: 1 }).sort({ $natural: -1 }).limit(1).nextObject();
module.exports = (() => {
var _ref = _asyncToGenerator(function* ({ db, ns, ts, coll }) {
let time = (() => {
var _ref2 = _asyncToGenerator(function* () {
if (ts) return typeof ts !== 'number' ? ts : Timestamp(0, ts);
case 4:
doc = _context.sent;
return _context.abrupt('return', doc ? doc.ts : (0, _mongodb.Timestamp)(0, Date.now() / 1000 | 0));
const doc = yield coll.find({}, { ts: 1 }).sort({ $natural: -1 }).limit(1).next();
case 6:
case 'end':
return _context.stop();
}
}
}, _callee, this);
}));
return doc ? doc.ts : Timestamp(0, Date.now() / 1000 | Math.trunc(0));
});
return function time() {
return _ref3.apply(this, arguments);
return _ref2.apply(this, arguments);
};
}();
})();
var db = _ref2.db,
ns = _ref2.ns,
ts = _ref2.ts,
coll = _ref2.coll;
var query;
return regeneratorRuntime.wrap(function _callee2$(_context2) {
while (1) {
switch (_context2.prev = _context2.next) {
case 0:
if (db) {
_context2.next = 2;
break;
}
if (!db) throw new Error('Mongo db is missing.');
throw new Error('Mongo db is missing.');
const query = {};
case 2:
query = {};
coll = db.collection(coll || 'oplog.rs');
if (ns) query.ns = { $regex: regex(ns) };
query.ts = { $gt: yield time() };
coll = db.collection(coll || 'oplog.rs');
return (yield coll.find(query, {
tailable: true,
awaitData: true,
oplogReplay: true,
noCursorTimeout: true,
numberOfRetries: Number.MAX_VALUE
})).stream();
});
if (ns) query.ns = { $regex: (0, _filter.regex)(ns) };
_context2.next = 7;
return time();
case 7:
_context2.t0 = _context2.sent;
query.ts = {
$gt: _context2.t0
};
_context2.next = 11;
return coll.find(query, {
tailable: true,
awaitData: true,
oplogReplay: true,
noCursorTimeout: true,
numberOfRetries: Number.MAX_VALUE
});
case 11:
return _context2.abrupt('return', _context2.sent.stream());
case 12:
case 'end':
return _context2.stop();
}
}
}, _callee2, undefined);
}));
return function (_x) {
return _ref.apply(this, arguments);
};
}();
})();
{
"name": "ejoy-oplog",
"version": "2.0.3",
"description": "Watch mongodb oplog in a very simple way",
"author": "Jonathan Brumley <cayasso@gmail.com>",
"homepage": "https://github.com/cayasso/mongo-oplog",
"version": "3.0.0",
"description": "Watch mongodb oplog in a simple way",
"author": "Toan Tran <toan@ejoylearning.com>",
"main": "./index.js",
"types": "./src/index.d.ts",
"types": "./lib/index.d.ts",
"scripts": {
"test": "xo && mocha",
"build": "npm run clean && ./node_modules/.bin/babel src -d lib",
"prepublish": "npm run build",
"build": "npm run clean && ./node_modules/.bin/babel src -d lib && cp src/index.d.ts lib/",
"prepare": "npm run build",
"clean": "rm -rf lib/"
},
"repository": {
"type": "git",
"url": "git://github.com/cayasso/mongo-oplog.git"
},
"keywords": [

@@ -30,31 +25,24 @@ "data",

"dependencies": {
"debug": "2.3.x",
"eventemitter3": "^2.0.2",
"mongodb": "~2.2.x",
"babel-polyfill": "^6.20.0"
"debug": "^3.1.0",
"eventemitter3": "^2.0.3",
"mongodb": "3.6.7"
},
"devDependencies": {
"babel-cli": "^6.18.0",
"babel-core": "^6.20.0",
"babel-loader": "^6.2.9",
"babel-plugin-syntax-async-functions": "^6.13.0",
"babel-plugin-transform-async-to-generator": "^6.16.0",
"babel-plugin-transform-es2015-destructuring": "^6.19.0",
"babel-plugin-transform-object-rest-spread": "^6.20.2",
"babel-plugin-transform-regenerator": "^6.20.0",
"babel-plugin-transform-runtime": "^6.15.0",
"babel-preset-es2015": "^6.18.0",
"babel-preset-stage-0": "^6.16.0",
"mocha": "*",
"pre-commit": "1.2.0",
"should": "^11.1.1",
"xo": "^0.17.1"
"@typescript-eslint/eslint-plugin": "5.29.0",
"@typescript-eslint/parser": "5.29.0",
"babel-cli": "^6.26.0",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
"babel-preset-env": "^1.6.1",
"eslint": "8.18.0",
"eslint-plugin-angular": "4.1.0",
"mocha": "^9.1.1",
"pre-commit": "1.2.2",
"should": "^13.1.3",
"xo": "^0.44.0"
},
"pre-commit": [
"test"
],
"pre-commit": [],
"xo": {
"semicolon": false,
"esnext": true,
"space": 2,
"extends": [
"prettier"
],
"ignores": [

@@ -64,12 +52,14 @@ "test/**"

"rules": {
"curly": 0,
"new-cap": 0,
"no-unused-vars": 0,
"unicorn/no-array-method-this-argument": "off",
"import/extensions": [
"error",
"never"
],
"unicorn/prefer-module": "off",
"object-curly-spacing": 0,
"no-unused-expressions": 0,
"promise/param-names": 0,
"import/no-unassigned-import": 0,
"no-negated-condition": 0
"no-negated-condition": 0,
"new-cap": 0
}
}
}

@@ -1,41 +0,41 @@

# mongo-oplog
[![Build Status](https://img.shields.io/travis/cayasso/mongo-oplog/master.svg)](https://travis-ci.org/cayasso/mongo-oplog)
[![NPM version](https://img.shields.io/npm/v/mongo-oplog.svg)](https://www.npmjs.com/package/mongo-oplog)
[![XO code style](https://img.shields.io/badge/code_style-XO-5ed9c7.svg)](https://github.com/sindresorhus/xo)
Listening to MongoDB live changes using oplog.
## Installation
## Features
``` bash
$ npm install ejoy-oplog
```
* Support start and stop tailing the MongoDB `oplog` at any time.
* Support filtering `oplog` events by `namespaces` (database and collections).
* Built on top of the native NodeJS [MongoDB driver](https://github.com/mongodb/node-mongodb-native/).
* First class `Promise` support which enable the use of `async` and `await`.
* The package has a very small footprint and requires just a few dependencies including `mongodb`, `debug` and `eventemitter3`.
* Uses `eventemitter3` for high performance event emitting.
* Strict and readable code enforced with [xo](https://github.com/sindresorhus/xo)
* Unit tested with `mocha` and built with `babel` for backward compatibility with older versions of NodeJS like `v4.x` and `v5.x`.
## Configure MongoDB with replica set
## IMPORTANT! Major update version 2.0.x
You need to configure your MongoDB instance (local instance) to have access to the [oplog](https://docs.mongodb.com/manual/core/replica-set-oplog/), here are some quick steps on how to do so:
2.0.x is a major rewrite taking advantage of `es6` and adding support for `promises` and `async/await`. Callbacks are still supported for backward compatibility.
This version has minimum **API** changes, but these changes might affect your code, so please take a look at the upgrading guide before installing.
1. Shutdown your existing mongo instance if its running.
[Check the upgrading guide here](https://github.com/cayasso/mongo-oplog/blob/master/UPGRADE.md)
2. Restart the instance. Use the `--replSet` option to specify the name of the replica set.
[Go here for the old 1.x readme](https://github.com/cayasso/mongo-oplog/tree/1.x)
``` bash
$ sudo mongod --replSet rs0
```
[Go here for the old 0.x readme](https://github.com/cayasso/mongo-oplog/tree/0.x)
3. Connect to the mongo instance by executing `mongo` in your terminal:
## Installation
```bash
$ mongo
```
``` bash
$ npm install mongo-oplog
4. In the mongo shell run `rs.initiate()` to initiate the new replica set:
```bash
> rs.initiate()
```
Once it is initiated then you are ready to start using `ejoy-oplog`.
And [here is the official MongoDB documentation](https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/) if you need additional help on MongoDB replica set.
## Usage
``` javascript
import MongoOplog from 'mongo-oplog'
import MongoOplog from 'ejoy-oplog'
const oplog = MongoOplog('mongodb://127.0.0.1:27017/local', { ns: 'test.posts' })

@@ -193,20 +193,4 @@

Configure MongoDB for active oplog:
Configure MongoDB for active oplog, once this is done then you can run the test:
Start MongoDB with:
```bash
$ mongod --replSet test
```
Start a `mongo` shell and configure mongo as follows:
```bash
$ mongo
> var config = {_id: "test", members: [{_id: 0, host: "127.0.0.1:27017"}]}
> rs.initiate(config)
```
Once configuration is initiated then you can run the test:
``` bash

@@ -217,25 +201,1 @@ $ npm install

## License
(The MIT License)
Copyright (c) 2015 Jonathan Brumley &lt;cayasso@gmail.com&gt;
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@@ -1,394 +0,492 @@

'use strict';
'use strict'
/**
* Module dependencies.
*/
const should = require('should')
const { MongoClient } = require('mongodb')
const MongoOplog = require('../src/index')
var should = require('should');
var MongoClient = require('mongodb').MongoClient;
var MongoOplog = require('../src/index').default;
var oplog, db, opdb;
var conn = {
mongo: 'mongodb://127.0.0.1:27017/optest',
oplog: 'mongodb://127.0.0.1:27017/local',
error: 'mongodb://127.0.0.1:8888/error'
};
const conn = {
mongo: 'mongodb://127.0.0.1:27017/optest',
oplog: 'mongodb://127.0.0.1:27017/local',
error: 'mongodb://127.0.0.1:8888/error'
}
let db
let client
let opdb
let oplog
describe('mongo-oplog', function () {
before(function (done) {
MongoClient.connect(conn.mongo, function (err, database) {
if (err) return done(err);
db = database;
done();
});
});
if (err) return done(err)
client = database;
db = database.db()
done()
})
})
it('should be a function', function () {
MongoOplog.should.be.a.Function;
});
should(MongoOplog).be.a.Function
})
it('should have required methods', function (done) {
var oplog = MongoOplog(opdb);
oplog.tail.should.be.a.Function;
oplog.stop.should.be.a.Function;
oplog.filter.should.be.a.Function;
oplog.destroy.should.be.a.Function;
done();
});
it('should have required methods',() => new Promise(function(resolve, _reject) {
MongoOplog(opdb).then((oplog)=>{
should(oplog.tail).be.a.Function
should(oplog.stop).be.a.Function
should(oplog.filter).be.a.Function
should(oplog.destroy).be.a.Function
resolve()
});
}))
it('should accept mongodb object as connection', function () {
MongoClient.connect(conn.oplog, function (err, db) {
if (err) return done(err);
var oplog = MongoOplog(db)
oplog.db.should.eql(db);
});
});
it('should emit `op` event', function (done) {
var coll = db.collection('a');
var oplog = MongoOplog(conn.oplog, { ns: 'optest.a' });
oplog.on('op', function (doc) {
doc.op.should.be.eql('i');
doc.o.n.should.be.eql('JB');
doc.o.c.should.be.eql(1);
done();
});
oplog.tail(function (err) {
if (err) return done(err);
coll.insert({ n: 'JB', c: 1 }, function (err) {
if (err) return done(err);
it('should accept mongodb object as connection', () => {
return new Promise(function(resolve, reject) {
MongoClient.connect(conn.oplog, function (err, client) {
const db = client.db()
if (err) return done(err)
MongoOplog(client)
.then((oplog) => {
should(oplog.db).eql(db)
db.dropDatabase(function () {
client.close(resolve)
})
});
})
});
});
});
it('should emit `insert` event', function (done) {
var coll = db.collection('b');
var oplog = MongoOplog(conn.oplog, { ns: 'optest.b' });
oplog.on('insert', function (doc) {
doc.op.should.be.eql('i');
doc.o.n.should.be.eql('JBL');
doc.o.c.should.be.eql(1);
done();
});
oplog.tail(function (err) {
if (err) return done(err);
coll.insert({ n: 'JBL', c: 1 }, function (err) {
if (err) return done(err);
});
});
});
it('should emit `op` event', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('a')
MongoOplog(conn.oplog, { ns: 'optest.a' })
.then((oplog) => {
oplog.on('op', function (doc) {
should(doc.op).be.eql('i')
should(doc.o.n).be.eql('JB')
should(doc.o.c).be.eql(1)
resolve();
});
it('should emit `update` event', function (done) {
var coll = db.collection('c');
var oplog = MongoOplog(conn.oplog, { ns: 'optest.c' });
oplog.on('update', function (doc) {
doc.op.should.be.eql('u');
doc.o.$set.n.should.be.eql('US');
doc.o.$set.c.should.be.eql(7);
done();
});
oplog.tail(function (err) {
if (err) return done(err);
coll.insert({ n: 'CR', c: 3 }, function (err) {
if (err) return done(err);
coll.update({ n: 'CR', c: 3 }, { $set: { n: 'US', c: 7 } }, function (err) {
if (err) return done(err);
oplog.tail(function (err) {
if (err) return resolve(err)
coll.insertOne({ n: 'JB', c: 1 }, function (err) {
if (err) return resolve(err)
})
});
});
});
});
});
});
it('should emit `delete` event', function (done) {
this.timeout(0);
var coll = db.collection('d');
var oplog = MongoOplog(conn.oplog, { ns: 'optest.d' });
oplog.tail(function (err) {
if (err) return done(err);
coll.insert({ n: 'PM', c: 4 }, function (err, doc) {
if (err) return done(err);
var id = (doc.ops || doc)[0]._id;
oplog.on('delete', function (doc) {
doc.op.should.be.eql('d');
doc.o._id.should.be.eql(id);
done();
});
coll.remove({ n: 'PM', c: 4 }, function (err) {
if (err) return done(err);
});
it('should emit `insert` event', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('b')
MongoOplog(conn.oplog, { ns: 'optest.b' })
.then((oplog)=>{
oplog.on('insert', function (doc) {
should(doc.op).be.eql('i')
should(doc.o.n).be.eql('JBL')
should(doc.o.c).be.eql(1)
resolve()
})
oplog.tail(function (err) {
if (err) return resolve(err)
coll.insert({ n: 'JBL', c: 1 }, function (err) {
if (err) return resolve(err)
})
})
});
});
});
});
})
it('should emit cursor `end` event', function (done) {
var oplog = MongoOplog(conn.oplog);
oplog.tail(function (err, stream) {
if (err) return done(err);
oplog.once('end', done);
stream.emit('end');
it('should emit `update` event', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('c')
MongoOplog(conn.oplog, { ns: 'optest.c' })
.then((oplog)=>{
oplog.on('update', function (doc) {
should(doc.op).be.eql('u')
should(doc.o.$set.n).be.eql('US')
should(doc.o.$set.c).be.eql(7)
resolve()
})
oplog.tail(function (err) {
if (err) return resolve(err)
coll.insert({ n: 'CR', c: 3 }, function (err, doc) {
if (err) return resolve(err)
coll.update({_id: {$exists: true}, n: 'CR', c: 3 }, { $set: { n: 'US', c: 7 } }, function (err) {
if (err) return resolve(err)
})
})
})
});
});
});
})
it('should emit `error` event', function (done) {
var oplog = MongoOplog(conn.error)
oplog.tail()
oplog.on('error', function (err) {
err.should.be.an.Error;
done();
});
});
it('should emit `delete` event', () => {
this.timeout(0)
return new Promise(function(resolve, reject) {
const coll = db.collection('d')
MongoOplog(conn.oplog, { ns: 'optest.d' })
.then((oplog) => {
oplog.tail(function (err) {
if (err) return resolve(err)
coll.insert({ n: 'PM', c: 4 }, function (err, doc) {
if (err) return resolve(err)
var id = (doc.ops || doc)[0]._id
oplog.on('delete', function (doc) {
should(doc.op).be.eql('d')
should(doc.o._id).be.eql(id)
resolve()
})
coll.remove({_id: {$exists: true}, n: 'PM', c: 4 }, function (err) {
if (err) return resolve(err)
})
})
})
});
});
})
it('should filter by collection', function (done) {
var e1 = db.collection('e1');
var e2 = db.collection('e2');
var oplog = MongoOplog(conn.oplog);
it('should emit cursor `end` event', () => {
return new Promise(function(resolve, reject) {
MongoOplog(conn.oplog)
.then((oplog) => {
oplog.tail(function (err, stream) {
if (err) return resolve(err)
oplog.once('end', () => {
resolve()
})
stream.emit('end')
var filter = oplog.filter('*.e1');
filter.on('op', function(doc) {
doc.o.n.should.be.eql('L1');
done();
})
})
});
})
oplog.tail(function (err) {
if (err) return done(err);
e1.insert({ n: 'L1' }, function (err) {
if (err) return done(err);
});
e2.insert({ n: 'L1' }, function (err) {
if (err) return done(err);
});
it('should emit `error` event', () => {
return new Promise(function(resolve, reject) {
MongoOplog(conn.error)
.then((oplog) => {
oplog.tail()
oplog.on('error', function (err) {
should(err).be.an.Error
resolve()
})
});
});
});
})
it('should filter by the exact namespace', function(done){
var cs = db.collection('cs');
var css = db.collection('css');
var oplog = MongoOplog(conn.oplog);
it('should filter by collection', () => {
return new Promise(function(resolve, reject) {
const e1 = db.collection('e1')
const e2 = db.collection('e2')
MongoOplog(conn.oplog)
.then((oplog) => {
const filter = oplog.filter('*.e1')
var filter = oplog.filter('optest.cs');
filter.on('op', function(doc) {
should(doc.o.n).be.eql('L1')
resolve()
})
filter.on('op', function(doc) {
if ('L1' !== doc.o.n) done('should not throw');
else done();
oplog.tail(function (err) {
if (err) return resolve(err)
e1.insert({ n: 'L1' }, function (err) {
if (err) return resolve(err)
})
e2.insert({ n: 'L1' }, function (err) {
if (err) return resolve(err)
})
})
})
});
})
oplog.tail(function (err) {
if (err) return done(err);
css.insert({ n: 'L2' }, function(err) {
if (err) return done(err);
cs.insert({ n: 'L1' }, function(err) {
if (err) return done(err);
});
});
it('should filter by the exact namespace', () => {
return new Promise(function(resolve, reject) {
const cs = db.collection('cs')
const css = db.collection('css')
MongoOplog(conn.oplog)
.then((oplog) => {
const filter = oplog.filter('optest.cs')
filter.on('op', function(doc) {
if ('L1' !== doc.o.n) resolve('should not throw')
else resolve()
})
oplog.tail(function (err) {
if (err) return resolve(err)
css.insert({ n: 'L2' }, function(err) {
if (err) return resolve(err)
cs.insert({ n: 'L1' }, function(err) {
if (err) return resolve(err)
})
})
})
});
});
});
})
it('should filter by namespace in constructor', function (done) {
var f1 = db.collection('f1');
var f2 = db.collection('f2');
var oplog = MongoOplog(conn.oplog, { ns: '*.f1' });
oplog.on('op', function (doc) {
doc.o.n.should.be.eql('L2');
done();
it('should filter by namespace in constructor', () => {
return new Promise(function(resolve, reject) {
const f1 = db.collection('f1')
const f2 = db.collection('f2')
MongoOplog(conn.oplog, { ns: '*.f1' })
.then((oplog) => {
oplog.on('op', function (doc) {
should(doc.o.n).be.eql('L2')
resolve()
})
oplog.tail(function (err) {
if (err) return resolve(err)
f1.insert({ n: 'L2' }, function (err) {
if (err) return resolve(err)
})
f2.insert({ n: 'L2' }, function (err) {
if (err) return resolve(err)
})
})
})
});
oplog.tail(function (err) {
if (err) return done(err);
f1.insert({ n: 'L2' }, function (err) {
if (err) return done(err);
});
f2.insert({ n: 'L2' }, function (err) {
if (err) return done(err);
});
});
});
})
it('should destroy filter', function (done) {
var coll = db.collection('g');
var oplog = MongoOplog(conn.oplog);
var filter = oplog.filter('*.g');
filter.on('op', function(doc) {
filter.destroy();
done();
it('should destroy filter', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('g')
MongoOplog(conn.oplog)
.then((oplog) => {
const filter = oplog.filter('*.g')
filter.on('op', function(doc) {
filter.destroy()
resolve()
})
oplog.tail(function (err) {
if (err) return resolve(err)
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
})
})
});
oplog.tail(function (err) {
if (err) return done(err);
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
});
});
})
it('should stop tailing', function (done) {
var coll = db.collection('h');
var oplog = MongoOplog(conn.oplog, { ns: '*.h' });
oplog.on('op', function (doc) {
oplog.stop();
done();
it('should stop tailing', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('h')
MongoOplog(conn.oplog, { ns: '*.h' })
.then((oplog) => {
oplog.on('op', function (doc) {
oplog.stop()
resolve()
})
oplog.tail(function (err){
if (err) return resolve(err)
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
})
})
});
oplog.tail(function (err){
if (err) return done(err);
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
});
});
})
it('should destroy oplog', function (done) {
var coll = db.collection('i');
var oplog = MongoOplog(conn.oplog);
oplog.on('op', function (doc) {
oplog.destroy(done);
it('should destroy oplog', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('i')
MongoOplog(conn.oplog)
.then((oplog) => {
oplog.on('op', function (doc) {
oplog.destroy(resolve)
})
oplog.tail(function (err){
if (err) return resolve(err)
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
})
})
});
oplog.tail(function (err){
if (err) return done(err);
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
});
});
})
it('should ignore oplog op events', function (done) {
var coll = db.collection('j');
var oplog = MongoOplog(conn.oplog, { ns: '*.j' });
oplog.on('op', function (doc) {
oplog.ignore = true;
done();
it('should ignore oplog op events', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('j')
MongoOplog(conn.oplog, { ns: '*.j' })
.then((oplog) => {
oplog.on('op', function (doc) {
oplog.ignore = true
resolve()
})
oplog.tail(function (err){
if (err) return resolve(err)
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
})
});
});
oplog.tail(function (err){
if (err) return done(err);
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
});
});
})
it('should ignore filter op events', function (done) {
var coll = db.collection('k');
var oplog = MongoOplog(conn.oplog);
var filter = oplog.filter('*.k');
it('should ignore filter op events', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('k')
MongoOplog(conn.oplog)
.then((oplog) => {
const filter = oplog.filter('*.k')
filter.on('op', function(doc) {
filter.ignore = true;
done();
});
filter.on('op', function(doc) {
filter.ignore = true
resolve()
})
oplog.tail(function (err) {
if (err) return done(err);
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
oplog.tail(function (err) {
if (err) return resolve(err)
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
})
})
});
});
})
it('should stop tailing', function (done) {
var coll = db.collection('h');
var oplog = MongoOplog(conn.oplog, { ns: '*.h' });
oplog.on('op', function (doc) {
oplog.stop();
done();
it('should stop tailing', () => {
return new Promise(function(resolve, reject) {
const coll = db.collection('h')
MongoOplog(conn.oplog, { ns: '*.h' })
.then((oplog) => {
oplog.on('op', function (doc) {
oplog.stop()
resolve()
})
oplog.tail(function (err){
if (err) return resolve(err)
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
coll.insert({ n: 'CR' }, function (err) {
if (err) return resolve(err)
})
})
});
});
oplog.tail(function (err){
if (err) return done(err);
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
coll.insert({ n: 'CR' }, function (err) {
if (err) return done(err);
});
});
});
})
it('should start from last ts when re-tailing', function (done) {
it('should start from last ts when re-tailing', () => {
this.timeout(0)
var c = 0;
var v = {}
var coll = db.collection('i');
var oplog = MongoOplog(conn.oplog, { ns: 'optest.i' });
oplog.on('op', function (doc) {
v[doc.o.c] = 1;
Object.keys(v).length.should.be.equal(++c);
if (6 === c) done();
else if (c > 6) done('Not valid')
});
return new Promise(function(resolve, reject) {
let c = 0
const v = {}
const coll = db.collection('i')
MongoOplog(conn.oplog, { ns: 'optest.i' })
.then((oplog) => {
oplog.on('op', function (doc) {
v[doc.o.c] = 1
should(Object.keys(v).length).be.equal(++c)
if (6 === c) resolve()
else if (c > 6) resolve('Not valid')
})
oplog.tail(function() {
coll.insert({ c: 1 });
coll.insert({ c: 2 });
coll.insert({ c: 3 });
setTimeout(function () {
oplog.stop(function() {
coll.insert({ c: 4 });
coll.insert({ c: 5 });
coll.insert({ c: 6 });
oplog.tail(function() {
setTimeout(function () {
oplog.stop(function() {
oplog.tail();
});
}, 500);
});
});
}, 500);
oplog.tail(function() {
coll.insert({ c: 1 })
coll.insert({ c: 2 })
coll.insert({ c: 3 })
setTimeout(function () {
oplog.stop(function() {
coll.insert({ c: 4 })
coll.insert({ c: 5 })
coll.insert({ c: 6 })
oplog.tail(function() {
setTimeout(function () {
oplog.stop(function() {
oplog.tail()
})
}, 500)
})
})
}, 500)
})
})
});
});
})
it('should start re-tailing on timeout', function (done) {
it('should start re-tailing on timeout', () => {
this.timeout(0)
var c = 0;
var v = {};
var coll = db.collection('n');
var oplog = MongoOplog(conn.oplog, { ns: 'optest.n' });
var values = {}
var valueSize = 0
oplog.on('op', function (doc) {
v[doc.o.c] = 1;
Object.keys(v).length.should.be.equal(++c);
if (6 === c) done();
else if (c > 6) done('Not valid')
});
oplog.tail(function(err, stream) {
coll.insert({ c: 1 });
coll.insert({ c: 2 });
coll.insert({ c: 3 });
return new Promise(function(resolve, reject) {
let c = 0
const v = {}
const coll = db.collection('n')
MongoOplog(conn.oplog, { ns: 'optest.n' })
.then((oplog)=>{
const values = {}
const valueSize = 0
oplog.on('op', function (doc) {
v[doc.o.c] = 1
should(Object.keys(v).length).be.equal(++c)
if (6 === c) {
setTimeout(function () {
oplog.destroy(resolve)
}, 100)
} else if (c > 6) resolve('Not valid')
})
oplog.tail(function(err, stream) {
coll.insert({ c: 1 })
coll.insert({ c: 2 })
coll.insert({ c: 3 })
// Mimic a timeout error
setTimeout(function() {
stream.emit('error', {
message: 'cursor killed or timed out',
stack: {}
// Mimic a timeout error
setTimeout(function() {
stream.emit('error', {
message: 'cursor killed or timed out',
stack: {}
})
stream.close()
}, 500)
stream.on('error', function () {
setTimeout(function() {
coll.insert({ c: 4 })
coll.insert({ c: 5 })
coll.insert({ c: 6 })
}, 500)
})
})
});
stream.close()
}, 500)
stream.on('error', function () {
setTimeout(function() {
coll.insert({ c: 4 });
coll.insert({ c: 5 });
coll.insert({ c: 6 });
}, 500)
});
});
});
})
it('should not throw if `destroy` called before connecting', () => {
return new Promise(function(resolve, reject) {
MongoOplog()
.then(()=>{
resolve();
});
});
})
afterEach(function (done) {
if (oplog) oplog.destroy(done)
else setTimeout(done, 10)
})
after(function (done) {
db.dropDatabase(function () {
db.close(done);
});
});
client.close(done)
})
})
});
})

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc