# mongochangestream

Comparing version 0.18.0 to 0.19.0

CHANGELOG.md:

```diff
@@ -1,1 +1,8 @@
+# 0.19.0
+- BREAKING CHANGE: Changed API for `runInitialScan`. You must explicitly call `start` now.
+  The change stream can be stopped by calling `stop`.
+- It is now possible to cleanly stop `runInitialScan` and `processChangeStream`, allowing
+  for a smooth restarting behavior if a schema change is detected.
 # 0.18.0
@@ -2,0 +9,0 @@
```
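
Before the file-by-file diff, here is what the breaking change means for callers. A minimal usage sketch of the 0.19.0 API, assuming a local MongoDB and an ioredis client; the connection details and the `processRecords` body are illustrative, while the `initSync` and `runInitialScan` signatures come from the type declarations diffed below:

```ts
import { MongoClient, ChangeStreamInsertDocument } from 'mongodb'
import Redis from 'ioredis'
import { initSync } from 'mongochangestream'

const client = await MongoClient.connect('mongodb://localhost:27017')
const coll = client.db('mydb').collection('users')

const sync = initSync(new Redis(), coll)

// 0.19.0: runInitialScan resolves to { start, stop } handles
// instead of kicking off the scan eagerly.
const scan = await sync.runInitialScan(
  async (docs: ChangeStreamInsertDocument[]) => {
    console.log('processing %d records', docs.length)
  }
)

// start() resolves when the scan completes (or after stop() closes the cursor)
const scanDone = scan.start()

// ...later, e.g. on shutdown or before restarting after a schema change:
await scan.stop()
await scanDone
```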

changeStreamToIterator.d.ts:

```diff
@@ -1,10 +1,17 @@
-import { ChangeStreamOptions, Collection, Document } from 'mongodb';
-declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], options: ChangeStreamOptions) => {
+import { ChangeStreamDocument, ChangeStream } from 'mongodb';
+declare const toIterator: (changeStream: ChangeStream) => {
     [Symbol.asyncIterator](): {
         next(): Promise<{
-            value: import("mongodb").ChangeStreamDocument<Document>;
+            value: ChangeStreamDocument<import("bson").Document>;
             done: boolean;
+        } | {
+            value: ChangeStreamDocument<import("bson").Document>;
+            done: boolean;
         }>;
+        return(): Promise<{
+            value: ChangeStreamDocument<import("bson").Document>;
+            done: boolean;
+        }>;
     };
 };
-export default changeStreamToIterator;
+export default toIterator;
```
"use strict"; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const debug_1 = __importDefault(require("debug")); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
const changeStreamToIterator = (collection, pipeline, options) => { | ||
const changeStream = collection.watch(pipeline, options); | ||
debug('Started change stream - pipeline %O options %O', pipeline, options); | ||
const prom_utils_1 = require("prom-utils"); | ||
const done = { value: {}, done: true }; | ||
const toIterator = (changeStream) => { | ||
const deferred = (0, prom_utils_1.defer)(); | ||
changeStream.once('close', deferred.done); | ||
return { | ||
@@ -15,6 +12,11 @@ [Symbol.asyncIterator]() { | ||
async next() { | ||
return changeStream | ||
.next() | ||
.then((data) => ({ value: data, done: false })); | ||
return Promise.race([ | ||
deferred.promise.then(() => done), | ||
changeStream.next().then((data) => ({ value: data, done: false })), | ||
]); | ||
}, | ||
async return() { | ||
await changeStream.close(); | ||
return done; | ||
}, | ||
}; | ||
@@ -24,2 +26,2 @@ }, | ||
}; | ||
exports.default = changeStreamToIterator; | ||
exports.default = toIterator; |

Main module type declarations (.d.ts):

```diff
@@ -14,3 +14,6 @@ /// <reference types="node" />
 export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => {
-    runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
+    runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<{
+        start: () => Promise<void>;
+        stop: () => Promise<void>;
+    }>;
     processChangeStream: (processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{
@@ -17,0 +20,0 @@ start: () => Promise<void>;
```

Main module (compiled .js):

```diff
@@ -48,51 +48,65 @@ "use strict";
 const runInitialScan = async (processRecords, options) => {
-    debug('Running initial scan');
-    const sortField = options?.sortField || exports.defaultSortField;
-    // Redis keys
-    const { scanCompletedKey, lastScanIdKey } = keys;
-    // Determine if initial scan has already completed
-    const scanCompleted = await redis.get(scanCompletedKey);
-    // Scan already completed so return
-    if (scanCompleted) {
-        debug(`Initial scan previously completed on %s`, scanCompleted);
-        return;
-    }
-    const _processRecords = async (records) => {
-        // Process batch of records
-        await processRecords(records);
-        const lastDocument = records[records.length - 1].fullDocument;
-        // Record last id of the batch
-        const lastId = fp_js_1.default.get(sortField.field, lastDocument);
-        if (lastId) {
-            await redis.set(lastScanIdKey, sortField.serialize(lastId));
-        }
-    };
-    // Lookup last id successfully processed
-    const lastId = await redis.get(lastScanIdKey);
-    debug('Last scan id %s', lastId);
-    // Create queue
-    const queue = (0, prom_utils_1.batchQueue)(_processRecords, options);
-    // Query collection
-    const cursor = collection
-        // Skip ids already processed
-        .find(lastId
-        ? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
-        : {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {})
-        .sort({ [sortField.field]: 1 });
-    const ns = { db: collection.dbName, coll: collection.collectionName };
-    // Process documents
-    for await (const doc of cursor) {
-        debug('Initial scan doc %O', doc);
-        const changeStreamDoc = {
-            fullDocument: doc,
-            operationType: 'insert',
-            ns,
-        };
-        await queue.enqueue(changeStreamDoc);
-    }
-    // Flush the queue
-    await queue.flush();
-    // Record scan complete
-    await redis.set(scanCompletedKey, new Date().toString());
-    debug('Completed initial scan');
+    let deferred;
+    let cursor;
+    const start = async () => {
+        debug('Starting initial scan');
+        const sortField = options?.sortField || exports.defaultSortField;
+        // Redis keys
+        const { scanCompletedKey, lastScanIdKey } = keys;
+        // Determine if initial scan has already completed
+        const scanCompleted = await redis.get(scanCompletedKey);
+        // Scan already completed so return
+        if (scanCompleted) {
+            debug(`Initial scan previously completed on %s`, scanCompleted);
+            return;
+        }
+        const _processRecords = async (records) => {
+            // Process batch of records
+            await processRecords(records);
+            const lastDocument = records[records.length - 1].fullDocument;
+            // Record last id of the batch
+            const lastId = fp_js_1.default.get(sortField.field, lastDocument);
+            if (lastId) {
+                await redis.set(lastScanIdKey, sortField.serialize(lastId));
+            }
+        };
+        // Lookup last id successfully processed
+        const lastId = await redis.get(lastScanIdKey);
+        debug('Last scan id %s', lastId);
+        // Create queue
+        const queue = (0, prom_utils_1.batchQueue)(_processRecords, options);
+        // Query collection
+        cursor = collection
+            // Skip ids already processed
+            .find(lastId
+            ? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
+            : {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {})
+            .sort({ [sortField.field]: 1 });
+        const ns = { db: collection.dbName, coll: collection.collectionName };
+        // Process documents
+        for await (const doc of cursor) {
+            debug('Initial scan doc %O', doc);
+            deferred = (0, prom_utils_1.defer)();
+            const changeStreamDoc = {
+                fullDocument: doc,
+                operationType: 'insert',
+                ns,
+            };
+            await queue.enqueue(changeStreamDoc);
+            deferred.done();
+        }
+        // Flush the queue
+        await queue.flush();
+        // Record scan complete
+        await redis.set(scanCompletedKey, new Date().toString());
+        debug('Completed initial scan');
+    };
+    const stop = async () => {
+        debug('Stopping initial scan');
+        // Wait for event to be processed
+        await deferred?.promise;
+        // Close the cursor
+        await cursor?.close();
+    };
+    return { start, stop };
 };
@@ -109,21 +123,20 @@ const defaultOptions = { fullDocument: 'updateLookup' };
 const processChangeStream = async (processRecord, pipeline = []) => {
-    const abortController = new AbortController();
     let deferred;
-    // Redis keys
-    const { changeStreamTokenKey } = keys;
-    // Lookup change stream token
-    const token = await redis.get(changeStreamTokenKey);
-    const options = token
-        ? // Resume token found, so set change stream resume point
-            { ...defaultOptions, resumeAfter: JSON.parse(token) }
-        : defaultOptions;
-    // Get the change stream as an async iterator
-    const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], options);
+    let changeStream;
     const start = async () => {
-        for await (let event of changeStream) {
+        debug('Starting change stream');
+        // Redis keys
+        const { changeStreamTokenKey } = keys;
+        // Lookup change stream token
+        const token = await redis.get(changeStreamTokenKey);
+        const options = token
+            ? // Resume token found, so set change stream resume point
+                { ...defaultOptions, resumeAfter: JSON.parse(token) }
+            : defaultOptions;
+        // Start the change stream
+        changeStream = collection.watch([...omitPipeline, ...pipeline], options);
+        const iterator = (0, changeStreamToIterator_js_1.default)(changeStream);
+        // Get the change stream as an async iterator
+        for await (let event of iterator) {
             debug('Change stream event %O', event);
-            // Don't process event if stopping
-            if (abortController.signal.aborted) {
-                return;
-            }
             deferred = (0, prom_utils_1.defer)();
@@ -144,6 +157,7 @@ // Get resume token
     };
-    const stop = () => {
-        abortController.abort();
+    const stop = async () => {
+        debug('Stopping change stream');
+        await changeStream.close();
         // Wait for event to be processed
-        return deferred?.promise;
+        await deferred?.promise;
     };
@@ -206,5 +220,5 @@ return { start, stop };
     const start = () => {
-        debug('Started polling for schema changes');
-        checkForSchemaChange();
+        debug('Starting polling for schema changes');
+        // Perform an inital check
+        checkForSchemaChange();
         // Check for schema changes every interval
@@ -214,3 +228,3 @@ timer = setInterval(checkForSchemaChange, interval);
     const stop = () => {
-        debug('Stopped polling for schema changes');
+        debug('Stopping polling for schema changes');
         clearInterval(timer);
@@ -217,0 +231,0 @@ };
```

package.json:

```diff
 {
     "name": "mongochangestream",
-    "version": "0.18.0",
+    "version": "0.19.0",
     "description": "Sync MongoDB collections via change streams into any database.",
@@ -5,0 +5,0 @@ "author": "GovSpend",
```

changeStreamToIterator.ts (source):

```diff
@@ -1,13 +1,9 @@
-import { ChangeStreamOptions, Collection, Document } from 'mongodb'
-import _debug from 'debug'
-const debug = _debug('mongochangestream')
-const changeStreamToIterator = (
-  collection: Collection,
-  pipeline: Document[],
-  options: ChangeStreamOptions
-) => {
-  const changeStream = collection.watch(pipeline, options)
-  debug('Started change stream - pipeline %O options %O', pipeline, options)
+import { defer } from 'prom-utils'
+import { ChangeStreamDocument, ChangeStream } from 'mongodb'
+const done = { value: {} as ChangeStreamDocument, done: true }
+const toIterator = (changeStream: ChangeStream) => {
+  const deferred = defer()
+  changeStream.once('close', deferred.done)
   return {
@@ -17,6 +13,11 @@ [Symbol.asyncIterator]() {
     async next() {
-      return changeStream
-        .next()
-        .then((data) => ({ value: data, done: false }))
+      return Promise.race([
+        deferred.promise.then(() => done),
+        changeStream.next().then((data) => ({ value: data, done: false })),
+      ])
     },
+    async return() {
+      await changeStream.close()
+      return done
+    },
   }
@@ -26,2 +27,2 @@ },
 }
-export default changeStreamToIterator
+export default toIterator
```
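
The diff above is the heart of the release: `toIterator` no longer owns the change stream. It races each `next()` against a deferred promise that settles when the stream emits `close`, so a consumer blocked in `for await` unwinds as soon as `stop` closes the stream. Below is a self-contained sketch of that pattern with generic names; `makeDeferred` stands in for `defer` from prom-utils, and the `source` shape is an assumption for illustration:

```ts
// Minimal deferred: a promise plus the function that resolves it.
const makeDeferred = () => {
  let done!: () => void
  const promise = new Promise<void>((resolve) => {
    done = resolve
  })
  return { promise, done }
}

// Wrap any source with next/close/once('close') into an async iterator
// whose pending next() settles as soon as the source closes.
const toCancellableIterator = <T>(source: {
  next: () => Promise<T>
  close: () => Promise<void>
  once: (event: 'close', fn: () => void) => void
}) => {
  const deferred = makeDeferred()
  source.once('close', deferred.done)
  const done = { value: undefined as unknown as T, done: true as const }
  return {
    [Symbol.asyncIterator]() {
      return {
        // Race the real next() against the close signal.
        next: () =>
          Promise.race([
            deferred.promise.then(() => done),
            source.next().then((value) => ({ value, done: false as const })),
          ]),
        // Invoked on early exit from for await: close the source.
        return: async () => {
          await source.close()
          return done
        },
      }
    },
  }
}
```

Racing against the close signal avoids the classic hang where a pending `changeStream.next()` never settles after the stream is closed out from under the loop.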

Main module source (.ts):

```diff
@@ -8,4 +8,5 @@ import _ from 'lodash/fp.js'
   Db,
+  ChangeStream,
 } from 'mongodb'
-import changeStreamToIterator from './changeStreamToIterator.js'
+import toIterator from './changeStreamToIterator.js'
 import {
@@ -73,54 +74,71 @@ SyncOptions,
 ) => {
-  debug('Running initial scan')
-  const sortField = options?.sortField || defaultSortField
-  // Redis keys
-  const { scanCompletedKey, lastScanIdKey } = keys
-  // Determine if initial scan has already completed
-  const scanCompleted = await redis.get(scanCompletedKey)
-  // Scan already completed so return
-  if (scanCompleted) {
-    debug(`Initial scan previously completed on %s`, scanCompleted)
-    return
-  }
-  const _processRecords = async (records: ChangeStreamInsertDocument[]) => {
-    // Process batch of records
-    await processRecords(records)
-    const lastDocument = records[records.length - 1].fullDocument
-    // Record last id of the batch
-    const lastId = _.get(sortField.field, lastDocument)
-    if (lastId) {
-      await redis.set(lastScanIdKey, sortField.serialize(lastId))
-    }
-  }
-  // Lookup last id successfully processed
-  const lastId = await redis.get(lastScanIdKey)
-  debug('Last scan id %s', lastId)
-  // Create queue
-  const queue = batchQueue(_processRecords, options)
-  // Query collection
-  const cursor = collection
-    // Skip ids already processed
-    .find(
-      lastId
-        ? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
-        : {},
-      omit ? { projection: setDefaults(omit, 0) } : {}
-    )
-    .sort({ [sortField.field]: 1 })
-  const ns = { db: collection.dbName, coll: collection.collectionName }
-  // Process documents
-  for await (const doc of cursor) {
-    debug('Initial scan doc %O', doc)
-    const changeStreamDoc = {
-      fullDocument: doc,
-      operationType: 'insert',
-      ns,
-    } as unknown as ChangeStreamInsertDocument
-    await queue.enqueue(changeStreamDoc)
-  }
-  // Flush the queue
-  await queue.flush()
-  // Record scan complete
-  await redis.set(scanCompletedKey, new Date().toString())
-  debug('Completed initial scan')
+  let deferred: Deferred
+  let cursor: ReturnType<typeof collection.find>
+  const start = async () => {
+    debug('Starting initial scan')
+    const sortField = options?.sortField || defaultSortField
+    // Redis keys
+    const { scanCompletedKey, lastScanIdKey } = keys
+    // Determine if initial scan has already completed
+    const scanCompleted = await redis.get(scanCompletedKey)
+    // Scan already completed so return
+    if (scanCompleted) {
+      debug(`Initial scan previously completed on %s`, scanCompleted)
+      return
+    }
+    const _processRecords = async (records: ChangeStreamInsertDocument[]) => {
+      // Process batch of records
+      await processRecords(records)
+      const lastDocument = records[records.length - 1].fullDocument
+      // Record last id of the batch
+      const lastId = _.get(sortField.field, lastDocument)
+      if (lastId) {
+        await redis.set(lastScanIdKey, sortField.serialize(lastId))
+      }
+    }
+    // Lookup last id successfully processed
+    const lastId = await redis.get(lastScanIdKey)
+    debug('Last scan id %s', lastId)
+    // Create queue
+    const queue = batchQueue(_processRecords, options)
+    // Query collection
+    cursor = collection
+      // Skip ids already processed
+      .find(
+        lastId
+          ? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
+          : {},
+        omit ? { projection: setDefaults(omit, 0) } : {}
+      )
+      .sort({ [sortField.field]: 1 })
+    const ns = { db: collection.dbName, coll: collection.collectionName }
+    // Process documents
+    for await (const doc of cursor) {
+      debug('Initial scan doc %O', doc)
+      deferred = defer()
+      const changeStreamDoc = {
+        fullDocument: doc,
+        operationType: 'insert',
+        ns,
+      } as unknown as ChangeStreamInsertDocument
+      await queue.enqueue(changeStreamDoc)
+      deferred.done()
+    }
+    // Flush the queue
+    await queue.flush()
+    // Record scan complete
+    await redis.set(scanCompletedKey, new Date().toString())
+    debug('Completed initial scan')
+  }
+  const stop = async () => {
+    debug('Stopping initial scan')
+    // Wait for event to be processed
+    await deferred?.promise
+    // Close the cursor
+    await cursor?.close()
+  }
+  return { start, stop }
 }
@@ -142,25 +160,21 @@
 ) => {
-  const abortController = new AbortController()
   let deferred: Deferred
-  // Redis keys
-  const { changeStreamTokenKey } = keys
-  // Lookup change stream token
-  const token = await redis.get(changeStreamTokenKey)
-  const options = token
-    ? // Resume token found, so set change stream resume point
-      { ...defaultOptions, resumeAfter: JSON.parse(token) }
-    : defaultOptions
-  // Get the change stream as an async iterator
-  const changeStream = changeStreamToIterator(
-    collection,
-    [...omitPipeline, ...pipeline],
-    options
-  )
+  let changeStream: ChangeStream
   const start = async () => {
-    for await (let event of changeStream) {
+    debug('Starting change stream')
+    // Redis keys
+    const { changeStreamTokenKey } = keys
+    // Lookup change stream token
+    const token = await redis.get(changeStreamTokenKey)
+    const options = token
+      ? // Resume token found, so set change stream resume point
+        { ...defaultOptions, resumeAfter: JSON.parse(token) }
+      : defaultOptions
+    // Start the change stream
+    changeStream = collection.watch([...omitPipeline, ...pipeline], options)
+    const iterator = toIterator(changeStream)
+    // Get the change stream as an async iterator
+    for await (let event of iterator) {
       debug('Change stream event %O', event)
-      // Don't process event if stopping
-      if (abortController.signal.aborted) {
-        return
-      }
       deferred = defer()
@@ -181,7 +195,10 @@ // Get resume token
   }
-  const stop = () => {
-    abortController.abort()
+  const stop = async () => {
+    debug('Stopping change stream')
+    await changeStream.close()
     // Wait for event to be processed
-    return deferred?.promise
+    await deferred?.promise
   }
   return { start, stop }
@@ -248,5 +265,5 @@ }
   const start = () => {
-    debug('Started polling for schema changes')
-    checkForSchemaChange()
+    debug('Starting polling for schema changes')
+    // Perform an inital check
+    checkForSchemaChange()
     // Check for schema changes every interval
@@ -256,3 +273,3 @@ timer = setInterval(checkForSchemaChange, interval)
   const stop = () => {
-    debug('Stopped polling for schema changes')
+    debug('Stopping polling for schema changes')
     clearInterval(timer)
@@ -259,0 +276,0 @@ }
```
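
Together with the changelog entry, the intended flow is: `stop` closes the stream and waits for the in-flight event, the resume token stays persisted in Redis, and a later `start` picks up from that token. A hypothetical restart sketch, continuing the usage example after the changelog; the schema-change trigger and the reuse of `start` after `stop` are assumptions, not shown in the diff:

```ts
// Continuing the earlier sketch: `sync` is the object from initSync().
const events = await sync.processChangeStream(async (doc) => {
  console.log('change event: %s', doc.operationType) // illustrative handler
})

// start() resolves only when the stream ends, so don't await it inline.
events.start()

// Hypothetical trigger, e.g. fired when the schema-change polling detects a change:
const restartOnSchemaChange = async () => {
  await events.stop() // closes the stream, waits for the in-flight event
  events.start() // resumes from the token saved in Redis
}
```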