mongochangestream
Comparing version 0.16.0 to 0.17.0
@@ -0,1 +1,6 @@
# 0.17.0
- BREAKING CHANGE: `initSync` now takes `collection`.
- NEW: `detectSchemaChange`.
# 0.16.0
@@ -2,0 +7,0 @@
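For context, a minimal migration sketch for the breaking change above: in 0.16.x the collection was passed to each method, while in 0.17.0 it is bound once in `initSync`. The connection URI, database/collection names, and handler bodies below are placeholders, and the exact `ProcessRecord`/`ProcessRecords` callback shapes come from the package's `types.js`, which is not part of this diff.

```ts
import { MongoClient } from 'mongodb'
import Redis from 'ioredis'
import { initSync } from 'mongochangestream'

const main = async () => {
  // Placeholder connections and names; replace with your own.
  const redis = new Redis()
  const client = await MongoClient.connect('mongodb://localhost:27017')
  const coll = client.db('mydb').collection('mycoll')

  // 0.16.x: const sync = initSync(redis), with the collection passed to each method.
  // 0.17.0: the collection is bound once here instead.
  const sync = initSync(redis, coll)

  // The per-operation methods no longer take the collection argument.
  await sync.runInitialScan(async (docs) => {
    // Placeholder handler: write the scanned batch to your destination.
    console.log('initial scan batch', docs)
  })

  const changeStream = await sync.processChangeStream(async (event) => {
    // Placeholder handler: apply the change event to your destination.
    console.log('change event', event)
  })
  await changeStream.start()
}

main().catch(console.error)
```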
@@ -0,14 +1,8 @@
/// <reference types="node" />
import _ from 'lodash/fp.js';
import { Collection, Document, ObjectId } from 'mongodb';
import { Collection, Document, ObjectId, Db } from 'mongodb';
import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions } from './types.js';
import type { default as Redis } from 'ioredis';
import { QueueOptions } from 'prom-utils';
/**
 * Get Redis keys used for the given collection.
 */
export declare const getKeys: (collection: Collection) => {
scanCompletedKey: string;
lastScanIdKey: string;
changeStreamTokenKey: string;
};
import events from 'node:events';
export declare const defaultSortField: {
@@ -19,10 +13,22 @@ field: string;
};
export declare const initSync: (redis: Redis, options?: SyncOptions) => {
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{
export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => {
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
processChangeStream: (processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{
start: () => Promise<void>;
stop: () => void;
}>;
reset: (collection: Collection) => Promise<void>;
clearCompletedOn: (collection: Collection) => Promise<void>;
reset: () => Promise<void>;
clearCompletedOn: () => Promise<void>;
getCollectionSchema: (db: Db) => Promise<object | undefined>;
detectSchemaChange: (db: Db, interval?: number) => Promise<{
start: () => void;
stop: () => void;
emitter: events;
}>;
keys: {
scanCompletedKey: string;
lastScanIdKey: string;
changeStreamTokenKey: string;
schemaKey: string;
};
};
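As the declaration diff above shows, `getKeys` is no longer exported; the Redis keys for the bound collection are now exposed on the object returned by `initSync`. A small fragment, assuming `sync` and `redis` from the migration sketch above:

```ts
// The keys object now comes from the sync instance rather than an exported getKeys().
const { scanCompletedKey, lastScanIdKey, changeStreamTokenKey, schemaKey } = sync.keys

// For example, inspect the persisted change stream resume token directly.
const token = await redis.get(changeStreamTokenKey)
```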
@@ -6,3 +6,3 @@ "use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.initSync = exports.defaultSortField = exports.getKeys = void 0;
exports.initSync = exports.defaultSortField = void 0;
const fp_js_1 = __importDefault(require("lodash/fp.js"));
@@ -14,2 +14,4 @@ const mongodb_1 = require("mongodb");
const util_js_1 = require("./util.js");
const node_events_1 = __importDefault(require("node:events"));
const ms_1 = __importDefault(require("ms"));
const debug = (0, debug_1.default)('mongochangestream');
@@ -26,2 +28,3 @@ const keyPrefix = 'mongoChangeStream';
const changeStreamTokenKey = `${keyPrefix}:${collectionKey}:changeStreamToken`;
const schemaKey = `${keyPrefix}:${collectionKey}:schema`;
return {
@@ -31,5 +34,5 @@ scanCompletedKey,
changeStreamTokenKey,
schemaKey,
};
};
exports.getKeys = getKeys;
exports.defaultSortField = {
@@ -40,3 +43,4 @@ field: '_id',
};
const initSync = (redis, options) => {
const initSync = (redis, collection, options) => {
const keys = getKeys(collection);
const omit = options?.omit;
@@ -48,7 +52,7 @@ const omitPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : [];
*/
const runInitialScan = async (collection, processRecords, options) => {
const runInitialScan = async (processRecords, options) => {
debug('Running initial scan');
const sortField = options?.sortField || exports.defaultSortField;
// Redis keys
const { scanCompletedKey, lastScanIdKey } = (0, exports.getKeys)(collection);
const { scanCompletedKey, lastScanIdKey } = keys;
// Determine if initial scan has already completed
@@ -109,6 +113,6 @@ const scanCompleted = await redis.get(scanCompletedKey);
*/
const processChangeStream = async (collection, processRecord, pipeline = []) => {
const processChangeStream = async (processRecord, pipeline = []) => {
const abortController = new AbortController();
// Redis keys
const { changeStreamTokenKey } = (0, exports.getKeys)(collection);
const { changeStreamTokenKey } = keys;
// Lookup change stream token
@@ -146,5 +150,4 @@ const token = await redis.get(changeStreamTokenKey);
*/
const reset = async (collection) => {
const keys = Object.values((0, exports.getKeys)(collection));
await redis.del(...keys);
const reset = async () => {
await redis.del(...Object.values(keys));
};
@@ -154,6 +157,57 @@ /**
*/
const clearCompletedOn = async (collection) => {
const keys = (0, exports.getKeys)(collection);
const clearCompletedOn = async () => {
await redis.del(keys.scanCompletedKey);
};
/**
* Get the existing JSON schema for the collection.
*/
const getCollectionSchema = async (db) => {
const colls = await db
.listCollections({ name: collection.collectionName })
.toArray();
return fp_js_1.default.get('0.options.validator.$jsonSchema', colls);
};
/**
* Get cached collection schema
*/
const getCachedCollectionSchema = () => redis.get(keys.schemaKey).then((val) => val && JSON.parse(val));
/**
* Check for schema changes every interval and emit 'change' event if found.
*/
const detectSchemaChange = async (db, interval = (0, ms_1.default)('10s')) => {
const emitter = new node_events_1.default.EventEmitter();
let timer;
// Check for a cached schema
let previousSchema = await getCachedCollectionSchema();
if (!previousSchema) {
const schema = await getCollectionSchema(db);
// Persist schema
await redis.setnx(keys.schemaKey, JSON.stringify(schema));
previousSchema = schema;
}
// Check for a schema change
const checkForSchemaChange = async () => {
const currentSchema = await getCollectionSchema(db);
// Schemas are no longer the same
if (!fp_js_1.default.isEqual(currentSchema, previousSchema)) {
debug('Schema change detected %O', currentSchema);
emitter.emit('change', { initialSchema: previousSchema, currentSchema });
// Persist schema
await redis.set(keys.schemaKey, JSON.stringify(currentSchema));
previousSchema = currentSchema;
}
};
const start = () => {
debug('Started polling for schema changes');
// Perform an inital check
checkForSchemaChange();
// Check for schema changes every interval
timer = setInterval(checkForSchemaChange, interval);
};
const stop = () => {
debug('Stopped polling for schema changes');
clearInterval(timer);
};
return { start, stop, emitter };
};
return {
@@ -164,4 +218,7 @@ runInitialScan,
clearCompletedOn,
getCollectionSchema,
detectSchemaChange,
keys,
};
};
exports.initSync = initSync;
{
"name": "mongochangestream",
"version": "0.16.0",
"version": "0.17.0",
"description": "Sync MongoDB collections via change streams into any database.",
@@ -45,2 +45,3 @@ "author": "GovSpend",
"lodash": "^4.17.21",
"ms": "^2.1.3",
"prom-utils": "^0.3.0"
@@ -47,0 +48,0 @@ },
@@ -44,5 +44,5 @@ # Mongo Change Stream
// Sync collection
const sync = initSync(redis)
sync.syncCollection(coll, processRecord)
const changeStream = await sync.processChangeStream(coll, processRecord)
const sync = initSync(redis, coll)
sync.syncCollection(processRecord)
const changeStream = await sync.processChangeStream(processRecord)
changeStream.start()
@@ -70,3 +70,2 @@ setTimeout(changeStream.stop, 30000)
const runInitialScan = async (
collection: Collection,
processRecords: ProcessRecords,
@@ -77,3 +76,2 @@ options?: QueueOptions & ScanOptions
const processChangeStream = async (
collection: Collection,
processRecord: ProcessRecord,
@@ -80,0 +78,0 @@ pipeline?: Document[]
@@ -7,2 +7,3 @@ import _ from 'lodash/fp.js'
ObjectId,
Db,
} from 'mongodb'
@@ -25,2 +26,4 @@ import changeStreamToIterator from './changeStreamToIterator.js'
} from './util.js'
import events from 'node:events'
import ms from 'ms'
@@ -34,3 +37,3 @@ const debug = _debug('mongochangestream')
*/
export const getKeys = (collection: Collection) => {
const getKeys = (collection: Collection) => {
const collectionKey = getCollectionKey(collection)
@@ -41,2 +44,3 @@ const scanPrefix = `${keyPrefix}:${collectionKey}`
const changeStreamTokenKey = `${keyPrefix}:${collectionKey}:changeStreamToken`
const schemaKey = `${keyPrefix}:${collectionKey}:schema`
return {
@@ -46,2 +50,3 @@ scanCompletedKey,
changeStreamTokenKey,
schemaKey,
}
@@ -56,3 +61,8 @@ }
export const initSync = (redis: Redis, options?: SyncOptions) => {
export const initSync = (
redis: Redis,
collection: Collection,
options?: SyncOptions
) => {
const keys = getKeys(collection)
const omit = options?.omit
@@ -65,3 +75,2 @@ const omitPipeline = omit ? generatePipelineFromOmit(omit) : []
const runInitialScan = async (
collection: Collection,
processRecords: ProcessRecords,
@@ -73,3 +82,3 @@ options?: QueueOptions & ScanOptions
// Redis keys
const { scanCompletedKey, lastScanIdKey } = getKeys(collection)
const { scanCompletedKey, lastScanIdKey } = keys
// Determine if initial scan has already completed
@@ -136,3 +145,2 @@ const scanCompleted = await redis.get(scanCompletedKey)
const processChangeStream = async (
collection: Collection,
processRecord: ProcessRecord,
@@ -143,3 +151,3 @@ pipeline: Document[] = []
// Redis keys
const { changeStreamTokenKey } = getKeys(collection)
const { changeStreamTokenKey } = keys
// Lookup change stream token
@@ -183,5 +191,4 @@ const token = await redis.get(changeStreamTokenKey)
*/
const reset = async (collection: Collection) => {
const keys = Object.values(getKeys(collection))
await redis.del(...keys)
const reset = async () => {
await redis.del(...Object.values(keys))
}
@@ -192,7 +199,61 @@
*/
const clearCompletedOn = async (collection: Collection) => {
const keys = getKeys(collection)
const clearCompletedOn = async () => {
await redis.del(keys.scanCompletedKey)
}
/**
* Get the existing JSON schema for the collection.
*/
const getCollectionSchema = async (db: Db): Promise<object | undefined> => {
const colls = await db
.listCollections({ name: collection.collectionName })
.toArray()
return _.get('0.options.validator.$jsonSchema', colls)
}
/**
* Get cached collection schema
*/
const getCachedCollectionSchema = () =>
redis.get(keys.schemaKey).then((val: any) => val && JSON.parse(val))
/**
* Check for schema changes every interval and emit 'change' event if found.
*/
const detectSchemaChange = async (db: Db, interval = ms('10s')) => {
const emitter = new events.EventEmitter()
let timer: NodeJS.Timer
// Check for a cached schema
let previousSchema = await getCachedCollectionSchema()
if (!previousSchema) {
const schema = await getCollectionSchema(db)
// Persist schema
await redis.setnx(keys.schemaKey, JSON.stringify(schema))
previousSchema = schema
}
// Check for a schema change
const checkForSchemaChange = async () => {
const currentSchema = await getCollectionSchema(db)
// Schemas are no longer the same
if (!_.isEqual(currentSchema, previousSchema)) {
debug('Schema change detected %O', currentSchema)
emitter.emit('change', { initialSchema: previousSchema, currentSchema })
// Persist schema
await redis.set(keys.schemaKey, JSON.stringify(currentSchema))
previousSchema = currentSchema
}
}
const start = () => {
debug('Started polling for schema changes')
// Perform an inital check
checkForSchemaChange()
// Check for schema changes every interval
timer = setInterval(checkForSchemaChange, interval)
}
const stop = () => {
debug('Stopped polling for schema changes')
clearInterval(timer)
}
return { start, stop, emitter }
}
return {
@@ -203,3 +264,6 @@ runInitialScan,
clearCompletedOn,
getCollectionSchema,
detectSchemaChange,
keys,
}
}
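The new `detectSchemaChange` helper shown above polls the collection's `$jsonSchema` validator and emits a `'change'` event whenever it differs from the copy cached in Redis. A short usage sketch, assuming `sync` was created with `initSync(redis, coll)` as in the earlier example and `db` is the `Db` that owns the collection; the 60-second interval is an arbitrary example (the default is 10 seconds):

```ts
// Assumes `sync` from initSync(redis, coll) and `db`, the Db that owns the collection.
const schemaChange = await sync.detectSchemaChange(db, 60_000) // poll every 60s; default is 10s

schemaChange.emitter.on('change', ({ initialSchema, currentSchema }) => {
  // The collection's $jsonSchema validator changed; e.g. pause syncing and re-map the destination.
  console.log('schema changed', { initialSchema, currentSchema })
})

schemaChange.start() // begin polling
// ...
schemaChange.stop() // stop polling, e.g. on shutdown
```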
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
+ Added ms@^2.1.3