mongochangestream - npm Package Compare versions

Comparing version 0.34.0 to 0.35.0

CHANGELOG.md (5 changed lines)

@@ -0,1 +1,6 @@

# 0.35.0
- Handle master failover scenario properly for initial scan.
- Emit `initialScanComplete` when initial scan is complete.
# 0.34.0

@@ -2,0 +7,0 @@
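The new `initialScanComplete` event is consumed through the sync emitter, as the updated tests later in this diff do. A minimal sketch, assuming `redis` is an ioredis client, `coll` is a MongoDB collection, and `processRecords` is your batch handler:

```ts
import { initSync } from 'mongochangestream'

const sync = initSync(redis, coll)
// The payload carries the last id processed
// (see InitialScanComplete in types.d.ts below).
sync.emitter.on('initialScanComplete', ({ lastId }) => {
  console.log('Initial scan complete, last id processed:', lastId)
})
const initialScan = await sync.runInitialScan(processRecords)
await initialScan.start()
```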

dist/mongoChangeStream.d.ts (10 changed lines)

@@ -1,3 +0,2 @@

import _ from 'lodash/fp.js';
import { Collection, ObjectId, Db } from 'mongodb';
import { Collection, Db } from 'mongodb';
import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions, ChangeOptions, ChangeStreamOptions, JSONSchema } from './types.js';

@@ -19,7 +18,2 @@ import type { Redis } from 'ioredis';

};
export declare const defaultSortField: {
field: string;
serialize: _.LodashToString;
deserialize: (x: string) => ObjectId;
};
export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => {

@@ -33,3 +27,3 @@ /**

*/
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<{
runInitialScan: <T = any>(processRecords: ProcessRecords, options?: QueueOptions & ScanOptions<T>) => Promise<{
start: () => Promise<void>;

@@ -36,0 +30,0 @@ stop: () => Promise<void>;

dist/mongoChangeStream.js (120 changed lines)

@@ -6,3 +6,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.initSync = exports.defaultSortField = exports.getKeys = void 0;
exports.initSync = exports.getKeys = void 0;
const fp_js_1 = __importDefault(require("lodash/fp.js"));

@@ -42,7 +42,2 @@ const mongodb_1 = require("mongodb");

exports.getKeys = getKeys;
exports.defaultSortField = {
field: '_id',
serialize: fp_js_1.default.toString,
deserialize: (x) => new mongodb_1.ObjectId(x),
};
const stateTransitions = {

@@ -67,3 +62,3 @@ stopped: ['starting'],

const emitStateChange = (change) => emit('stateChange', change);
// Detect if resync flag is set
/** Detect if resync flag is set */
const detectResync = (resyncCheckInterval = (0, ms_1.default)('1m')) => {

@@ -109,3 +104,3 @@ let resyncTimer;

/**
* Get the timestamp of the last record updated
* Get the timestamp of the last record updated. Assumes the field is a date.
*/

@@ -123,3 +118,3 @@ const getLastRecordUpdatedAt = (field) => collection

});
const runInitialScan = async (processRecords, options = {}) => {
async function runInitialScan(processRecords, options = {}) {
let deferred;

@@ -131,2 +126,34 @@ let cursor;

});
const defaultSortField = {
field: '_id',
serialize: fp_js_1.default.toString,
deserialize: (x) => new mongodb_1.ObjectId(x),
};
const sortField = options.sortField || defaultSortField;
/** Get the last id inserted */
const getLastIdInserted = () => collection
.find()
.project({ [sortField.field]: 1 })
.sort({ [sortField.field]: -1 })
.limit(1)
.toArray()
.then((x) => {
return x[0]?.[sortField.field];
});
const getCursor = async () => {
// Lookup last id successfully processed
const lastIdProcessed = await redis.get(keys.lastScanIdKey);
debug('Last id processed %s', lastIdProcessed);
// Query collection
return (collection
// Skip ids already processed
.find(lastIdProcessed
? {
[sortField.field]: {
$gt: sortField.deserialize(lastIdProcessed),
},
}
: {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {})
.sort({ [sortField.field]: 1 }));
};
/**

@@ -173,2 +200,3 @@ * Periodically check that records are being processed.

const healthCheck = healthChecker();
/** Start the initial scan */
const start = async () => {

@@ -187,7 +215,4 @@ debug('Starting initial scan');

deferred = (0, prom_utils_1.defer)();
const sortField = options.sortField || exports.defaultSortField;
// Redis keys
const { scanCompletedKey, lastScanIdKey } = keys;
// Determine if initial scan has already completed
const scanCompleted = await redis.get(scanCompletedKey);
const scanCompleted = await redis.get(keys.scanCompletedKey);
// Scan already completed so return

@@ -208,23 +233,20 @@ if (scanCompleted) {

await processRecords(records);
debug('Processed %d records', records.length);
const lastDocument = records[records.length - 1].fullDocument;
// Record last id of the batch
const lastId = fp_js_1.default.get(sortField.field, lastDocument);
debug('Last id %s', lastId);
if (lastId) {
await redis.mset(lastScanIdKey, sortField.serialize(lastId), keys.lastScanProcessedAtKey, new Date().getTime());
await redis.mset(keys.lastScanIdKey, sortField.serialize(lastId), keys.lastScanProcessedAtKey, new Date().getTime());
}
};
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey);
debug('Last scan id %s', lastId);
// Create queue
const queue = (0, prom_utils_1.batchQueue)(_processRecords, options);
// Query collection
cursor = collection
// Skip ids already processed
.find(lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {})
.sort({ [sortField.field]: 1 });
cursor = await getCursor();
// Change state
state.change('started');
// Take a snapshot of the last id inserted into the collection
const lastIdInserted = await getLastIdInserted();
debug('Last id inserted %s', lastIdInserted);
const ns = { db: collection.dbName, coll: collection.collectionName };

@@ -235,13 +257,24 @@ // Process documents

debug('Initial scan doc %O', doc);
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
};
await queue.enqueue(changeStreamDoc);
// Doc can be null if cursor is closed
if (doc) {
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
};
await queue.enqueue(changeStreamDoc);
}
}
// Flush the queue
await queue.flush();
// Don't record scan complete if stopping
if (!state.is('stopping')) {
// Final id processed
const finalIdProcessed = await redis.get(keys.lastScanIdKey);
debug('Final id processed %s', finalIdProcessed);
// Did we complete the initial scan?
if (
// No records in the collection
!lastIdInserted ||
// Final id processed was at least the last id inserted as of start
(finalIdProcessed &&
sortField.deserialize(finalIdProcessed) >= lastIdInserted)) {
debug('Completed initial scan');

@@ -251,7 +284,11 @@ // Stop the health check

// Record scan complete
await redis.set(scanCompletedKey, new Date().toString());
await redis.set(keys.scanCompletedKey, new Date().toString());
// Emit event
emit('initialScanComplete', { lastId: finalIdProcessed });
}
// Resolve deferred
deferred.done();
debug('Exit initial scan');
};
/** Stop the initial scan */
const stop = async () => {

@@ -279,2 +316,3 @@ debug('Stopping initial scan');

};
/** Restart the initial scan */
const restart = async () => {

@@ -286,4 +324,3 @@ debug('Restarting initial scan');

return { start, stop, restart };
};
const defaultOptions = { fullDocument: 'updateLookup' };
}
const processChangeStream = async (processRecord, options = {}) => {

@@ -297,2 +334,3 @@ let deferred;

const pipeline = options.pipeline || [];
const defaultOptions = { fullDocument: 'updateLookup' };
/**

@@ -318,3 +356,3 @@ * Periodically check that change stream events are being processed.

debug('Last record created at %d', lastRecordUpdatedAt);
// A record was updated but not synced within 5 seconds of being updated
// A record was updated but not synced within maxSyncDelay of being updated
if (!state.is('stopped') &&

@@ -351,2 +389,3 @@ lastRecordUpdatedAt &&

const healthCheck = healthChecker();
/** Start processing change stream */
const start = async () => {

@@ -366,6 +405,4 @@ debug('Starting change stream');

deferred = (0, prom_utils_1.defer)();
// Redis keys
const { changeStreamTokenKey } = keys;
// Lookup change stream token
const token = await redis.get(changeStreamTokenKey);
const token = await redis.get(keys.changeStreamTokenKey);
const changeStreamOptions = token

@@ -382,3 +419,3 @@ ? // Resume token found, so set change stream resume point

}
// Get the change stream as an async iterator
// Consume change stream
while (await (0, util_js_1.safelyCheckNext)(changeStream)) {

@@ -389,2 +426,3 @@ let event = await changeStream.next();

const token = event?._id;
debug('token %o', token);
// Omit nested fields that are not handled by $unset.

@@ -398,3 +436,3 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted.

// Update change stream token
await redis.mset(changeStreamTokenKey, JSON.stringify(token), keys.lastChangeProcessedAtKey, new Date().getTime());
await redis.mset(keys.changeStreamTokenKey, JSON.stringify(token), keys.lastChangeProcessedAtKey, new Date().getTime());
}

@@ -404,2 +442,3 @@ deferred.done();

};
/** Stop processing change stream */
const stop = async () => {

@@ -427,2 +466,3 @@ debug('Stopping change stream');

};
/** Restart change stream */
const restart = async () => {

@@ -429,0 +469,0 @@ debug('Restarting change stream');

dist/types.d.ts

@@ -9,2 +9,8 @@ import { ChangeStreamDocument, ChangeStreamInsertDocument, Document } from 'mongodb';

}
export interface SortField<T> {
field: string;
/** Function to serialize value to string. */
serialize: (x: T) => string;
deserialize: (x: string) => T;
}
export interface ScanOptions<T = any> {

@@ -16,7 +22,4 @@ healthCheck?: {

};
sortField?: {
field: string;
serialize: (x: T) => string;
deserialize: (x: string) => T;
};
/** Defaults to _id */
sortField?: SortField<T>;
}

@@ -41,3 +44,3 @@ export interface ChangeStreamOptions {

}
export declare type Events = 'healthCheckFail' | 'resync' | 'schemaChange' | 'stateChange';
export declare type Events = 'healthCheckFail' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete';
interface InitialScanFailEvent {

@@ -68,4 +71,8 @@ type: 'healthCheckFail';

}
export interface InitialScanComplete {
type: 'initialScanComplete';
lastId: string;
}
export declare type State = 'starting' | 'started' | 'stopping' | 'stopped';
export declare type SimpleState = 'started' | 'stopped';
export {};

dist/util.js

@@ -9,2 +9,4 @@ "use strict";

const obj_walker_1 = require("obj-walker");
const debug_1 = __importDefault(require("debug"));
const debug = (0, debug_1.default)('mongochangestream');
const setDefaults = (keys, val) => {

@@ -59,5 +61,7 @@ const obj = {};

const safelyCheckNext = async (cursor) => {
debug('safelyCheckNext called');
try {
// Prevents hasNext from hanging when the cursor is already closed
if (cursor.closed) {
debug('safelyCheckNext cursor closed');
return false;

@@ -68,2 +72,3 @@ }

catch (e) {
debug('safelyCheckNext error: %o', e);
return false;

@@ -70,0 +75,0 @@ }

package.json

{
"name": "mongochangestream",
"version": "0.34.0",
"version": "0.35.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -5,0 +5,0 @@ "author": "GovSpend",

README.md

@@ -9,3 +9,3 @@ # Mongo Change Stream

will resume where it left off. This is deterministic since the collection scan is sorted
by `_id`. Change streams will likewise resume from the last resume token upon server
by `_id` by default. Change streams will likewise resume from the last resume token upon server
restarts. See the official MongoDB docs for more information on change stream resumption:
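As of this release the scan's sort key is pluggable via the new `SortField<T>` interface (added to `types.ts` below), with `_id` remaining the default. A hypothetical sketch of scanning on a custom field — `seq` and its numeric type are assumptions, and resumption stays deterministic only if the field is unique and totally ordered:

```ts
import type { SortField } from 'mongochangestream' // assuming types are re-exported

const sortField: SortField<number> = {
  field: 'seq', // hypothetical unique, increasing numeric field
  serialize: (x) => x.toString(), // persisted to Redis as a string
  deserialize: (x) => parseInt(x, 10),
}
const initialScan = await sync.runInitialScan<number>(processRecords, { sortField })
```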

@@ -57,3 +57,3 @@

schemaChange.start()
schemaChange.emitter.on('change', () => {
sync.emitter.on('schemaChange', () => {
initialScan.stop()

@@ -104,2 +104,5 @@ changeStream.stop()

This library is meant to be built on. To that end, the following libraries are
currently implemented and maintained.
Sync MongoDB to MongoDB

@@ -114,2 +117,22 @@ [mongo2mongo](https://www.npmjs.com/package/mongo2mongo)

## Resilience
Both the initial scan and change stream processing are designed to handle
and resume from failures. Here are some scenarios:
### The syncing server goes down
In this scenario, processing will continue from the last recorded state
when resumed.
### The syncing server is being shut down with a SIGTERM
In this scenario, calling `stop` for the initial scan and change stream
will cleanly end processing.
### The MongoDB primary goes down and a new primary is elected
In this scenario, you will need to enable health checks and restart the
initial scan and/or change stream when a health check failure occurs.
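For that last scenario, the pieces visible in this diff suggest wiring along these lines — a hedged sketch, since the exact `healthCheck` option shape and the `healthCheckFail` payload are not fully shown here:

```ts
const initialScan = await sync.runInitialScan(processRecords, {
  healthCheck: { enabled: true }, // assumed shape; see ScanOptions in types.ts
})
sync.emitter.on('healthCheckFail', async () => {
  // restart() stops, then resumes from the last recorded state
  await initialScan.restart()
})
```

The same pattern applies to the change stream, which exposes its own `restart`.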
## Change Stream Strategies

@@ -116,0 +139,0 @@

@@ -10,3 +10,2 @@ /**

JSONSchema,
SyncOptions,
SchemaChangeEvent,

@@ -28,3 +27,4 @@ ScanOptions,

const init = _.memoize(async (options?: SyncOptions) => {
const getConns = _.memoize(async (x?: any) => {
console.log(x)
const redis = new Redis({ keyPrefix: 'testing:' })

@@ -34,8 +34,13 @@ const client = await MongoClient.connect(process.env.MONGO_CONN as string)

const coll = db.collection('testing')
const sync = initSync(redis, coll, options)
sync.emitter.on('stateChange', console.log)
return { sync, db, coll, redis }
return { client, db, coll, redis }
})
const getSync = async () => {
const { redis, coll } = await getConns()
const sync = initSync(redis, coll)
sync.emitter.on('stateChange', console.log)
return sync
}
const genUser = () => ({

@@ -65,5 +70,5 @@ name: faker.name.fullName(),

const populateCollection = (collection: Collection) => {
const populateCollection = (collection: Collection, count = numDocs) => {
const users = []
for (let i = 0; i < numDocs; i++) {
for (let i = 0; i < count; i++) {
users.push({ insertOne: { document: genUser() } })

@@ -74,4 +79,6 @@ }

const before = async () => {
const { sync, coll } = await init()
const initState = async (
sync: ReturnType<typeof initSync>,
coll: Collection
) => {
// Reset state

@@ -85,4 +92,5 @@ await sync.reset()

test('should complete initial scan', async () => {
const { sync } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -103,6 +111,10 @@ const processed = []

test('initial scan should resume properly', async () => {
const { sync } = await init()
await before()
test('should complete initial scan if collection is empty', async () => {
const { coll } = await getConns()
const sync = await getSync()
// Reset state
await sync.reset()
await coll.deleteMany({})
const processed = []

@@ -113,13 +125,45 @@ const processRecords = async (docs: ChangeStreamInsertDocument[]) => {

}
let completed = false
sync.emitter.on('initialScanComplete', () => {
completed = true
})
const scanOptions = { batchSize: 100 }
const initialScan = await sync.runInitialScan(processRecords, scanOptions)
// Wait for initial scan to complete
await initialScan.start()
assert.ok(completed)
assert.equal(processed.length, 0)
// Stop
await initialScan.stop()
})
test('initial scan should resume after stop', async () => {
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)
const processed = []
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
await setTimeout(10)
processed.push(...docs)
}
const scanOptions = { batchSize: 50 }
const initialScan = await sync.runInitialScan(processRecords, scanOptions)
let completed = false
sync.emitter.on('initialScanComplete', () => {
completed = true
})
// Start
initialScan.start()
// Allow for some records to be processed
await setTimeout(50)
await setTimeout(500)
// Stop the initial scan
await initialScan.stop()
// Wait for the initial scan to complete
await initialScan.start()
assert.equal(processed.length, numDocs)
initialScan.start()
// Add some more records
await populateCollection(coll, 10)
await setTimeout(ms('5s'))
assert.ok(completed)
assert.equal(processed.length, numDocs + 10)
// Stop

@@ -129,5 +173,35 @@ await initialScan.stop()

test('initial scan should not be marked as completed if connection is closed', async () => {
// Memoize hack
const { coll, redis, client } = await getConns({})
const sync = initSync(redis, coll)
sync.emitter.on('stateChange', console.log)
await initState(sync, coll)
const processed = []
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
await setTimeout(10)
processed.push(...docs)
}
const scanOptions = { batchSize: 50 }
const initialScan = await sync.runInitialScan(processRecords, scanOptions)
// Start
initialScan.start()
// Allow for some records to be processed
await setTimeout(200)
// Close the connection.
await client.close()
// Allow for some time
await setTimeout(100)
// Check if completed
const completedAt = await redis.get(sync.keys.scanCompletedKey)
assert.equal(completedAt, null)
// Stop
await initialScan.stop()
})
test('should process records via change stream', async () => {
const { sync, coll } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -153,4 +227,5 @@ const processed = []

test('change stream should resume properly', async () => {
const { sync, coll } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -183,3 +258,3 @@ const processed = []

test('starting change stream is idempotent', async () => {
const { sync } = await init()
const sync = await getSync()
// Change stream

@@ -197,4 +272,5 @@ const processRecord = async () => {

test('stopping change stream is idempotent', async () => {
const { sync, coll } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -215,4 +291,5 @@ // Change stream

test('starting initial scan is idempotent', async () => {
const { sync } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -230,4 +307,5 @@ const processRecords = async () => {

test('stopping initial scan is idempotent', async () => {
const { sync } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -246,4 +324,5 @@ const processRecords = async () => {

test('Should resync when resync flag is set', async () => {
const { sync, redis } = await init()
await before()
const { coll, redis } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -290,3 +369,3 @@ let resyncTriggered = false

test('Resync start/stop is idempotent', async () => {
const { sync } = await init()
const sync = await getSync()

@@ -301,3 +380,4 @@ const resync = sync.detectResync()

test('Detect schema change', async () => {
const { db, coll, sync } = await init()
const { db, coll } = await getConns()
const sync = await getSync()
// Set schema

@@ -331,3 +411,4 @@ await db.command({

test('Schema change start/stop is idempotent', async () => {
const { sync, db } = await init()
const { db } = await getConns()
const sync = await getSync()

@@ -342,4 +423,5 @@ const schemaChange = await sync.detectSchemaChange(db)

test('should fail health check - initial scan', async () => {
const { sync } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -371,4 +453,5 @@ let healthCheckFailed = false

test('should fail health check - change stream', async () => {
const { sync, coll } = await init()
await before()
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)

@@ -375,0 +458,0 @@ let healthCheckFailed = false

src/mongoChangeStream.ts

@@ -9,2 +9,3 @@ import _ from 'lodash/fp.js'

} from 'mongodb'
import * as mongodb from 'mongodb'
import {

@@ -21,2 +22,3 @@ Events,

SimpleState,
SortField,
} from './types.js'

@@ -67,8 +69,2 @@ import _debug from 'debug'

export const defaultSortField = {
field: '_id',
serialize: _.toString,
deserialize: (x: string) => new ObjectId(x),
}
const stateTransitions: StateTransitions<State> = {

@@ -100,3 +96,3 @@ stopped: ['starting'],

// Detect if resync flag is set
/** Detect if resync flag is set */
const detectResync = (resyncCheckInterval = ms('1m')) => {

@@ -149,3 +145,3 @@ let resyncTimer: NodeJS.Timer

/**
* Get the timestamp of the last record updated
* Get the timestamp of the last record updated. Assumes the field is a date.
*/

@@ -165,6 +161,6 @@ const getLastRecordUpdatedAt = (field: string): Promise<number | undefined> =>

const runInitialScan = async (
async function runInitialScan<T = any>(
processRecords: ProcessRecords,
options: QueueOptions & ScanOptions = {}
) => {
options: QueueOptions & ScanOptions<T> = {}
) {
let deferred: Deferred

@@ -176,3 +172,44 @@ let cursor: ReturnType<typeof collection.find>

})
const defaultSortField: SortField<ObjectId> = {
field: '_id',
serialize: _.toString,
deserialize: (x: string) => new ObjectId(x),
}
const sortField = options.sortField || defaultSortField
/** Get the last id inserted */
const getLastIdInserted = () =>
collection
.find()
.project({ [sortField.field]: 1 })
.sort({ [sortField.field]: -1 })
.limit(1)
.toArray()
.then((x) => {
return x[0]?.[sortField.field]
})
const getCursor = async () => {
// Lookup last id successfully processed
const lastIdProcessed = await redis.get(keys.lastScanIdKey)
debug('Last id processed %s', lastIdProcessed)
// Query collection
return (
collection
// Skip ids already processed
.find(
lastIdProcessed
? {
[sortField.field]: {
$gt: sortField.deserialize(lastIdProcessed),
},
}
: {},
omit ? { projection: setDefaults(omit, 0) } : {}
)
.sort({ [sortField.field]: 1 })
)
}
/**

@@ -222,2 +259,3 @@ * Periodically check that records are being processed.

/** Start the initial scan */
const start = async () => {

@@ -236,7 +274,4 @@ debug('Starting initial scan')

deferred = defer()
const sortField = options.sortField || defaultSortField
// Redis keys
const { scanCompletedKey, lastScanIdKey } = keys
// Determine if initial scan has already completed
const scanCompleted = await redis.get(scanCompletedKey)
const scanCompleted = await redis.get(keys.scanCompletedKey)
// Scan already completed so return

@@ -258,8 +293,10 @@ if (scanCompleted) {

await processRecords(records)
debug('Processed %d records', records.length)
const lastDocument = records[records.length - 1].fullDocument
// Record last id of the batch
const lastId = _.get(sortField.field, lastDocument)
debug('Last id %s', lastId)
if (lastId) {
await redis.mset(
lastScanIdKey,
keys.lastScanIdKey,
sortField.serialize(lastId),

@@ -271,19 +308,12 @@ keys.lastScanProcessedAtKey,

}
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey)
debug('Last scan id %s', lastId)
// Create queue
const queue = batchQueue(_processRecords, options)
// Query collection
cursor = collection
// Skip ids already processed
.find(
lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {},
omit ? { projection: setDefaults(omit, 0) } : {}
)
.sort({ [sortField.field]: 1 })
cursor = await getCursor()
// Change state
state.change('started')
// Take a snapshot of the last id inserted into the collection
const lastIdInserted = await getLastIdInserted()
debug('Last id inserted %s', lastIdInserted)
const ns = { db: collection.dbName, coll: collection.collectionName }

@@ -294,13 +324,25 @@ // Process documents

debug('Initial scan doc %O', doc)
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
} as unknown as ChangeStreamInsertDocument
await queue.enqueue(changeStreamDoc)
// Doc can be null if cursor is closed
if (doc) {
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
} as unknown as ChangeStreamInsertDocument
await queue.enqueue(changeStreamDoc)
}
}
// Flush the queue
await queue.flush()
// Don't record scan complete if stopping
if (!state.is('stopping')) {
// Final id processed
const finalIdProcessed = await redis.get(keys.lastScanIdKey)
debug('Final id processed %s', finalIdProcessed)
// Did we complete the initial scan?
if (
// No records in the collection
!lastIdInserted ||
// Final id processed was at least the last id inserted as of start
(finalIdProcessed &&
sortField.deserialize(finalIdProcessed) >= lastIdInserted)
) {
debug('Completed initial scan')

@@ -310,4 +352,7 @@ // Stop the health check

// Record scan complete
await redis.set(scanCompletedKey, new Date().toString())
await redis.set(keys.scanCompletedKey, new Date().toString())
// Emit event
emit('initialScanComplete', { lastId: finalIdProcessed })
}
// Resolve deferred
deferred.done()

@@ -317,2 +362,3 @@ debug('Exit initial scan')

/** Stop the initial scan */
const stop = async () => {

@@ -341,2 +387,3 @@ debug('Stopping initial scan')

/** Restart the initial scan */
const restart = async () => {

@@ -351,4 +398,2 @@ debug('Restarting initial scan')

const defaultOptions = { fullDocument: 'updateLookup' }
const processChangeStream = async (

@@ -365,2 +410,3 @@ processRecord: ProcessRecord,

const pipeline = options.pipeline || []
const defaultOptions = { fullDocument: 'updateLookup' }

@@ -392,3 +438,3 @@ /**

debug('Last record created at %d', lastRecordUpdatedAt)
// A record was updated but not synced within 5 seconds of being updated
// A record was updated but not synced within maxSyncDelay of being updated
if (

@@ -429,2 +475,3 @@ !state.is('stopped') &&

/** Start processing change stream */
const start = async () => {

@@ -444,7 +491,5 @@ debug('Starting change stream')

deferred = defer()
// Redis keys
const { changeStreamTokenKey } = keys
// Lookup change stream token
const token = await redis.get(changeStreamTokenKey)
const changeStreamOptions = token
const token = await redis.get(keys.changeStreamTokenKey)
const changeStreamOptions: mongodb.ChangeStreamOptions = token
? // Resume token found, so set change stream resume point

@@ -463,3 +508,3 @@ { ...defaultOptions, resumeAfter: JSON.parse(token) }

}
// Get the change stream as an async iterator
// Consume change stream
while (await safelyCheckNext(changeStream)) {

@@ -470,2 +515,3 @@ let event = await changeStream.next()

const token = event?._id
debug('token %o', token)
// Omit nested fields that are not handled by $unset.

@@ -480,3 +526,3 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted.

await redis.mset(
changeStreamTokenKey,
keys.changeStreamTokenKey,
JSON.stringify(token),

@@ -491,2 +537,3 @@ keys.lastChangeProcessedAtKey,

/** Stop processing change stream */
const stop = async () => {

@@ -515,2 +562,3 @@ debug('Stopping change stream')

/** Restart change stream */
const restart = async () => {

@@ -517,0 +565,0 @@ debug('Restarting change stream')

src/types.ts

@@ -22,2 +22,9 @@ import {

export interface SortField<T> {
field: string
/** Function to serialize value to string. */
serialize: (x: T) => string
deserialize: (x: string) => T
}
export interface ScanOptions<T = any> {

@@ -29,7 +36,4 @@ healthCheck?: {

}
sortField?: {
field: string
serialize: (x: T) => string
deserialize: (x: string) => T
}
/** Defaults to _id */
sortField?: SortField<T>
}

@@ -64,2 +68,3 @@

| 'stateChange'
| 'initialScanComplete'

@@ -97,2 +102,7 @@ interface InitialScanFailEvent {

export interface InitialScanComplete {
type: 'initialScanComplete'
lastId: string
}
// State

@@ -99,0 +109,0 @@

src/util.ts

@@ -5,3 +5,6 @@ import { Collection, ChangeStream, FindCursor } from 'mongodb'

import { JSONSchema } from './types'
import _debug from 'debug'
const debug = _debug('mongochangestream')
export const setDefaults = (keys: string[], val: any) => {

@@ -62,5 +65,7 @@ const obj: Record<string, any> = {}

export const safelyCheckNext = async (cursor: ChangeStream | FindCursor) => {
debug('safelyCheckNext called')
try {
// Prevents hasNext from hanging when the cursor is already closed
if (cursor.closed) {
debug('safelyCheckNext cursor closed')
return false

@@ -70,4 +75,5 @@ }

} catch (e) {
debug('safelyCheckNext error: %o', e)
return false
}
}