mongochangestream
Advanced tools
Comparing version 0.5.0 to 0.6.0
@@ -0,1 +1,5 @@ | ||
# 0.6.0 | ||
* `sortField` option for overriding the default sorting field of `_id`. | ||
# 0.5.0 | ||
@@ -2,0 +6,0 @@ |
@@ -1,9 +0,15 @@ | ||
import { Collection } from 'mongodb'; | ||
import { ProcessRecord, ProcessRecords } from './types.js'; | ||
import _ from 'lodash/fp.js'; | ||
import { Collection, ObjectId } from 'mongodb'; | ||
import { ProcessRecord, ProcessRecords, SyncOptions } from './types.js'; | ||
import type { default as Redis } from 'ioredis'; | ||
import { QueueOptions } from 'prom-utils'; | ||
export declare const defaultSortField: { | ||
field: string; | ||
serialize: _.LodashToString; | ||
deserialize: (x: string) => ObjectId; | ||
}; | ||
export declare const initSync: (redis: Redis) => { | ||
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions) => Promise<void>; | ||
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & SyncOptions) => Promise<void>; | ||
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>; | ||
reset: (collection: Collection) => Promise<void>; | ||
}; |
@@ -6,3 +6,4 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.initSync = void 0; | ||
exports.initSync = exports.defaultSortField = void 0; | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
const mongodb_1 = require("mongodb"); | ||
@@ -30,2 +31,7 @@ const changeStreamToIterator_js_1 = __importDefault(require("./changeStreamToIterator.js")); | ||
}; | ||
exports.defaultSortField = { | ||
field: '_id', | ||
serialize: fp_js_1.default.toString, | ||
deserialize: (x) => new mongodb_1.ObjectId(x), | ||
}; | ||
const initSync = (redis) => { | ||
@@ -37,2 +43,3 @@ /** | ||
debug('Running initial scan'); | ||
const sortField = options?.sortField || exports.defaultSortField; | ||
// Redis keys | ||
@@ -47,12 +54,15 @@ const { scanCompletedKey, lastScanIdKey } = getKeys(collection); | ||
} | ||
// Lookup last _id successfully processed | ||
const lastId = await redis.get(lastScanIdKey); | ||
debug('Last scan _id %s', lastId); | ||
const _processRecords = async (records) => { | ||
// Process batch of records | ||
await processRecords(records); | ||
const lastDocument = records[records.length - 1].fullDocument; | ||
// Record last id of the batch | ||
const lastId = records[records.length - 1].fullDocument._id.toString(); | ||
await redis.set(lastScanIdKey, lastId); | ||
const lastId = fp_js_1.default.get(sortField.field, lastDocument); | ||
if (lastId) { | ||
await redis.set(lastScanIdKey, sortField.serialize(lastId)); | ||
} | ||
}; | ||
// Lookup last id successfully processed | ||
const lastId = await redis.get(lastScanIdKey); | ||
debug('Last scan id %s', lastId); | ||
// Create queue | ||
@@ -63,4 +73,6 @@ const queue = (0, prom_utils_1.batchQueue)(_processRecords, options); | ||
// Skip ids already processed | ||
.find(lastId ? { _id: { $gt: new mongodb_1.ObjectId(lastId) } } : {}) | ||
.sort({ _id: 1 }); | ||
.find(lastId | ||
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } } | ||
: {}) | ||
.sort({ [sortField.field]: 1 }); | ||
const ns = { db: collection.dbName, coll: collection.collectionName }; | ||
@@ -67,0 +79,0 @@ // Process documents |
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'; | ||
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>; | ||
export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>; | ||
export interface SyncOptions<T = any> { | ||
sortField?: { | ||
field: string; | ||
serialize: (x: T) => string; | ||
deserialize: (x: string) => T; | ||
}; | ||
} |
{ | ||
"name": "mongochangestream", | ||
"version": "0.5.0", | ||
"version": "0.6.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -36,2 +36,3 @@ "author": "GovSpend", | ||
"@types/debug": "^4.1.7", | ||
"@types/lodash": "^4.14.182", | ||
"@typescript-eslint/eslint-plugin": "^5.32.0", | ||
@@ -46,4 +47,5 @@ "eslint": "^8.21.0", | ||
"debug": "^4.3.4", | ||
"lodash": "^4.17.21", | ||
"prom-utils": "^0.2.0" | ||
} | ||
} |
@@ -0,4 +1,5 @@ | ||
import _ from 'lodash/fp.js' | ||
import { ChangeStreamInsertDocument, Collection, ObjectId } from 'mongodb' | ||
import changeStreamToIterator from './changeStreamToIterator.js' | ||
import { ProcessRecord, ProcessRecords } from './types.js' | ||
import { ProcessRecord, ProcessRecords, SyncOptions } from './types.js' | ||
import _debug from 'debug' | ||
@@ -31,2 +32,8 @@ import type { default as Redis } from 'ioredis' | ||
export const defaultSortField = { | ||
field: '_id', | ||
serialize: _.toString, | ||
deserialize: (x: string) => new ObjectId(x), | ||
} | ||
export const initSync = (redis: Redis) => { | ||
@@ -39,5 +46,6 @@ /** | ||
processRecords: ProcessRecords, | ||
options?: QueueOptions | ||
options?: QueueOptions & SyncOptions | ||
) => { | ||
debug('Running initial scan') | ||
const sortField = options?.sortField || defaultSortField | ||
// Redis keys | ||
@@ -52,12 +60,15 @@ const { scanCompletedKey, lastScanIdKey } = getKeys(collection) | ||
} | ||
// Lookup last _id successfully processed | ||
const lastId = await redis.get(lastScanIdKey) | ||
debug('Last scan _id %s', lastId) | ||
const _processRecords = async (records: ChangeStreamInsertDocument[]) => { | ||
// Process batch of records | ||
await processRecords(records) | ||
const lastDocument = records[records.length - 1].fullDocument | ||
// Record last id of the batch | ||
const lastId = records[records.length - 1].fullDocument._id.toString() | ||
await redis.set(lastScanIdKey, lastId) | ||
const lastId = _.get(sortField.field, lastDocument) | ||
if (lastId) { | ||
await redis.set(lastScanIdKey, sortField.serialize(lastId)) | ||
} | ||
} | ||
// Lookup last id successfully processed | ||
const lastId = await redis.get(lastScanIdKey) | ||
debug('Last scan id %s', lastId) | ||
// Create queue | ||
@@ -68,4 +79,8 @@ const queue = batchQueue(_processRecords, options) | ||
// Skip ids already processed | ||
.find(lastId ? { _id: { $gt: new ObjectId(lastId) } } : {}) | ||
.sort({ _id: 1 }) | ||
.find( | ||
lastId | ||
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } } | ||
: {} | ||
) | ||
.sort({ [sortField.field]: 1 }) | ||
const ns = { db: collection.dbName, coll: collection.collectionName } | ||
@@ -72,0 +87,0 @@ // Process documents |
@@ -8,1 +8,9 @@ import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb' | ||
) => void | Promise<void> | ||
export interface SyncOptions<T = any> { | ||
sortField?: { | ||
field: string | ||
serialize: (x: T) => string | ||
deserialize: (x: string) => T | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
20622
414
5
8
+ Addedlodash@^4.17.21
+ Addedlodash@4.17.21(transitive)