mongochangestream
Advanced tools
Comparing version 0.2.0 to 0.3.0
@@ -0,1 +1,5 @@ | ||
# 0.3.0 | ||
* Batch `runInitialScan`. | ||
# 0.2.0 | ||
@@ -2,0 +6,0 @@ |
import { Collection } from 'mongodb'; | ||
import { ProcessRecord } from './types.js'; | ||
import { ProcessRecord, ProcessRecords } from './types.js'; | ||
import type { default as Redis } from 'ioredis'; | ||
export declare const initSync: (redis: Redis) => { | ||
runInitialScan: (collection: Collection, processRecord: ProcessRecord) => Promise<void>; | ||
runInitialScan: (collection: Collection, processRecords: ProcessRecords, batchSize?: number) => Promise<void>; | ||
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>; | ||
syncCollection: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => void; | ||
reset: (collection: Collection) => Promise<void>; | ||
}; |
@@ -10,2 +10,3 @@ "use strict"; | ||
const debug_1 = __importDefault(require("debug")); | ||
const prom_utils_1 = require("prom-utils"); | ||
const debug = (0, debug_1.default)('connectors:mongodbChangeStream'); | ||
@@ -33,3 +34,3 @@ const keyPrefix = 'mongodbChangeStream'; | ||
*/ | ||
const runInitialScan = async (collection, processRecord) => { | ||
const runInitialScan = async (collection, processRecords, batchSize = 100) => { | ||
debug('Running initial scan'); | ||
@@ -48,2 +49,11 @@ // Redis keys | ||
debug('Last scan _id %s', lastId); | ||
const _processRecords = async (records) => { | ||
// Process batch of records | ||
await processRecords(records); | ||
// Record last id of the batch | ||
const lastId = records[records.length - 1].fullDocument._id.toString(); | ||
await redis.set(lastScanIdKey, lastId); | ||
}; | ||
// Create queue | ||
const queue = (0, prom_utils_1.batchQueue)(_processRecords, batchSize); | ||
// Query collection | ||
@@ -63,7 +73,6 @@ const cursor = collection | ||
}; | ||
// Process record | ||
await processRecord(changeStreamDoc); | ||
// Record that this document was successfully processed | ||
await redis.set(lastScanIdKey, doc._id.toString()); | ||
await queue.enqueue(changeStreamDoc); | ||
} | ||
// Flush the queue | ||
await queue.flush(); | ||
// Record scan complete | ||
@@ -99,11 +108,2 @@ await redis.set(scanCompletedKey, new Date().toString()); | ||
/** | ||
* Sync a MongoDB collection. | ||
*/ | ||
const syncCollection = (collection, processRecord, pipeline = []) => { | ||
// Process the change stream | ||
processChangeStream(collection, processRecord, pipeline); | ||
// Run the initial scan | ||
runInitialScan(collection, processRecord); | ||
}; | ||
/** | ||
* Reset Redis state. | ||
@@ -118,3 +118,2 @@ */ | ||
processChangeStream, | ||
syncCollection, | ||
reset, | ||
@@ -121,0 +120,0 @@ }; |
@@ -1,2 +0,3 @@ | ||
import { ChangeStreamDocument } from 'mongodb'; | ||
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'; | ||
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>; | ||
export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -44,4 +44,5 @@ "author": "GovSpend", | ||
"dependencies": { | ||
"debug": "^4.3.4" | ||
"debug": "^4.3.4", | ||
"prom-utils": "^0.1.0" | ||
} | ||
} |
@@ -39,8 +39,7 @@ # Mongo Change Stream | ||
Below are the available methods. You can call `runInitialScan` and `processChangeStream` | ||
separately, but the most straightforward way is to call `syncCollection` which combines | ||
both functions. | ||
Below are the available methods. | ||
The `processChangeStream` method will never complete, but `runInitialScan` will complete | ||
once it has scanned all documents in the collection. | ||
once it has scanned all documents in the collection. `runInitialScan` batches records for | ||
efficiency. | ||
@@ -52,7 +51,10 @@ The `reset` method will delete all relevant keys for a given collection in Redis. | ||
export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void> | ||
export type ProcessRecord = ( | ||
doc: ChangeStreamDocument | ChangeStreamDocument[] | ||
) => void | Promise<void> | ||
const runInitialScan = async ( | ||
collection: Collection, | ||
processRecord: ProcessRecord | ||
processRecord: ProcessRecord, | ||
batchSize = 100 | ||
): Promise<void> => ... | ||
@@ -66,8 +68,2 @@ | ||
const syncCollection = ( | ||
collection: Collection, | ||
processRecord: ProcessRecord, | ||
pipeline: Document[] = [] | ||
): void => ... | ||
const reset = async (collection: Collection): Promise<void> => ... | ||
@@ -74,0 +70,0 @@ ``` |
import { ChangeStreamInsertDocument, Collection, ObjectId } from 'mongodb' | ||
import changeStreamToIterator from './changeStreamToIterator.js' | ||
import { ProcessRecord } from './types.js' | ||
import { ProcessRecord, ProcessRecords } from './types.js' | ||
import _debug from 'debug' | ||
import type { default as Redis } from 'ioredis' | ||
import { batchQueue } from 'prom-utils' | ||
@@ -36,3 +37,4 @@ const debug = _debug('connectors:mongodbChangeStream') | ||
collection: Collection, | ||
processRecord: ProcessRecord | ||
processRecords: ProcessRecords, | ||
batchSize = 100 | ||
) => { | ||
@@ -52,2 +54,11 @@ debug('Running initial scan') | ||
debug('Last scan _id %s', lastId) | ||
const _processRecords = async (records: ChangeStreamInsertDocument[]) => { | ||
// Process batch of records | ||
await processRecords(records) | ||
// Record last id of the batch | ||
const lastId = records[records.length - 1].fullDocument._id.toString() | ||
await redis.set(lastScanIdKey, lastId) | ||
} | ||
// Create queue | ||
const queue = batchQueue(_processRecords, batchSize) | ||
// Query collection | ||
@@ -67,7 +78,6 @@ const cursor = collection | ||
} as unknown as ChangeStreamInsertDocument | ||
// Process record | ||
await processRecord(changeStreamDoc) | ||
// Record that this document was successfully processed | ||
await redis.set(lastScanIdKey, doc._id.toString()) | ||
await queue.enqueue(changeStreamDoc) | ||
} | ||
// Flush the queue | ||
await queue.flush() | ||
// Record scan complete | ||
@@ -108,15 +118,2 @@ await redis.set(scanCompletedKey, new Date().toString()) | ||
} | ||
/** | ||
* Sync a MongoDB collection. | ||
*/ | ||
const syncCollection = ( | ||
collection: Collection, | ||
processRecord: ProcessRecord, | ||
pipeline: Document[] = [] | ||
) => { | ||
// Process the change stream | ||
processChangeStream(collection, processRecord, pipeline) | ||
// Run the initial scan | ||
runInitialScan(collection, processRecord) | ||
} | ||
@@ -134,5 +131,4 @@ /** | ||
processChangeStream, | ||
syncCollection, | ||
reset, | ||
} | ||
} |
@@ -1,3 +0,7 @@ | ||
import { ChangeStreamDocument } from 'mongodb' | ||
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb' | ||
export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void> | ||
export type ProcessRecords = ( | ||
doc: ChangeStreamInsertDocument[] | ||
) => void | Promise<void> |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
18853
4
367
154
+ Addedprom-utils@^0.1.0
+ Addedprom-utils@0.1.0(transitive)