mongochangestream
Advanced tools
Comparing version 0.8.0 to 0.9.0
@@ -0,1 +1,5 @@ | ||
# 0.9.0 | ||
* Moved `omit` to `initSync` so that it can be used in the `processChangeStream` pipeline. | ||
# 0.8.0 | ||
@@ -2,0 +6,0 @@ |
export * from './mongoChangeStream.js'; | ||
export * from './types.js'; | ||
export * from './util.js'; |
@@ -19,1 +19,2 @@ "use strict"; | ||
__exportStar(require("./types.js"), exports); | ||
__exportStar(require("./util.js"), exports); |
import _ from 'lodash/fp.js'; | ||
import { Collection, ObjectId } from 'mongodb'; | ||
import { ProcessRecord, ProcessRecords, ScanOptions } from './types.js'; | ||
import { Options, ProcessRecord, ProcessRecords, ScanOptions } from './types.js'; | ||
import type { default as Redis } from 'ioredis'; | ||
@@ -19,3 +19,3 @@ import { QueueOptions } from 'prom-utils'; | ||
}; | ||
export declare const initSync: (redis: Redis) => { | ||
export declare const initSync: (redis: Redis, options?: Options) => { | ||
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>; | ||
@@ -22,0 +22,0 @@ processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>; |
@@ -37,3 +37,5 @@ "use strict"; | ||
}; | ||
const initSync = (redis) => { | ||
const initSync = (redis, options) => { | ||
const omit = options?.omit; | ||
const defaultPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : []; | ||
/** | ||
@@ -46,3 +48,2 @@ * Run initial collection scan. `options.batchSize` defaults to 500. | ||
const sortField = options?.sortField || exports.defaultSortField; | ||
const omit = options?.omit; | ||
// Redis keys | ||
@@ -100,3 +101,3 @@ const { scanCompletedKey, lastScanIdKey } = (0, exports.getKeys)(collection); | ||
*/ | ||
const processChangeStream = async (collection, processRecord, pipeline = []) => { | ||
const processChangeStream = async (collection, processRecord, pipeline) => { | ||
// Redis keys | ||
@@ -111,3 +112,3 @@ const { changeStreamTokenKey } = (0, exports.getKeys)(collection); | ||
// Get the change stream as an async iterator | ||
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, pipeline, options); | ||
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, pipeline || defaultPipeline, options); | ||
// Consume the events | ||
@@ -114,0 +115,0 @@ for await (const event of changeStream) { |
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'; | ||
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>; | ||
export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>; | ||
export interface Options { | ||
omit?: string[]; | ||
} | ||
export interface ScanOptions<T = any> { | ||
@@ -10,3 +13,2 @@ sortField?: { | ||
}; | ||
omit?: string[]; | ||
} |
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>; | ||
export declare const generatePipelineFromOmit: (omit: string[]) => { | ||
$unset: string[]; | ||
}[]; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.setDefaults = void 0; | ||
exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
const setDefaults = (keys, val) => { | ||
@@ -12,1 +12,9 @@ const obj = {}; | ||
exports.setDefaults = setDefaults; | ||
const generatePipelineFromOmit = (omit) => { | ||
const fields = omit.flatMap((field) => [ | ||
`fullDocument.${field}`, | ||
`updateDescription.updatedFields.${field}`, | ||
]); | ||
return [{ $unset: fields }]; | ||
}; | ||
exports.generatePipelineFromOmit = generatePipelineFromOmit; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.8.0", | ||
"version": "0.9.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -5,0 +5,0 @@ "author": "GovSpend", |
@@ -39,3 +39,3 @@ # Mongo Change Stream | ||
Below are the available methods. | ||
Below are the available methods. | ||
@@ -58,3 +58,3 @@ The `processChangeStream` method will never complete, but `runInitialScan` will complete | ||
processRecord: ProcessRecord, | ||
options?: QueueOptions | ||
options?: QueueOptions & ScanOptions | ||
): Promise<void> => ... | ||
@@ -65,3 +65,3 @@ | ||
processRecord: ProcessRecord, | ||
pipeline: Document[] = [] | ||
pipeline?: Document[] | ||
): Promise<void> => ... | ||
@@ -68,0 +68,0 @@ |
export * from './mongoChangeStream.js' | ||
export * from './types.js' | ||
export * from './util.js' |
import _ from 'lodash/fp.js' | ||
import { ChangeStreamInsertDocument, Collection, ObjectId } from 'mongodb' | ||
import changeStreamToIterator from './changeStreamToIterator.js' | ||
import { ProcessRecord, ProcessRecords, ScanOptions } from './types.js' | ||
import { Options, ProcessRecord, ProcessRecords, ScanOptions } from './types.js' | ||
import _debug from 'debug' | ||
import type { default as Redis } from 'ioredis' | ||
import { batchQueue, QueueOptions } from 'prom-utils' | ||
import { setDefaults } from './util.js' | ||
import { generatePipelineFromOmit, setDefaults } from './util.js' | ||
@@ -39,3 +39,5 @@ const debug = _debug('mongoChangeStream') | ||
export const initSync = (redis: Redis) => { | ||
export const initSync = (redis: Redis, options?: Options) => { | ||
const omit = options?.omit | ||
const defaultPipeline = omit ? generatePipelineFromOmit(omit) : [] | ||
/** | ||
@@ -52,3 +54,2 @@ * Run initial collection scan. `options.batchSize` defaults to 500. | ||
const sortField = options?.sortField || defaultSortField | ||
const omit = options?.omit | ||
// Redis keys | ||
@@ -114,3 +115,3 @@ const { scanCompletedKey, lastScanIdKey } = getKeys(collection) | ||
processRecord: ProcessRecord, | ||
pipeline: Document[] = [] | ||
pipeline?: Document[] | ||
) => { | ||
@@ -126,3 +127,7 @@ // Redis keys | ||
// Get the change stream as an async iterator | ||
const changeStream = changeStreamToIterator(collection, pipeline, options) | ||
const changeStream = changeStreamToIterator( | ||
collection, | ||
pipeline || defaultPipeline, | ||
options | ||
) | ||
// Consume the events | ||
@@ -149,3 +154,3 @@ for await (const event of changeStream) { | ||
/** | ||
* Delete completed on key in Redis for the given collection. | ||
* Delete completed on key in Redis for the given collection. | ||
*/ | ||
@@ -152,0 +157,0 @@ const clearCompletedOn = async (collection: Collection) => { |
@@ -9,2 +9,6 @@ import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb' | ||
export interface Options { | ||
omit?: string[] | ||
} | ||
export interface ScanOptions<T = any> { | ||
@@ -16,3 +20,2 @@ sortField?: { | ||
} | ||
omit?: string[] | ||
} |
@@ -8,1 +8,9 @@ export const setDefaults = (keys: string[], val: any) => { | ||
} | ||
export const generatePipelineFromOmit = (omit: string[]) => { | ||
const fields = omit.flatMap((field) => [ | ||
`fullDocument.${field}`, | ||
`updateDescription.updatedFields.${field}`, | ||
]) | ||
return [{ $unset: fields }] | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
24017
501