mongochangestream
Advanced tools
Comparing version 0.22.0 to 0.23.0
@@ -0,1 +1,5 @@ | ||
# 0.23.0 | ||
- `JSONSchema` type. | ||
# 0.22.0 | ||
@@ -2,0 +6,0 @@ |
/// <reference types="node" /> | ||
import _ from 'lodash/fp.js'; | ||
import { Collection, Document, ObjectId, Db } from 'mongodb'; | ||
import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions, ChangeOptions } from './types.js'; | ||
import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions, ChangeOptions, JSONSchema } from './types.js'; | ||
import type { Redis } from 'ioredis'; | ||
@@ -14,2 +14,9 @@ import { QueueOptions } from 'prom-utils'; | ||
export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => { | ||
/** | ||
* Run initial collection scan. `options.batchSize` defaults to 500. | ||
* Sorting defaults to `_id`. | ||
* | ||
* Call `start` to start processing documents and `stop` to close | ||
* the cursor. | ||
*/ | ||
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<{ | ||
@@ -19,2 +26,10 @@ start: () => Promise<void>; | ||
}>; | ||
/** | ||
* Process MongoDB change stream for the collection. | ||
* If omit is passed to `initSync` a pipeline stage that removes | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
processChangeStream: (processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{ | ||
@@ -24,5 +39,18 @@ start: () => Promise<void>; | ||
}>; | ||
/** | ||
* Delete all Redis keys for the collection. | ||
*/ | ||
reset: () => Promise<void>; | ||
clearCompletedOn: () => Promise<void>; | ||
getCollectionSchema: (db: Db) => Promise<object>; | ||
/** | ||
* Get the existing JSON schema for the collection. | ||
*/ | ||
getCollectionSchema: (db: Db) => Promise<JSONSchema>; | ||
/** | ||
* Check for schema changes every interval and emit 'change' event if found. | ||
* Optionally, set interval and strip metadata (i.e., title and description) | ||
* from the JSON schema. | ||
* | ||
* Call `start` to start polling for schema changes and `stop` to clear | ||
* the timer. | ||
*/ | ||
detectSchemaChange: (db: Db, options?: ChangeOptions) => Promise<{ | ||
@@ -33,2 +61,5 @@ start: () => Promise<void>; | ||
}>; | ||
/** | ||
* Redis keys used for the collection. | ||
*/ | ||
keys: { | ||
@@ -35,0 +66,0 @@ scanCompletedKey: string; |
@@ -18,3 +18,3 @@ "use strict"; | ||
/** | ||
* Get Redis keys used for the given collection. | ||
* Get Redis keys used for the collection. | ||
*/ | ||
@@ -40,10 +40,6 @@ const getKeys = (collection) => { | ||
}; | ||
const initSync = (redis, collection, options) => { | ||
const initSync = (redis, collection, options = {}) => { | ||
const keys = getKeys(collection); | ||
const omit = options?.omit; | ||
const omit = options.omit; | ||
const omitPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : []; | ||
/** | ||
* Run initial collection scan. `options.batchSize` defaults to 500. | ||
* Sorting defaults to `_id`. | ||
*/ | ||
const runInitialScan = async (processRecords, options = {}) => { | ||
@@ -121,10 +117,2 @@ let deferred; | ||
const defaultOptions = { fullDocument: 'updateLookup' }; | ||
/** | ||
* Process MongoDB change stream for the given collection. | ||
* If omit is passed to `initSync` a pipeline stage that removes | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
const processChangeStream = async (processRecord, pipeline = []) => { | ||
@@ -172,17 +160,5 @@ let deferred; | ||
}; | ||
/** | ||
* Delete all Redis keys for the given collection. | ||
*/ | ||
const reset = async () => { | ||
await redis.del(...Object.values(keys)); | ||
}; | ||
/** | ||
* Delete completed on key in Redis for the given collection. | ||
*/ | ||
const clearCompletedOn = async () => { | ||
await redis.del(keys.scanCompletedKey); | ||
}; | ||
/** | ||
* Get the existing JSON schema for the collection. | ||
*/ | ||
const getCollectionSchema = async (db) => { | ||
@@ -198,7 +174,2 @@ const colls = await db | ||
const getCachedCollectionSchema = () => redis.get(keys.schemaKey).then((val) => val && JSON.parse(val)); | ||
/** | ||
* Check for schema changes every interval and emit 'change' event if found. | ||
* Optionally, set interval and strip metadata (i.e., title and description) | ||
* from the JSON schema. | ||
*/ | ||
const detectSchemaChange = async (db, options = {}) => { | ||
@@ -247,8 +218,39 @@ const interval = options.interval || (0, ms_1.default)('1m'); | ||
return { | ||
/** | ||
* Run initial collection scan. `options.batchSize` defaults to 500. | ||
* Sorting defaults to `_id`. | ||
* | ||
* Call `start` to start processing documents and `stop` to close | ||
* the cursor. | ||
*/ | ||
runInitialScan, | ||
/** | ||
* Process MongoDB change stream for the collection. | ||
* If omit is passed to `initSync` a pipeline stage that removes | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
processChangeStream, | ||
/** | ||
* Delete all Redis keys for the collection. | ||
*/ | ||
reset, | ||
clearCompletedOn, | ||
/** | ||
* Get the existing JSON schema for the collection. | ||
*/ | ||
getCollectionSchema, | ||
/** | ||
* Check for schema changes every interval and emit 'change' event if found. | ||
* Optionally, set interval and strip metadata (i.e., title and description) | ||
* from the JSON schema. | ||
* | ||
* Call `start` to start polling for schema changes and `stop` to clear | ||
* the timer. | ||
*/ | ||
detectSchemaChange, | ||
/** | ||
* Redis keys used for the collection. | ||
*/ | ||
keys, | ||
@@ -255,0 +257,0 @@ }; |
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'; | ||
import { JSONSchema4, JSONSchema6, JSONSchema7 } from 'json-schema'; | ||
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>; | ||
@@ -18,1 +19,2 @@ export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>; | ||
} | ||
export declare type JSONSchema = JSONSchema4 | JSONSchema6 | JSONSchema7; |
import { Collection } from 'mongodb'; | ||
import _ from 'lodash/fp.js'; | ||
import { JSONSchema } from './types'; | ||
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>; | ||
@@ -10,6 +11,15 @@ export declare const generatePipelineFromOmit: (omit: string[]) => { | ||
export declare const getCollectionKey: (collection: Collection) => string; | ||
export declare const traverseSchema: (x: JSONSchema) => false | { | ||
[k: string]: import("json-schema").JSONSchema4; | ||
} | { | ||
[k: string]: import("json-schema").JSONSchema6Definition; | ||
} | { | ||
[key: string]: import("json-schema").JSONSchema7Definition; | ||
} | { | ||
_items: true | import("json-schema").JSONSchema4 | import("json-schema").JSONSchema4[] | import("json-schema").JSONSchema6 | import("json-schema").JSONSchema6Definition[] | import("json-schema").JSONSchema7 | import("json-schema").JSONSchema7Definition[]; | ||
} | undefined; | ||
/** | ||
* Remove title and description from a JSON schema. | ||
*/ | ||
export declare const removeMetadata: (schema: object) => object; | ||
export declare function when<T>(condition: any, fn: (x: T) => any): (x: T) => any; | ||
export declare const removeMetadata: (schema: JSONSchema) => object; | ||
export declare function when<T, R>(condition: any, fn: (x: T) => R): (x: T) => T | R; |
@@ -6,3 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.when = exports.removeMetadata = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
exports.when = exports.removeMetadata = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
@@ -32,2 +32,4 @@ const obj_walker_1 = require("obj-walker"); | ||
exports.getCollectionKey = getCollectionKey; | ||
const traverseSchema = (x) => x.properties || (x.items && { _items: x.items }); | ||
exports.traverseSchema = traverseSchema; | ||
/** | ||
@@ -37,3 +39,2 @@ * Remove title and description from a JSON schema. | ||
const removeMetadata = (schema) => { | ||
const traverse = (x) => x.properties || (x.items && { items: x.items }); | ||
const walkFn = ({ val }) => { | ||
@@ -47,3 +48,3 @@ if ('title' in val) { | ||
}; | ||
return (0, obj_walker_1.walkie)(schema, walkFn, { traverse }); | ||
return (0, obj_walker_1.walkie)(schema, walkFn, { traverse: exports.traverseSchema }); | ||
}; | ||
@@ -50,0 +51,0 @@ exports.removeMetadata = removeMetadata; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.22.0", | ||
"version": "0.23.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -43,2 +43,3 @@ "author": "GovSpend", | ||
"dependencies": { | ||
"@types/json-schema": "^7.0.11", | ||
"debug": "^4.3.4", | ||
@@ -45,0 +46,0 @@ "lodash": "^4.17.21", |
@@ -17,2 +17,3 @@ import _ from 'lodash/fp.js' | ||
ChangeOptions, | ||
JSONSchema, | ||
} from './types.js' | ||
@@ -38,3 +39,3 @@ import _debug from 'debug' | ||
/** | ||
* Get Redis keys used for the given collection. | ||
* Get Redis keys used for the collection. | ||
*/ | ||
@@ -65,11 +66,8 @@ const getKeys = (collection: Collection) => { | ||
collection: Collection, | ||
options?: SyncOptions | ||
options: SyncOptions = {} | ||
) => { | ||
const keys = getKeys(collection) | ||
const omit = options?.omit | ||
const omit = options.omit | ||
const omitPipeline = omit ? generatePipelineFromOmit(omit) : [] | ||
/** | ||
* Run initial collection scan. `options.batchSize` defaults to 500. | ||
* Sorting defaults to `_id`. | ||
*/ | ||
const runInitialScan = async ( | ||
@@ -158,10 +156,2 @@ processRecords: ProcessRecords, | ||
/** | ||
* Process MongoDB change stream for the given collection. | ||
* If omit is passed to `initSync` a pipeline stage that removes | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
const processChangeStream = async ( | ||
@@ -216,5 +206,2 @@ processRecord: ProcessRecord, | ||
/** | ||
* Delete all Redis keys for the given collection. | ||
*/ | ||
const reset = async () => { | ||
@@ -224,13 +211,3 @@ await redis.del(...Object.values(keys)) | ||
/** | ||
* Delete completed on key in Redis for the given collection. | ||
*/ | ||
const clearCompletedOn = async () => { | ||
await redis.del(keys.scanCompletedKey) | ||
} | ||
/** | ||
* Get the existing JSON schema for the collection. | ||
*/ | ||
const getCollectionSchema = async (db: Db): Promise<object> => { | ||
const getCollectionSchema = async (db: Db): Promise<JSONSchema> => { | ||
const colls = await db | ||
@@ -247,7 +224,3 @@ .listCollections({ name: collection.collectionName }) | ||
redis.get(keys.schemaKey).then((val: any) => val && JSON.parse(val)) | ||
/** | ||
* Check for schema changes every interval and emit 'change' event if found. | ||
* Optionally, set interval and strip metadata (i.e., title and description) | ||
* from the JSON schema. | ||
*/ | ||
const detectSchemaChange = async (db: Db, options: ChangeOptions = {}) => { | ||
@@ -302,10 +275,41 @@ const interval = options.interval || ms('1m') | ||
return { | ||
/** | ||
* Run initial collection scan. `options.batchSize` defaults to 500. | ||
* Sorting defaults to `_id`. | ||
* | ||
* Call `start` to start processing documents and `stop` to close | ||
* the cursor. | ||
*/ | ||
runInitialScan, | ||
/** | ||
* Process MongoDB change stream for the collection. | ||
* If omit is passed to `initSync` a pipeline stage that removes | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
processChangeStream, | ||
/** | ||
* Delete all Redis keys for the collection. | ||
*/ | ||
reset, | ||
clearCompletedOn, | ||
/** | ||
* Get the existing JSON schema for the collection. | ||
*/ | ||
getCollectionSchema, | ||
/** | ||
* Check for schema changes every interval and emit 'change' event if found. | ||
* Optionally, set interval and strip metadata (i.e., title and description) | ||
* from the JSON schema. | ||
* | ||
* Call `start` to start polling for schema changes and `stop` to clear | ||
* the timer. | ||
*/ | ||
detectSchemaChange, | ||
/** | ||
* Redis keys used for the collection. | ||
*/ | ||
keys, | ||
} | ||
} |
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb' | ||
import { JSONSchema4, JSONSchema6, JSONSchema7 } from 'json-schema' | ||
@@ -25,1 +26,3 @@ export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void> | ||
} | ||
export type JSONSchema = JSONSchema4 | JSONSchema6 | JSONSchema7 |
import { Collection } from 'mongodb' | ||
import _ from 'lodash/fp.js' | ||
import { walkie, Node } from 'obj-walker' | ||
import { Node, walkie } from 'obj-walker' | ||
import { JSONSchema } from './types' | ||
@@ -32,7 +33,9 @@ export const setDefaults = (keys: string[], val: any) => { | ||
export const traverseSchema = (x: JSONSchema) => | ||
x.properties || (x.items && { _items: x.items }) | ||
/** | ||
* Remove title and description from a JSON schema. | ||
*/ | ||
export const removeMetadata = (schema: object) => { | ||
const traverse = (x: any) => x.properties || (x.items && { items: x.items }) | ||
export const removeMetadata = (schema: JSONSchema) => { | ||
const walkFn = ({ val }: Node) => { | ||
@@ -46,9 +49,9 @@ if ('title' in val) { | ||
} | ||
return walkie(schema, walkFn, { traverse }) | ||
return walkie(schema, walkFn, { traverse: traverseSchema }) | ||
} | ||
export function when<T>(condition: any, fn: (x: T) => any) { | ||
return function(x: T) { | ||
export function when<T, R>(condition: any, fn: (x: T) => R) { | ||
return function (x: T) { | ||
return condition ? fn(x) : x | ||
} | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
40954
888
8
+ Added@types/json-schema@^7.0.11
+ Added@types/json-schema@7.0.15(transitive)