mongochangestream
Advanced tools
Comparing version 0.34.0 to 0.35.0
@@ -0,1 +1,6 @@ | ||
# 0.35.0 | ||
- Handle master failover scenario properly for initial scan. | ||
- Emit `initialScanComplete` when initial scan is complete. | ||
# 0.34.0 | ||
@@ -2,0 +7,0 @@ |
@@ -1,3 +0,2 @@ | ||
import _ from 'lodash/fp.js'; | ||
import { Collection, ObjectId, Db } from 'mongodb'; | ||
import { Collection, Db } from 'mongodb'; | ||
import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions, ChangeOptions, ChangeStreamOptions, JSONSchema } from './types.js'; | ||
@@ -19,7 +18,2 @@ import type { Redis } from 'ioredis'; | ||
}; | ||
export declare const defaultSortField: { | ||
field: string; | ||
serialize: _.LodashToString; | ||
deserialize: (x: string) => ObjectId; | ||
}; | ||
export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => { | ||
@@ -33,3 +27,3 @@ /** | ||
*/ | ||
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<{ | ||
runInitialScan: <T = any>(processRecords: ProcessRecords, options?: QueueOptions & ScanOptions<T>) => Promise<{ | ||
start: () => Promise<void>; | ||
@@ -36,0 +30,0 @@ stop: () => Promise<void>; |
@@ -6,3 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.initSync = exports.defaultSortField = exports.getKeys = void 0; | ||
exports.initSync = exports.getKeys = void 0; | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
@@ -42,7 +42,2 @@ const mongodb_1 = require("mongodb"); | ||
exports.getKeys = getKeys; | ||
exports.defaultSortField = { | ||
field: '_id', | ||
serialize: fp_js_1.default.toString, | ||
deserialize: (x) => new mongodb_1.ObjectId(x), | ||
}; | ||
const stateTransitions = { | ||
@@ -67,3 +62,3 @@ stopped: ['starting'], | ||
const emitStateChange = (change) => emit('stateChange', change); | ||
// Detect if resync flag is set | ||
/** Detect if resync flag is set */ | ||
const detectResync = (resyncCheckInterval = (0, ms_1.default)('1m')) => { | ||
@@ -109,3 +104,3 @@ let resyncTimer; | ||
/** | ||
* Get the timestamp of the last record updated | ||
* Get the timestamp of the last record updated. Assumes the field is a date. | ||
*/ | ||
@@ -123,3 +118,3 @@ const getLastRecordUpdatedAt = (field) => collection | ||
}); | ||
const runInitialScan = async (processRecords, options = {}) => { | ||
async function runInitialScan(processRecords, options = {}) { | ||
let deferred; | ||
@@ -131,2 +126,34 @@ let cursor; | ||
}); | ||
const defaultSortField = { | ||
field: '_id', | ||
serialize: fp_js_1.default.toString, | ||
deserialize: (x) => new mongodb_1.ObjectId(x), | ||
}; | ||
const sortField = options.sortField || defaultSortField; | ||
/** Get the last id inserted */ | ||
const getLastIdInserted = () => collection | ||
.find() | ||
.project({ [sortField.field]: 1 }) | ||
.sort({ [sortField.field]: -1 }) | ||
.limit(1) | ||
.toArray() | ||
.then((x) => { | ||
return x[0]?.[sortField.field]; | ||
}); | ||
const getCursor = async () => { | ||
// Lookup last id successfully processed | ||
const lastIdProcessed = await redis.get(keys.lastScanIdKey); | ||
debug('Last id processed %s', lastIdProcessed); | ||
// Query collection | ||
return (collection | ||
// Skip ids already processed | ||
.find(lastIdProcessed | ||
? { | ||
[sortField.field]: { | ||
$gt: sortField.deserialize(lastIdProcessed), | ||
}, | ||
} | ||
: {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {}) | ||
.sort({ [sortField.field]: 1 })); | ||
}; | ||
/** | ||
@@ -173,2 +200,3 @@ * Periodically check that records are being processed. | ||
const healthCheck = healthChecker(); | ||
/** Start the initial scan */ | ||
const start = async () => { | ||
@@ -187,7 +215,4 @@ debug('Starting initial scan'); | ||
deferred = (0, prom_utils_1.defer)(); | ||
const sortField = options.sortField || exports.defaultSortField; | ||
// Redis keys | ||
const { scanCompletedKey, lastScanIdKey } = keys; | ||
// Determine if initial scan has already completed | ||
const scanCompleted = await redis.get(scanCompletedKey); | ||
const scanCompleted = await redis.get(keys.scanCompletedKey); | ||
// Scan already completed so return | ||
@@ -208,23 +233,20 @@ if (scanCompleted) { | ||
await processRecords(records); | ||
debug('Processed %d records', records.length); | ||
const lastDocument = records[records.length - 1].fullDocument; | ||
// Record last id of the batch | ||
const lastId = fp_js_1.default.get(sortField.field, lastDocument); | ||
debug('Last id %s', lastId); | ||
if (lastId) { | ||
await redis.mset(lastScanIdKey, sortField.serialize(lastId), keys.lastScanProcessedAtKey, new Date().getTime()); | ||
await redis.mset(keys.lastScanIdKey, sortField.serialize(lastId), keys.lastScanProcessedAtKey, new Date().getTime()); | ||
} | ||
}; | ||
// Lookup last id successfully processed | ||
const lastId = await redis.get(lastScanIdKey); | ||
debug('Last scan id %s', lastId); | ||
// Create queue | ||
const queue = (0, prom_utils_1.batchQueue)(_processRecords, options); | ||
// Query collection | ||
cursor = collection | ||
// Skip ids already processed | ||
.find(lastId | ||
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } } | ||
: {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {}) | ||
.sort({ [sortField.field]: 1 }); | ||
cursor = await getCursor(); | ||
// Change state | ||
state.change('started'); | ||
// Take a snapshot of the last id inserted into the collection | ||
const lastIdInserted = await getLastIdInserted(); | ||
debug('Last id inserted %s', lastIdInserted); | ||
const ns = { db: collection.dbName, coll: collection.collectionName }; | ||
@@ -235,13 +257,24 @@ // Process documents | ||
debug('Initial scan doc %O', doc); | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
}; | ||
await queue.enqueue(changeStreamDoc); | ||
// Doc can be null if cursor is closed | ||
if (doc) { | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
}; | ||
await queue.enqueue(changeStreamDoc); | ||
} | ||
} | ||
// Flush the queue | ||
await queue.flush(); | ||
// Don't record scan complete if stopping | ||
if (!state.is('stopping')) { | ||
// Final id processed | ||
const finalIdProcessed = await redis.get(keys.lastScanIdKey); | ||
debug('Final id processed %s', finalIdProcessed); | ||
// Did we complete the initial scan? | ||
if ( | ||
// No records in the collection | ||
!lastIdInserted || | ||
// Final id processed was at least the last id inserted as of start | ||
(finalIdProcessed && | ||
sortField.deserialize(finalIdProcessed) >= lastIdInserted)) { | ||
debug('Completed initial scan'); | ||
@@ -251,7 +284,11 @@ // Stop the health check | ||
// Record scan complete | ||
await redis.set(scanCompletedKey, new Date().toString()); | ||
await redis.set(keys.scanCompletedKey, new Date().toString()); | ||
// Emit event | ||
emit('initialScanComplete', { lastId: finalIdProcessed }); | ||
} | ||
// Resolve deferred | ||
deferred.done(); | ||
debug('Exit initial scan'); | ||
}; | ||
/** Stop the initial scan */ | ||
const stop = async () => { | ||
@@ -279,2 +316,3 @@ debug('Stopping initial scan'); | ||
}; | ||
/** Restart the initial scan */ | ||
const restart = async () => { | ||
@@ -286,4 +324,3 @@ debug('Restarting initial scan'); | ||
return { start, stop, restart }; | ||
}; | ||
const defaultOptions = { fullDocument: 'updateLookup' }; | ||
} | ||
const processChangeStream = async (processRecord, options = {}) => { | ||
@@ -297,2 +334,3 @@ let deferred; | ||
const pipeline = options.pipeline || []; | ||
const defaultOptions = { fullDocument: 'updateLookup' }; | ||
/** | ||
@@ -318,3 +356,3 @@ * Periodically check that change stream events are being processed. | ||
debug('Last record created at %d', lastRecordUpdatedAt); | ||
// A record was updated but not synced within 5 seconds of being updated | ||
// A record was updated but not synced within maxSyncDelay of being updated | ||
if (!state.is('stopped') && | ||
@@ -351,2 +389,3 @@ lastRecordUpdatedAt && | ||
const healthCheck = healthChecker(); | ||
/** Start processing change stream */ | ||
const start = async () => { | ||
@@ -366,6 +405,4 @@ debug('Starting change stream'); | ||
deferred = (0, prom_utils_1.defer)(); | ||
// Redis keys | ||
const { changeStreamTokenKey } = keys; | ||
// Lookup change stream token | ||
const token = await redis.get(changeStreamTokenKey); | ||
const token = await redis.get(keys.changeStreamTokenKey); | ||
const changeStreamOptions = token | ||
@@ -382,3 +419,3 @@ ? // Resume token found, so set change stream resume point | ||
} | ||
// Get the change stream as an async iterator | ||
// Consume change stream | ||
while (await (0, util_js_1.safelyCheckNext)(changeStream)) { | ||
@@ -389,2 +426,3 @@ let event = await changeStream.next(); | ||
const token = event?._id; | ||
debug('token %o', token); | ||
// Omit nested fields that are not handled by $unset. | ||
@@ -398,3 +436,3 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted. | ||
// Update change stream token | ||
await redis.mset(changeStreamTokenKey, JSON.stringify(token), keys.lastChangeProcessedAtKey, new Date().getTime()); | ||
await redis.mset(keys.changeStreamTokenKey, JSON.stringify(token), keys.lastChangeProcessedAtKey, new Date().getTime()); | ||
} | ||
@@ -404,2 +442,3 @@ deferred.done(); | ||
}; | ||
/** Stop processing change stream */ | ||
const stop = async () => { | ||
@@ -427,2 +466,3 @@ debug('Stopping change stream'); | ||
}; | ||
/** Restart change stream */ | ||
const restart = async () => { | ||
@@ -429,0 +469,0 @@ debug('Restarting change stream'); |
@@ -9,2 +9,8 @@ import { ChangeStreamDocument, ChangeStreamInsertDocument, Document } from 'mongodb'; | ||
} | ||
export interface SortField<T> { | ||
field: string; | ||
/** Function to serialize value to string. */ | ||
serialize: (x: T) => string; | ||
deserialize: (x: string) => T; | ||
} | ||
export interface ScanOptions<T = any> { | ||
@@ -16,7 +22,4 @@ healthCheck?: { | ||
}; | ||
sortField?: { | ||
field: string; | ||
serialize: (x: T) => string; | ||
deserialize: (x: string) => T; | ||
}; | ||
/** Defaults to _id */ | ||
sortField?: SortField<T>; | ||
} | ||
@@ -41,3 +44,3 @@ export interface ChangeStreamOptions { | ||
} | ||
export declare type Events = 'healthCheckFail' | 'resync' | 'schemaChange' | 'stateChange'; | ||
export declare type Events = 'healthCheckFail' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete'; | ||
interface InitialScanFailEvent { | ||
@@ -68,4 +71,8 @@ type: 'healthCheckFail'; | ||
} | ||
export interface InitialScanComplete { | ||
type: 'initialScanComplete'; | ||
lastId: string; | ||
} | ||
export declare type State = 'starting' | 'started' | 'stopping' | 'stopped'; | ||
export declare type SimpleState = 'started' | 'stopped'; | ||
export {}; |
@@ -9,2 +9,4 @@ "use strict"; | ||
const obj_walker_1 = require("obj-walker"); | ||
const debug_1 = __importDefault(require("debug")); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
const setDefaults = (keys, val) => { | ||
@@ -59,5 +61,7 @@ const obj = {}; | ||
const safelyCheckNext = async (cursor) => { | ||
debug('safelyCheckNext called'); | ||
try { | ||
// Prevents hasNext from hanging when the cursor is already closed | ||
if (cursor.closed) { | ||
debug('safelyCheckNext cursor closed'); | ||
return false; | ||
@@ -68,2 +72,3 @@ } | ||
catch (e) { | ||
debug('safelyCheckNext error: %o', e); | ||
return false; | ||
@@ -70,0 +75,0 @@ } |
{ | ||
"name": "mongochangestream", | ||
"version": "0.34.0", | ||
"version": "0.35.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -5,0 +5,0 @@ "author": "GovSpend", |
@@ -9,3 +9,3 @@ # Mongo Change Stream | ||
will resume where it left off. This is deterministic since the collection scan is sorted | ||
by `_id`. Change streams will likewise resume from the last resume token upon server | ||
by `_id` by default. Change streams will likewise resume from the last resume token upon server | ||
restarts. See the official MongoDB docs for more information on change stream resumption: | ||
@@ -57,3 +57,3 @@ | ||
schemaChange.start() | ||
schemaChange.emitter.on('change', () => { | ||
sync.emitter.on('schemaChange', () => { | ||
initialScan.stop() | ||
@@ -104,2 +104,5 @@ changeStream.stop() | ||
This library is meant to be built on. To that end, the following libraries are | ||
currently implemented and maintained. | ||
Sync MongoDB to MongoDB | ||
@@ -114,2 +117,22 @@ [mongo2mongo](https://www.npmjs.com/package/mongo2mongo) | ||
## Resilience | ||
Both the initial scan and change stream processing are designed to handle | ||
and resume from failures. Here are some scenarios: | ||
### The syncing server goes down | ||
In this scenario, processing will continue with the last recorded state | ||
when resumed. | ||
### The syncing server is being shut down with a SIGTERM | ||
In this scenario, calling `stop` for the initial scan and change stream | ||
will cleanly end processing. | ||
### The MongoDB primary goes down and a new primary is elected | ||
In this scenario, you will need to enable health checks and restart the | ||
initial scan and/or change stream when a health check failure occurs. | ||
## Change Stream Strategies | ||
@@ -116,0 +139,0 @@ |
@@ -10,3 +10,2 @@ /** | ||
JSONSchema, | ||
SyncOptions, | ||
SchemaChangeEvent, | ||
@@ -28,3 +27,4 @@ ScanOptions, | ||
const init = _.memoize(async (options?: SyncOptions) => { | ||
const getConns = _.memoize(async (x?: any) => { | ||
console.log(x) | ||
const redis = new Redis({ keyPrefix: 'testing:' }) | ||
@@ -34,8 +34,13 @@ const client = await MongoClient.connect(process.env.MONGO_CONN as string) | ||
const coll = db.collection('testing') | ||
const sync = initSync(redis, coll, options) | ||
sync.emitter.on('stateChange', console.log) | ||
return { sync, db, coll, redis } | ||
return { client, db, coll, redis } | ||
}) | ||
const getSync = async () => { | ||
const { redis, coll } = await getConns() | ||
const sync = initSync(redis, coll) | ||
sync.emitter.on('stateChange', console.log) | ||
return sync | ||
} | ||
const genUser = () => ({ | ||
@@ -65,5 +70,5 @@ name: faker.name.fullName(), | ||
const populateCollection = (collection: Collection) => { | ||
const populateCollection = (collection: Collection, count = numDocs) => { | ||
const users = [] | ||
for (let i = 0; i < numDocs; i++) { | ||
for (let i = 0; i < count; i++) { | ||
users.push({ insertOne: { document: genUser() } }) | ||
@@ -74,4 +79,6 @@ } | ||
const before = async () => { | ||
const { sync, coll } = await init() | ||
const initState = async ( | ||
sync: ReturnType<typeof initSync>, | ||
coll: Collection | ||
) => { | ||
// Reset state | ||
@@ -85,4 +92,5 @@ await sync.reset() | ||
test('should complete initial scan', async () => { | ||
const { sync } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -103,6 +111,10 @@ const processed = [] | ||
test('initial scan should resume properly', async () => { | ||
const { sync } = await init() | ||
await before() | ||
test('should complete initial scan if collection is empty', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
// Reset state | ||
await sync.reset() | ||
await coll.deleteMany({}) | ||
const processed = [] | ||
@@ -113,13 +125,45 @@ const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
} | ||
let completed = false | ||
sync.emitter.on('initialScanComplete', () => { | ||
completed = true | ||
}) | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.ok(completed) | ||
assert.equal(processed.length, 0) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('initial scan should resume after stop', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 50 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
let completed = false | ||
sync.emitter.on('initialScanComplete', () => { | ||
completed = true | ||
}) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(50) | ||
await setTimeout(500) | ||
// Stop the initial scan | ||
await initialScan.stop() | ||
// Wait for the initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
initialScan.start() | ||
// Add some more records | ||
await populateCollection(coll, 10) | ||
await setTimeout(ms('5s')) | ||
assert.ok(completed) | ||
assert.equal(processed.length, numDocs + 10) | ||
// Stop | ||
@@ -129,5 +173,35 @@ await initialScan.stop() | ||
test('initial scan should not be marked as completed if connection is closed', async () => { | ||
// Memoize hack | ||
const { coll, redis, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
sync.emitter.on('stateChange', console.log) | ||
await initState(sync, coll) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 50 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(200) | ||
// Close the connection. | ||
await client.close() | ||
// Allow for some time | ||
await setTimeout(100) | ||
// Check if completed | ||
const completedAt = await redis.get(sync.keys.scanCompletedKey) | ||
assert.equal(completedAt, null) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('should process records via change stream', async () => { | ||
const { sync, coll } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -153,4 +227,5 @@ const processed = [] | ||
test('change stream should resume properly', async () => { | ||
const { sync, coll } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -183,3 +258,3 @@ const processed = [] | ||
test('starting change stream is idempotent', async () => { | ||
const { sync } = await init() | ||
const sync = await getSync() | ||
// Change stream | ||
@@ -197,4 +272,5 @@ const processRecord = async () => { | ||
test('stopping change stream is idempotent', async () => { | ||
const { sync, coll } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -215,4 +291,5 @@ // Change stream | ||
test('starting initial scan is idempotent', async () => { | ||
const { sync } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -230,4 +307,5 @@ const processRecords = async () => { | ||
test('stopping initial scan is idempotent', async () => { | ||
const { sync } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -246,4 +324,5 @@ const processRecords = async () => { | ||
test('Should resync when resync flag is set', async () => { | ||
const { sync, redis } = await init() | ||
await before() | ||
const { coll, redis } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -290,3 +369,3 @@ let resyncTriggered = false | ||
test('Resync start/stop is idempotent', async () => { | ||
const { sync } = await init() | ||
const sync = await getSync() | ||
@@ -301,3 +380,4 @@ const resync = sync.detectResync() | ||
test('Detect schema change', async () => { | ||
const { db, coll, sync } = await init() | ||
const { db, coll } = await getConns() | ||
const sync = await getSync() | ||
// Set schema | ||
@@ -331,3 +411,4 @@ await db.command({ | ||
test('Schema change start/stop is idempotent', async () => { | ||
const { sync, db } = await init() | ||
const { db } = await getConns() | ||
const sync = await getSync() | ||
@@ -342,4 +423,5 @@ const schemaChange = await sync.detectSchemaChange(db) | ||
test('should fail health check - initial scan', async () => { | ||
const { sync } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -371,4 +453,5 @@ let healthCheckFailed = false | ||
test('should fail health check - change stream', async () => { | ||
const { sync, coll } = await init() | ||
await before() | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
@@ -375,0 +458,0 @@ let healthCheckFailed = false |
@@ -9,2 +9,3 @@ import _ from 'lodash/fp.js' | ||
} from 'mongodb' | ||
import * as mongodb from 'mongodb' | ||
import { | ||
@@ -21,2 +22,3 @@ Events, | ||
SimpleState, | ||
SortField, | ||
} from './types.js' | ||
@@ -67,8 +69,2 @@ import _debug from 'debug' | ||
export const defaultSortField = { | ||
field: '_id', | ||
serialize: _.toString, | ||
deserialize: (x: string) => new ObjectId(x), | ||
} | ||
const stateTransitions: StateTransitions<State> = { | ||
@@ -100,3 +96,3 @@ stopped: ['starting'], | ||
// Detect if resync flag is set | ||
/** Detect if resync flag is set */ | ||
const detectResync = (resyncCheckInterval = ms('1m')) => { | ||
@@ -149,3 +145,3 @@ let resyncTimer: NodeJS.Timer | ||
/** | ||
* Get the timestamp of the last record updated | ||
* Get the timestamp of the last record updated. Assumes the field is a date. | ||
*/ | ||
@@ -165,6 +161,6 @@ const getLastRecordUpdatedAt = (field: string): Promise<number | undefined> => | ||
const runInitialScan = async ( | ||
async function runInitialScan<T = any>( | ||
processRecords: ProcessRecords, | ||
options: QueueOptions & ScanOptions = {} | ||
) => { | ||
options: QueueOptions & ScanOptions<T> = {} | ||
) { | ||
let deferred: Deferred | ||
@@ -176,3 +172,44 @@ let cursor: ReturnType<typeof collection.find> | ||
}) | ||
const defaultSortField: SortField<ObjectId> = { | ||
field: '_id', | ||
serialize: _.toString, | ||
deserialize: (x: string) => new ObjectId(x), | ||
} | ||
const sortField = options.sortField || defaultSortField | ||
/** Get the last id inserted */ | ||
const getLastIdInserted = () => | ||
collection | ||
.find() | ||
.project({ [sortField.field]: 1 }) | ||
.sort({ [sortField.field]: -1 }) | ||
.limit(1) | ||
.toArray() | ||
.then((x) => { | ||
return x[0]?.[sortField.field] | ||
}) | ||
const getCursor = async () => { | ||
// Lookup last id successfully processed | ||
const lastIdProcessed = await redis.get(keys.lastScanIdKey) | ||
debug('Last id processed %s', lastIdProcessed) | ||
// Query collection | ||
return ( | ||
collection | ||
// Skip ids already processed | ||
.find( | ||
lastIdProcessed | ||
? { | ||
[sortField.field]: { | ||
$gt: sortField.deserialize(lastIdProcessed), | ||
}, | ||
} | ||
: {}, | ||
omit ? { projection: setDefaults(omit, 0) } : {} | ||
) | ||
.sort({ [sortField.field]: 1 }) | ||
) | ||
} | ||
/** | ||
@@ -222,2 +259,3 @@ * Periodically check that records are being processed. | ||
/** Start the initial scan */ | ||
const start = async () => { | ||
@@ -236,7 +274,4 @@ debug('Starting initial scan') | ||
deferred = defer() | ||
const sortField = options.sortField || defaultSortField | ||
// Redis keys | ||
const { scanCompletedKey, lastScanIdKey } = keys | ||
// Determine if initial scan has already completed | ||
const scanCompleted = await redis.get(scanCompletedKey) | ||
const scanCompleted = await redis.get(keys.scanCompletedKey) | ||
// Scan already completed so return | ||
@@ -258,8 +293,10 @@ if (scanCompleted) { | ||
await processRecords(records) | ||
debug('Processed %d records', records.length) | ||
const lastDocument = records[records.length - 1].fullDocument | ||
// Record last id of the batch | ||
const lastId = _.get(sortField.field, lastDocument) | ||
debug('Last id %s', lastId) | ||
if (lastId) { | ||
await redis.mset( | ||
lastScanIdKey, | ||
keys.lastScanIdKey, | ||
sortField.serialize(lastId), | ||
@@ -271,19 +308,12 @@ keys.lastScanProcessedAtKey, | ||
} | ||
// Lookup last id successfully processed | ||
const lastId = await redis.get(lastScanIdKey) | ||
debug('Last scan id %s', lastId) | ||
// Create queue | ||
const queue = batchQueue(_processRecords, options) | ||
// Query collection | ||
cursor = collection | ||
// Skip ids already processed | ||
.find( | ||
lastId | ||
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } } | ||
: {}, | ||
omit ? { projection: setDefaults(omit, 0) } : {} | ||
) | ||
.sort({ [sortField.field]: 1 }) | ||
cursor = await getCursor() | ||
// Change state | ||
state.change('started') | ||
// Take a snapshot of the last id inserted into the collection | ||
const lastIdInserted = await getLastIdInserted() | ||
debug('Last id inserted %s', lastIdInserted) | ||
const ns = { db: collection.dbName, coll: collection.collectionName } | ||
@@ -294,13 +324,25 @@ // Process documents | ||
debug('Initial scan doc %O', doc) | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
} as unknown as ChangeStreamInsertDocument | ||
await queue.enqueue(changeStreamDoc) | ||
// Doc can be null if cursor is closed | ||
if (doc) { | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
} as unknown as ChangeStreamInsertDocument | ||
await queue.enqueue(changeStreamDoc) | ||
} | ||
} | ||
// Flush the queue | ||
await queue.flush() | ||
// Don't record scan complete if stopping | ||
if (!state.is('stopping')) { | ||
// Final id processed | ||
const finalIdProcessed = await redis.get(keys.lastScanIdKey) | ||
debug('Final id processed %s', finalIdProcessed) | ||
// Did we complete the initial scan? | ||
if ( | ||
// No records in the collection | ||
!lastIdInserted || | ||
// Final id processed was at least the last id inserted as of start | ||
(finalIdProcessed && | ||
sortField.deserialize(finalIdProcessed) >= lastIdInserted) | ||
) { | ||
debug('Completed initial scan') | ||
@@ -310,4 +352,7 @@ // Stop the health check | ||
// Record scan complete | ||
await redis.set(scanCompletedKey, new Date().toString()) | ||
await redis.set(keys.scanCompletedKey, new Date().toString()) | ||
// Emit event | ||
emit('initialScanComplete', { lastId: finalIdProcessed }) | ||
} | ||
// Resolve deferred | ||
deferred.done() | ||
@@ -317,2 +362,3 @@ debug('Exit initial scan') | ||
/** Stop the initial scan */ | ||
const stop = async () => { | ||
@@ -341,2 +387,3 @@ debug('Stopping initial scan') | ||
/** Restart the initial scan */ | ||
const restart = async () => { | ||
@@ -351,4 +398,2 @@ debug('Restarting initial scan') | ||
const defaultOptions = { fullDocument: 'updateLookup' } | ||
const processChangeStream = async ( | ||
@@ -365,2 +410,3 @@ processRecord: ProcessRecord, | ||
const pipeline = options.pipeline || [] | ||
const defaultOptions = { fullDocument: 'updateLookup' } | ||
@@ -392,3 +438,3 @@ /** | ||
debug('Last record created at %d', lastRecordUpdatedAt) | ||
// A record was updated but not synced within 5 seconds of being updated | ||
// A record was updated but not synced within maxSyncDelay of being updated | ||
if ( | ||
@@ -429,2 +475,3 @@ !state.is('stopped') && | ||
/** Start processing change stream */ | ||
const start = async () => { | ||
@@ -444,7 +491,5 @@ debug('Starting change stream') | ||
deferred = defer() | ||
// Redis keys | ||
const { changeStreamTokenKey } = keys | ||
// Lookup change stream token | ||
const token = await redis.get(changeStreamTokenKey) | ||
const changeStreamOptions = token | ||
const token = await redis.get(keys.changeStreamTokenKey) | ||
const changeStreamOptions: mongodb.ChangeStreamOptions = token | ||
? // Resume token found, so set change stream resume point | ||
@@ -463,3 +508,3 @@ { ...defaultOptions, resumeAfter: JSON.parse(token) } | ||
} | ||
// Get the change stream as an async iterator | ||
// Consume change stream | ||
while (await safelyCheckNext(changeStream)) { | ||
@@ -470,2 +515,3 @@ let event = await changeStream.next() | ||
const token = event?._id | ||
debug('token %o', token) | ||
// Omit nested fields that are not handled by $unset. | ||
@@ -480,3 +526,3 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted. | ||
await redis.mset( | ||
changeStreamTokenKey, | ||
keys.changeStreamTokenKey, | ||
JSON.stringify(token), | ||
@@ -491,2 +537,3 @@ keys.lastChangeProcessedAtKey, | ||
/** Stop processing change stream */ | ||
const stop = async () => { | ||
@@ -515,2 +562,3 @@ debug('Stopping change stream') | ||
/** Restart change stream */ | ||
const restart = async () => { | ||
@@ -517,0 +565,0 @@ debug('Restarting change stream') |
@@ -22,2 +22,9 @@ import { | ||
export interface SortField<T> { | ||
field: string | ||
/** Function to serialize value to string. */ | ||
serialize: (x: T) => string | ||
deserialize: (x: string) => T | ||
} | ||
export interface ScanOptions<T = any> { | ||
@@ -29,7 +36,4 @@ healthCheck?: { | ||
} | ||
sortField?: { | ||
field: string | ||
serialize: (x: T) => string | ||
deserialize: (x: string) => T | ||
} | ||
/** Defaults to _id */ | ||
sortField?: SortField<T> | ||
} | ||
@@ -64,2 +68,3 @@ | ||
| 'stateChange' | ||
| 'initialScanComplete' | ||
@@ -97,2 +102,7 @@ interface InitialScanFailEvent { | ||
export interface InitialScanComplete { | ||
type: 'initialScanComplete' | ||
lastId: string | ||
} | ||
// State | ||
@@ -99,0 +109,0 @@ |
@@ -5,3 +5,6 @@ import { Collection, ChangeStream, FindCursor } from 'mongodb' | ||
import { JSONSchema } from './types' | ||
import _debug from 'debug' | ||
const debug = _debug('mongochangestream') | ||
export const setDefaults = (keys: string[], val: any) => { | ||
@@ -62,5 +65,7 @@ const obj: Record<string, any> = {} | ||
export const safelyCheckNext = async (cursor: ChangeStream | FindCursor) => { | ||
debug('safelyCheckNext called') | ||
try { | ||
// Prevents hasNext from hanging when the cursor is already closed | ||
if (cursor.closed) { | ||
debug('safelyCheckNext cursor closed') | ||
return false | ||
@@ -70,4 +75,5 @@ } | ||
} catch (e) { | ||
debug('safelyCheckNext error: %o', e) | ||
return false | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
84082
2092
219