mongochangestream
Advanced tools
Comparing version 0.44.0 to 0.45.0
@@ -0,1 +1,6 @@ | ||
# 0.45.0 | ||
- Added `uniqueId` option to allow the same collection to be synced in parallel. | ||
- Deprecated option `shouldRemoveMetadata` in `detectSchemaChange`. Prefer `shouldRemoveUnusedFields`. | ||
# 0.44.0 | ||
@@ -2,0 +7,0 @@ |
@@ -1,10 +0,10 @@ | ||
import { Collection, Db } from 'mongodb'; | ||
import { Events, SyncOptions, ProcessChangeStreamRecords, ProcessInitialScanRecords, ScanOptions, ChangeOptions, ChangeStreamOptions, JSONSchema, State } from './types.js'; | ||
import EventEmitter from 'eventemitter3'; | ||
import type { Redis } from 'ioredis'; | ||
import { QueueOptions } from 'prom-utils'; | ||
import EventEmitter from 'eventemitter3'; | ||
import { type Collection, type Db } from 'mongodb'; | ||
import { type QueueOptions } from 'prom-utils'; | ||
import { ChangeOptions, ChangeStreamOptions, Events, JSONSchema, ProcessChangeStreamRecords, ProcessInitialScanRecords, ScanOptions, State, SyncOptions } from './types.js'; | ||
/** | ||
* Get Redis keys used for the collection. | ||
*/ | ||
export declare const getKeys: (collection: Collection) => { | ||
export declare const getKeys: (collection: Collection, options: SyncOptions) => { | ||
scanCompletedKey: string; | ||
@@ -11,0 +11,0 @@ lastScanIdKey: string; |
@@ -7,10 +7,10 @@ "use strict"; | ||
exports.initSync = exports.getKeys = void 0; | ||
const debug_1 = __importDefault(require("debug")); | ||
const eventemitter3_1 = __importDefault(require("eventemitter3")); | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
const mongodb_1 = require("mongodb"); | ||
const debug_1 = __importDefault(require("debug")); | ||
const ms_1 = __importDefault(require("ms")); | ||
const prom_utils_1 = require("prom-utils"); | ||
const simple_machines_1 = require("simple-machines"); | ||
const util_js_1 = require("./util.js"); | ||
const eventemitter3_1 = __importDefault(require("eventemitter3")); | ||
const ms_1 = __importDefault(require("ms")); | ||
const simple_machines_1 = require("simple-machines"); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
@@ -21,5 +21,6 @@ const keyPrefix = 'mongoChangeStream'; | ||
*/ | ||
const getKeys = (collection) => { | ||
const getKeys = (collection, options) => { | ||
const collectionKey = (0, util_js_1.getCollectionKey)(collection); | ||
const collectionPrefix = `${keyPrefix}:${collectionKey}`; | ||
const uniqueId = options.uniqueId ? `:${options.uniqueId}` : ''; | ||
const collectionPrefix = `${keyPrefix}:${collectionKey}${uniqueId}`; | ||
const scanCompletedKey = `${collectionPrefix}:initialScanCompletedOn`; | ||
@@ -55,3 +56,3 @@ const lastScanIdKey = `${collectionPrefix}:lastScanId`; | ||
function initSync(redis, collection, options = {}) { | ||
const keys = (0, exports.getKeys)(collection); | ||
const keys = (0, exports.getKeys)(collection, options); | ||
const { omit } = options; | ||
@@ -381,4 +382,4 @@ const emitter = new eventemitter3_1.default(); | ||
const interval = options.interval || (0, ms_1.default)('1m'); | ||
const shouldRemoveMetadata = options.shouldRemoveMetadata; | ||
const maybeRemoveMetadata = (0, util_js_1.when)(shouldRemoveMetadata, util_js_1.removeMetadata); | ||
const shouldRemoveUnusedFields = options.shouldRemoveUnusedFields || options.shouldRemoveMetadata; | ||
const maybeRemoveUnusedFields = (0, util_js_1.when)(shouldRemoveUnusedFields, util_js_1.removeUnusedFields); | ||
const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', { | ||
@@ -392,7 +393,7 @@ name: 'detectSchemaChange', | ||
if (schema) { | ||
return maybeRemoveMetadata(schema); | ||
return maybeRemoveUnusedFields(schema); | ||
} | ||
}); | ||
if (!previousSchema) { | ||
const schema = await getCollectionSchema(db).then(maybeRemoveMetadata); | ||
const schema = await getCollectionSchema(db).then(maybeRemoveUnusedFields); | ||
// Persist schema | ||
@@ -405,3 +406,3 @@ await redis.setnx(keys.schemaKey, JSON.stringify(schema)); | ||
const checkForSchemaChange = async () => { | ||
const currentSchema = await getCollectionSchema(db).then(maybeRemoveMetadata); | ||
const currentSchema = await getCollectionSchema(db).then(maybeRemoveUnusedFields); | ||
// Schemas are no longer the same | ||
@@ -408,0 +409,0 @@ if (!fp_js_1.default.isEqual(currentSchema, previousSchema)) { |
@@ -1,2 +0,2 @@ | ||
import { ChangeStream, AggregationCursor, ChangeStreamDocument, ChangeStreamInsertDocument, Document, MongoServerError, MongoAPIError } from 'mongodb'; | ||
import { AggregationCursor, ChangeStream, ChangeStreamDocument, ChangeStreamInsertDocument, Document, MongoAPIError, MongoServerError } from 'mongodb'; | ||
export type Cursor = ChangeStream | AggregationCursor; | ||
@@ -10,2 +10,9 @@ export type JSONSchema = Record<string, any>; | ||
omit?: string[]; | ||
/** | ||
* Added to all Redis keys to allow the same collection | ||
* to be synced in parallel. Otherwise, the Redis keys | ||
* would be the same for a given collection and parallel | ||
* syncing jobs would overwrite each other. | ||
*/ | ||
uniqueId?: string; | ||
} | ||
@@ -33,4 +40,10 @@ export interface SortField<T> { | ||
interval?: number; | ||
/** Should fields like title and description be ignored when detecting a change. */ | ||
/** @deprecated Use shouldRemoveUnusedFields instead.*/ | ||
shouldRemoveMetadata?: boolean; | ||
/** | ||
* Remove fields that are not used when converting the schema | ||
* in a downstream library like mongo2elastic or mongo2crate. | ||
* Preserves bsonType, properties, additionalProperties, items, and enum. | ||
*/ | ||
shouldRemoveUnusedFields?: boolean; | ||
} | ||
@@ -37,0 +50,0 @@ export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete'; |
@@ -1,4 +0,4 @@ | ||
import { Collection } from 'mongodb'; | ||
import _ from 'lodash/fp.js'; | ||
import { Cursor, CursorError, JSONSchema } from './types.js'; | ||
import { type Collection } from 'mongodb'; | ||
import type { Cursor, CursorError, JSONSchema } from './types.js'; | ||
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>; | ||
@@ -11,5 +11,5 @@ export declare const generatePipelineFromOmit: (omit: string[]) => any[]; | ||
/** | ||
* Remove title and description from a JSON schema. | ||
* Remove unused schema fields | ||
*/ | ||
export declare const removeMetadata: (schema: JSONSchema) => JSONSchema; | ||
export declare const removeUnusedFields: (schema: JSONSchema) => JSONSchema; | ||
export declare function when<T, R>(condition: any, fn: (x: T) => R): (x: T) => T | R; | ||
@@ -16,0 +16,0 @@ /** |
@@ -6,7 +6,7 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.delayed = exports.missingOplogEntry = exports.safelyCheckNext = exports.when = exports.removeMetadata = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
exports.delayed = exports.missingOplogEntry = exports.safelyCheckNext = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
const debug_1 = __importDefault(require("debug")); | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
const mongodb_1 = require("mongodb"); | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
const obj_walker_1 = require("obj-walker"); | ||
const debug_1 = __importDefault(require("debug")); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
@@ -55,3 +55,3 @@ const setDefaults = (keys, val) => { | ||
exports.generatePipelineFromOmit = generatePipelineFromOmit; | ||
const omitFields = (omitPaths) => fp_js_1.default.omitBy((val, key) => fp_js_1.default.find((omitPath) => fp_js_1.default.startsWith(`${omitPath}.`, key), omitPaths)); | ||
const omitFields = (omitPaths) => fp_js_1.default.omitBy((_val, key) => fp_js_1.default.find((omitPath) => fp_js_1.default.startsWith(`${omitPath}.`, key), omitPaths)); | ||
exports.omitFields = omitFields; | ||
@@ -64,17 +64,26 @@ const omitFieldForUpdate = (omitPaths) => fp_js_1.default.update('updateDescription.updatedFields', (0, exports.omitFields)(omitPaths)); | ||
exports.traverseSchema = traverseSchema; | ||
const usedSchemaFields = [ | ||
'bsonType', | ||
'properties', | ||
'additionalProperties', | ||
'items', | ||
'enum', | ||
]; | ||
/** | ||
* Remove title and description from a JSON schema. | ||
* Remove unused schema fields | ||
*/ | ||
const removeMetadata = (schema) => { | ||
const removeUnusedFields = (schema) => { | ||
const walkFn = ({ val }) => { | ||
if ('title' in val) { | ||
delete val.title; | ||
for (const key in val) { | ||
if (!usedSchemaFields.includes(key)) { | ||
delete val[key]; | ||
} | ||
} | ||
if ('description' in val) { | ||
delete val.description; | ||
} | ||
}; | ||
return (0, obj_walker_1.walkie)(schema, walkFn, { traverse: exports.traverseSchema }); | ||
return (0, obj_walker_1.walkEach)(schema, walkFn, { | ||
traverse: exports.traverseSchema, | ||
modifyInPlace: true, | ||
}); | ||
}; | ||
exports.removeMetadata = removeMetadata; | ||
exports.removeUnusedFields = removeUnusedFields; | ||
function when(condition, fn) { | ||
@@ -81,0 +90,0 @@ return function (x) { |
{ | ||
"name": "mongochangestream", | ||
"version": "0.44.0", | ||
"version": "0.45.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -8,3 +8,6 @@ "author": "GovSpend", | ||
"types": "dist/index.d.ts", | ||
"repository": "git://github.com/smartprocure/mongochangestream.git", | ||
"repository": { | ||
"type": "git", | ||
"url": "git://github.com/smartprocure/mongochangestream.git" | ||
}, | ||
"scripts": { | ||
@@ -17,4 +20,4 @@ "prepare": "npm run lint && npm run test && npm run build", | ||
"fmt": "prettier --ignore-path .gitignore --write './'", | ||
"test": "node --test --test-force-exit", | ||
"test:only": "DEBUG=* node --test --test-only --test-force-exit" | ||
"test": "node --env-file=.env --test --test-force-exit", | ||
"test:only": "DEBUG=* node --env-file=.env --test --test-only --test-force-exit" | ||
}, | ||
@@ -42,12 +45,12 @@ "keywords": [ | ||
"@faker-js/faker": "^8.4.1", | ||
"@trivago/prettier-plugin-sort-imports": "^4.3.0", | ||
"@types/debug": "^4.1.12", | ||
"@types/lodash": "^4.17.1", | ||
"@types/node": "^20.12.12", | ||
"@typescript-eslint/eslint-plugin": "^7.9.0", | ||
"dotenv": "^16.4.5", | ||
"prettier": "^3.2.5", | ||
"@types/lodash": "^4.17.5", | ||
"@types/node": "^20.14.2", | ||
"@typescript-eslint/eslint-plugin": "^7.13.0", | ||
"prettier": "^3.3.2", | ||
"typescript": "^5.4.5" | ||
}, | ||
"dependencies": { | ||
"debug": "^4.3.4", | ||
"debug": "^4.3.5", | ||
"eventemitter3": "^5.0.1", | ||
@@ -57,3 +60,3 @@ "lodash": "^4.17.21", | ||
"obj-walker": "^2.2.0", | ||
"prom-utils": "^0.8.0", | ||
"prom-utils": "^0.9.0", | ||
"simple-machines": "^0.4.0" | ||
@@ -68,4 +71,13 @@ }, | ||
"singleQuote": true, | ||
"trailingComma": "es5" | ||
"trailingComma": "es5", | ||
"plugins": [ | ||
"@trivago/prettier-plugin-sort-imports" | ||
], | ||
"importOrder": [ | ||
"^[./]" | ||
], | ||
"importOrderSortSpecifiers": true, | ||
"importOrderCaseInsensitive": true, | ||
"importOrderSeparation": true | ||
} | ||
} |
@@ -25,5 +25,5 @@ # Mongo Change Stream | ||
```typescript | ||
import { ChangeStreamDocument, MongoClient } from 'mongodb' | ||
import { default as Redis } from 'ioredis' | ||
import { initSync } from 'mongochangestream' | ||
import { ChangeStreamDocument, MongoClient } from 'mongodb' | ||
@@ -30,0 +30,0 @@ const redis = new Redis() |
/** | ||
* To run add a local .env file with MONGO_CONN | ||
* To run add a local .env file with MONGO_CONN and execute npm test. | ||
* NOTE: Node version 22 or higher is required. | ||
*/ | ||
import 'dotenv/config' | ||
import { faker } from '@faker-js/faker' | ||
import Redis from 'ioredis' | ||
import _ from 'lodash/fp.js' | ||
import { | ||
type ChangeStreamDocument, | ||
type ChangeStreamInsertDocument, | ||
type Collection, | ||
type Db, | ||
type Document, | ||
MongoClient, | ||
ObjectId, | ||
} from 'mongodb' | ||
import ms from 'ms' | ||
import assert from 'node:assert' | ||
import { describe, test } from 'node:test' | ||
import assert from 'node:assert' | ||
import { setTimeout } from 'node:timers/promises' | ||
import { type QueueOptions } from 'prom-utils' | ||
import { initSync } from './mongoChangeStream.js' | ||
import { | ||
import type { | ||
CursorErrorEvent, | ||
JSONSchema, | ||
ScanOptions, | ||
SchemaChangeEvent, | ||
ScanOptions, | ||
SortField, | ||
SyncOptions, | ||
SortField, | ||
CursorErrorEvent, | ||
} from './types.js' | ||
import { | ||
Document, | ||
ChangeStreamDocument, | ||
ChangeStreamInsertDocument, | ||
MongoClient, | ||
Collection, | ||
ObjectId, | ||
Db, | ||
} from 'mongodb' | ||
import Redis from 'ioredis' | ||
import { faker } from '@faker-js/faker' | ||
import ms from 'ms' | ||
import { setTimeout } from 'node:timers/promises' | ||
import { QueueOptions } from 'prom-utils' | ||
import { missingOplogEntry } from './util.js' | ||
@@ -187,2 +188,34 @@ | ||
test('should allow parallel syncing via uniqueId option', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
const sync2 = await getSync({ uniqueId: 'v2' }) | ||
await initState(sync, db, coll) | ||
// Clear syncing state | ||
await sync2.reset() | ||
const processed: { v1: any[]; v2: any[] } = { v1: [], v2: [] } | ||
const processRecords = | ||
(version: 'v1' | 'v2') => async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed[version].push(...docs) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan( | ||
processRecords('v1'), | ||
scanOptions | ||
) | ||
const initialScan2 = await sync2.runInitialScan( | ||
processRecords('v2'), | ||
scanOptions | ||
) | ||
// Wait for initial scan to complete | ||
await Promise.all([initialScan.start(), initialScan2.start()]) | ||
// Assertions | ||
assert.equal(processed.v1.length, numDocs) | ||
assert.equal(processed.v2.length, numDocs) | ||
// Stop | ||
await Promise.all([initialScan.stop(), initialScan2.stop()]) | ||
}) | ||
test('should exit cleanly if initial scan is already complete', async () => { | ||
@@ -703,9 +736,9 @@ const { coll, db, redis } = await getConns() | ||
const schemaChange = await sync.detectSchemaChange(db, { | ||
shouldRemoveMetadata: true, | ||
shouldRemoveUnusedFields: true, | ||
interval: 250, | ||
}) | ||
let newSchema: object = {} | ||
let schemaChangeEventTriggered = false | ||
sync.emitter.on('schemaChange', ({ currentSchema }: SchemaChangeEvent) => { | ||
console.dir(currentSchema, { depth: 10 }) | ||
newSchema = currentSchema | ||
schemaChangeEventTriggered = true | ||
}) | ||
@@ -725,3 +758,3 @@ // Start detecting schema changes | ||
await setTimeout(ms('1s')) | ||
assert.deepEqual(modifiedSchema, newSchema) | ||
assert.ok(schemaChangeEventTriggered) | ||
schemaChange.stop() | ||
@@ -728,0 +761,0 @@ }) |
@@ -0,27 +1,31 @@ | ||
import _debug from 'debug' | ||
import EventEmitter from 'eventemitter3' | ||
import type { Redis } from 'ioredis' | ||
import _ from 'lodash/fp.js' | ||
import { | ||
ChangeStreamDocument, | ||
ChangeStreamInsertDocument, | ||
Collection, | ||
type ChangeStream, | ||
type ChangeStreamDocument, | ||
type ChangeStreamInsertDocument, | ||
type Collection, | ||
type Db, | ||
ObjectId, | ||
Db, | ||
ChangeStream, | ||
} from 'mongodb' | ||
import * as mongodb from 'mongodb' | ||
import ms from 'ms' | ||
import { batchQueue, defer, type Deferred, type QueueOptions } from 'prom-utils' | ||
import { fsm, type StateTransitions } from 'simple-machines' | ||
import { | ||
ChangeOptions, | ||
ChangeStreamOptions, | ||
Events, | ||
SyncOptions, | ||
JSONSchema, | ||
ProcessChangeStreamRecords, | ||
ProcessInitialScanRecords, | ||
ScanOptions, | ||
ChangeOptions, | ||
ChangeStreamOptions, | ||
JSONSchema, | ||
State, | ||
SimpleState, | ||
SortField, | ||
State, | ||
SyncOptions, | ||
} from './types.js' | ||
import _debug from 'debug' | ||
import type { Redis } from 'ioredis' | ||
import { batchQueue, defer, Deferred, QueueOptions } from 'prom-utils' | ||
import { | ||
@@ -31,3 +35,3 @@ generatePipelineFromOmit, | ||
omitFieldForUpdate, | ||
removeMetadata, | ||
removeUnusedFields, | ||
safelyCheckNext, | ||
@@ -37,5 +41,2 @@ setDefaults, | ||
} from './util.js' | ||
import EventEmitter from 'eventemitter3' | ||
import ms from 'ms' | ||
import { fsm, StateTransitions } from 'simple-machines' | ||
@@ -49,5 +50,6 @@ const debug = _debug('mongochangestream') | ||
*/ | ||
export const getKeys = (collection: Collection) => { | ||
export const getKeys = (collection: Collection, options: SyncOptions) => { | ||
const collectionKey = getCollectionKey(collection) | ||
const collectionPrefix = `${keyPrefix}:${collectionKey}` | ||
const uniqueId = options.uniqueId ? `:${options.uniqueId}` : '' | ||
const collectionPrefix = `${keyPrefix}:${collectionKey}${uniqueId}` | ||
const scanCompletedKey = `${collectionPrefix}:initialScanCompletedOn` | ||
@@ -90,3 +92,3 @@ const lastScanIdKey = `${collectionPrefix}:lastScanId` | ||
) { | ||
const keys = getKeys(collection) | ||
const keys = getKeys(collection, options) | ||
const { omit } = options | ||
@@ -458,4 +460,8 @@ const emitter = new EventEmitter<Events | ExtendedEvents>() | ||
const interval = options.interval || ms('1m') | ||
const shouldRemoveMetadata = options.shouldRemoveMetadata | ||
const maybeRemoveMetadata = when(shouldRemoveMetadata, removeMetadata) | ||
const shouldRemoveUnusedFields = | ||
options.shouldRemoveUnusedFields || options.shouldRemoveMetadata | ||
const maybeRemoveUnusedFields = when( | ||
shouldRemoveUnusedFields, | ||
removeUnusedFields | ||
) | ||
const state = fsm(simpleStateTransistions, 'stopped', { | ||
@@ -470,7 +476,7 @@ name: 'detectSchemaChange', | ||
if (schema) { | ||
return maybeRemoveMetadata(schema) | ||
return maybeRemoveUnusedFields(schema) | ||
} | ||
}) | ||
if (!previousSchema) { | ||
const schema = await getCollectionSchema(db).then(maybeRemoveMetadata) | ||
const schema = await getCollectionSchema(db).then(maybeRemoveUnusedFields) | ||
// Persist schema | ||
@@ -483,4 +489,5 @@ await redis.setnx(keys.schemaKey, JSON.stringify(schema)) | ||
const checkForSchemaChange = async () => { | ||
const currentSchema = | ||
await getCollectionSchema(db).then(maybeRemoveMetadata) | ||
const currentSchema = await getCollectionSchema(db).then( | ||
maybeRemoveUnusedFields | ||
) | ||
// Schemas are no longer the same | ||
@@ -487,0 +494,0 @@ if (!_.isEqual(currentSchema, previousSchema)) { |
import { | ||
AggregationCursor, | ||
ChangeStream, | ||
AggregationCursor, | ||
ChangeStreamDocument, | ||
ChangeStreamInsertDocument, | ||
Document, | ||
MongoAPIError, | ||
MongoServerError, | ||
MongoAPIError, | ||
} from 'mongodb' | ||
@@ -29,2 +29,9 @@ | ||
omit?: string[] | ||
/** | ||
* Added to all Redis keys to allow the same collection | ||
* to be synced in parallel. Otherwise, the Redis keys | ||
* would be the same for a given collection and parallel | ||
* syncing jobs would overwrite each other. | ||
*/ | ||
uniqueId?: string | ||
} | ||
@@ -56,4 +63,10 @@ | ||
interval?: number | ||
/** Should fields like title and description be ignored when detecting a change. */ | ||
/** @deprecated Use shouldRemoveUnusedFields instead.*/ | ||
shouldRemoveMetadata?: boolean | ||
/** | ||
* Remove fields that are not used when converting the schema | ||
* in a downstream library like mongo2elastic or mongo2crate. | ||
* Preserves bsonType, properties, additionalProperties, items, and enum. | ||
*/ | ||
shouldRemoveUnusedFields?: boolean | ||
} | ||
@@ -60,0 +73,0 @@ |
@@ -0,48 +1,51 @@ | ||
import assert from 'node:assert' | ||
import { describe, test } from 'node:test' | ||
import assert from 'node:assert' | ||
import { generatePipelineFromOmit } from './util.js' | ||
import { generatePipelineFromOmit, removeUnusedFields } from './util.js' | ||
describe('util', () => { | ||
test('should generate pipeline from omit with no dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit(['documents', 'createdAt']) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents', | ||
'updateDescription.updatedFields.documents', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
]) | ||
}) | ||
describe('generatePipelineFromOmit', () => { | ||
test('should generate pipeline from omit with no dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit(['documents', 'createdAt']) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents', | ||
'updateDescription.updatedFields.documents', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
]) | ||
}) | ||
test('should generate pipeline from omit with dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit([ | ||
'documents.agenda.parsedText', | ||
'documents.agenda.contentType', | ||
'createdAt', | ||
]) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents.agenda.parsedText', | ||
'updateDescription.updatedFields.documents.agenda.parsedText', | ||
'fullDocument.documents.agenda.contentType', | ||
'updateDescription.updatedFields.documents.agenda.contentType', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
{ | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: | ||
'^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)', | ||
test('should generate pipeline from omit with dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit([ | ||
'documents.agenda.parsedText', | ||
'documents.agenda.contentType', | ||
'createdAt', | ||
]) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents.agenda.parsedText', | ||
'updateDescription.updatedFields.documents.agenda.parsedText', | ||
'fullDocument.documents.agenda.contentType', | ||
'updateDescription.updatedFields.documents.agenda.contentType', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
{ | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: | ||
'^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)', | ||
}, | ||
}, | ||
@@ -54,5 +57,69 @@ }, | ||
}, | ||
}, | ||
]) | ||
]) | ||
}) | ||
}) | ||
describe('removeUnusedFields', () => { | ||
test('should remove all unneeded fields', () => { | ||
const schema = { | ||
bsonType: 'object', | ||
required: ['a', 'b'], | ||
additionalProperties: false, | ||
properties: { | ||
a: { | ||
title: 'An array of strings', | ||
bsonType: 'array', | ||
items: { | ||
bsonType: 'string', | ||
title: 'A string', | ||
}, | ||
}, | ||
b: { | ||
description: 'foo or bar', | ||
bsonType: 'string', | ||
enum: ['foo', 'bar'], | ||
oneOf: [ | ||
{ bsonType: 'string', const: 'foo' }, | ||
{ bsonType: 'string', const: 'bar' }, | ||
], | ||
}, | ||
c: { | ||
bsonType: 'object', | ||
additionalProperties: true, | ||
properties: { | ||
d: { | ||
bsonType: 'number', | ||
description: 'A number', | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
removeUnusedFields(schema) | ||
assert.deepEqual(schema, { | ||
bsonType: 'object', | ||
additionalProperties: false, | ||
properties: { | ||
a: { | ||
bsonType: 'array', | ||
items: { | ||
bsonType: 'string', | ||
}, | ||
}, | ||
b: { | ||
bsonType: 'string', | ||
enum: ['foo', 'bar'], | ||
}, | ||
c: { | ||
bsonType: 'object', | ||
additionalProperties: true, | ||
properties: { | ||
d: { | ||
bsonType: 'number', | ||
}, | ||
}, | ||
}, | ||
}, | ||
}) | ||
}) | ||
}) | ||
}) |
@@ -1,7 +0,8 @@ | ||
import { Collection, MongoServerError } from 'mongodb' | ||
import _debug from 'debug' | ||
import _ from 'lodash/fp.js' | ||
import { Node, walkie } from 'obj-walker' | ||
import { Cursor, CursorError, JSONSchema } from './types.js' | ||
import _debug from 'debug' | ||
import { type Collection, MongoServerError } from 'mongodb' | ||
import { type Node, walkEach } from 'obj-walker' | ||
import type { Cursor, CursorError, JSONSchema } from './types.js' | ||
const debug = _debug('mongochangestream') | ||
@@ -53,3 +54,3 @@ | ||
export const omitFields = (omitPaths: string[]) => | ||
_.omitBy((val, key) => | ||
_.omitBy((_val, key) => | ||
_.find((omitPath) => _.startsWith(`${omitPath}.`, key), omitPaths) | ||
@@ -67,15 +68,25 @@ ) | ||
const usedSchemaFields = [ | ||
'bsonType', | ||
'properties', | ||
'additionalProperties', | ||
'items', | ||
'enum', | ||
] | ||
/** | ||
* Remove title and description from a JSON schema. | ||
* Remove unused schema fields | ||
*/ | ||
export const removeMetadata = (schema: JSONSchema): JSONSchema => { | ||
export const removeUnusedFields = (schema: JSONSchema): JSONSchema => { | ||
const walkFn = ({ val }: Node) => { | ||
if ('title' in val) { | ||
delete val.title | ||
for (const key in val) { | ||
if (!usedSchemaFields.includes(key)) { | ||
delete val[key] | ||
} | ||
} | ||
if ('description' in val) { | ||
delete val.description | ||
} | ||
} | ||
return walkie(schema, walkFn, { traverse: traverseSchema }) | ||
return walkEach(schema, walkFn, { | ||
traverse: traverseSchema, | ||
modifyInPlace: true, | ||
}) | ||
} | ||
@@ -82,0 +93,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
102203
2534
0
+ Addedprom-utils@0.9.0(transitive)
Updateddebug@^4.3.5
Updatedprom-utils@^0.9.0