mongochangestream
Advanced tools
Comparing version 0.43.2 to 0.44.0
@@ -0,1 +1,5 @@ | ||
# 0.44.0 | ||
- Bump dependencies. | ||
# 0.43.2 | ||
@@ -2,0 +6,0 @@ |
@@ -33,3 +33,3 @@ import { Collection, Db } from 'mongodb'; | ||
change: (newState: State) => void; | ||
waitForChange: (...newStates: State[]) => Promise<boolean>; | ||
waitForChange: (...newStates: State[]) => Promise<void>; | ||
is: (...states: State[]) => boolean; | ||
@@ -55,3 +55,3 @@ canChange: (newState: State) => boolean; | ||
change: (newState: State) => void; | ||
waitForChange: (...newStates: State[]) => Promise<boolean>; | ||
waitForChange: (...newStates: State[]) => Promise<void>; | ||
is: (...states: State[]) => boolean; | ||
@@ -58,0 +58,0 @@ canChange: (newState: State) => boolean; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.43.2", | ||
"version": "0.44.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -10,3 +10,3 @@ "author": "GovSpend", | ||
"scripts": { | ||
"prepare": "npm run lint && npm run build", | ||
"prepare": "npm run lint && npm run test && npm run build", | ||
"clean": "rm -rf dist", | ||
@@ -16,3 +16,5 @@ "build": "npm run fmt && npm run clean && tsc --declaration --project ./tsconfig-prod.json", | ||
"lint": "eslint src/**", | ||
"fmt": "prettier --ignore-path .gitignore --write './'" | ||
"fmt": "prettier --ignore-path .gitignore --write './'", | ||
"test": "node --test --test-force-exit", | ||
"test:only": "DEBUG=* node --test --test-only --test-force-exit" | ||
}, | ||
@@ -39,10 +41,10 @@ "keywords": [ | ||
"devDependencies": { | ||
"@faker-js/faker": "^7.6.0", | ||
"@types/debug": "^4.1.7", | ||
"@types/lodash": "^4.14.182", | ||
"@typescript-eslint/eslint-plugin": "^5.59.1", | ||
"eslint": "^8.39.0", | ||
"ioredis": "5.3.2", | ||
"prettier": "^2.8.8", | ||
"typescript": "^5.0.4" | ||
"@faker-js/faker": "^8.4.1", | ||
"@types/debug": "^4.1.12", | ||
"@types/lodash": "^4.17.1", | ||
"@types/node": "^20.12.12", | ||
"@typescript-eslint/eslint-plugin": "^7.9.0", | ||
"dotenv": "^16.4.5", | ||
"prettier": "^3.2.5", | ||
"typescript": "^5.4.5" | ||
}, | ||
@@ -54,5 +56,5 @@ "dependencies": { | ||
"ms": "^2.1.3", | ||
"obj-walker": "^1.7.0", | ||
"prom-utils": "^0.5.0", | ||
"simple-machines": "^0.3.0" | ||
"obj-walker": "^2.2.0", | ||
"prom-utils": "^0.8.0", | ||
"simple-machines": "^0.4.0" | ||
}, | ||
@@ -59,0 +61,0 @@ "peerDependencies": { |
/** | ||
* To run: MONGO_CONN="[conn string]" node dist/mongoChangeStream.test.js | ||
* To run add a local .env file with MONGO_CONN | ||
*/ | ||
import 'dotenv/config' | ||
import _ from 'lodash/fp.js' | ||
import { test } from 'node:test' | ||
import { describe, test } from 'node:test' | ||
import assert from 'node:assert' | ||
@@ -50,7 +51,7 @@ import { initSync } from './mongoChangeStream.js' | ||
const genUser = () => ({ | ||
name: faker.name.fullName(), | ||
name: faker.person.fullName(), | ||
address: { | ||
city: faker.address.city(), | ||
state: faker.address.state(), | ||
zipCode: faker.address.zipCode(), | ||
city: faker.location.city(), | ||
state: faker.location.state(), | ||
zipCode: faker.location.zipCode(), | ||
}, | ||
@@ -105,642 +106,645 @@ createdAt: faker.date.past(), | ||
test('should complete initial scan', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
describe('syncing', () => { | ||
test('starting change stream is idempotent', async () => { | ||
const sync = await getSync() | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start twice | ||
changeStream.start() | ||
changeStream.start() | ||
await changeStream.stop() | ||
}) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('stopping change stream is idempotent', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should exit cleanly if initial scan is already complete', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
// Mark initial scan complete | ||
await redis.set(sync.keys.scanCompletedKey, new Date().toString()) | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Change documents | ||
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-03') } }) | ||
// Stop twice | ||
await changeStream.stop() | ||
await changeStream.stop() | ||
}) | ||
const processRecords = async () => { | ||
await setTimeout(50) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
await initialScan.start() | ||
await initialScan.stop() | ||
}) | ||
test('starting initial scan is idempotent', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should run initial scan in reverse sort order', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const processRecords = async () => { | ||
await setTimeout(50) | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
// Start twice | ||
initialScan.start() | ||
initialScan.start() | ||
await initialScan.stop() | ||
}) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const sortField: SortField<ObjectId> = { | ||
field: '_id', | ||
serialize: _.toString, | ||
deserialize: (x: string) => new ObjectId(x), | ||
order: 'desc', | ||
} | ||
const scanOptions = { batchSize: 100, sortField } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('stopping initial scan is idempotent', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should omit fields from initial scan', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync({ omit: ['name'] }) | ||
await initState(sync, db, coll) | ||
const processRecords = async () => { | ||
await setTimeout(50) | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
initialScan.start() | ||
await setTimeout(500) | ||
// Stop twice | ||
await initialScan.stop() | ||
await initialScan.stop() | ||
}) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
documents.push(docs[0].fullDocument) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(documents[0].name, undefined) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('should complete initial scan', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should complete initial scan if collection is empty', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
// Reset state | ||
await sync.reset() | ||
await coll.deleteMany({}) | ||
test('should exit cleanly if initial scan is already complete', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
// Mark initial scan complete | ||
await redis.set(sync.keys.scanCompletedKey, new Date().toString()) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
let completed = false | ||
sync.emitter.on('initialScanComplete', () => { | ||
completed = true | ||
const processRecords = async () => { | ||
await setTimeout(50) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
await initialScan.start() | ||
await initialScan.stop() | ||
}) | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.ok(completed) | ||
assert.equal(processed.length, 0) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('initial scan should resume after stop', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should run initial scan in reverse sort order', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 50 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
let completed = false | ||
sync.emitter.on('initialScanComplete', () => { | ||
completed = true | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const sortField: SortField<ObjectId> = { | ||
field: '_id', | ||
serialize: _.toString, | ||
deserialize: (x: string) => new ObjectId(x), | ||
order: 'desc', | ||
} | ||
const scanOptions = { batchSize: 100, sortField } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
let cursorError = false | ||
sync.emitter.on('cursorError', () => { | ||
cursorError = true | ||
test('should omit fields from initial scan', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync({ omit: ['name'] }) | ||
await initState(sync, db, coll) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
documents.push(docs[0].fullDocument) | ||
} | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(documents[0].name, undefined) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(500) | ||
// Stop the initial scan | ||
await initialScan.stop() | ||
// Should not emit cursorError when stopping | ||
assert.equal(cursorError, false) | ||
// Wait for the initial scan to complete | ||
initialScan.start() | ||
// Add some more records | ||
await populateCollection(coll, 10) | ||
await setTimeout(ms('5s')) | ||
assert.ok(completed) | ||
assert.equal(processed.length, numDocs + 10) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('initial scan should not be marked as completed if connection is closed', async () => { | ||
// Get a new connection since we're closing the connection in the test | ||
const { coll, redis, db, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
sync.emitter.on('stateChange', console.log) | ||
sync.emitter.on('cursorError', console.log) | ||
await initState(sync, db, coll) | ||
test('should complete initial scan if collection is empty', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 50 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(200) | ||
// Close the connection. | ||
await client.close() | ||
// Allow for some time | ||
await setTimeout(100) | ||
// Check if completed | ||
const completedAt = await redis.get(sync.keys.scanCompletedKey) | ||
assert.equal(completedAt, null) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
// Reset state | ||
await sync.reset() | ||
await coll.deleteMany({}) | ||
test('initial scan should support custom pipeline', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
let completed = false | ||
sync.emitter.on('initialScanComplete', () => { | ||
completed = true | ||
}) | ||
const scanOptions = { batchSize: 100 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.ok(completed) | ||
assert.equal(processed.length, 0) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
sync.emitter.on('stateChange', console.log) | ||
test('initial scan should resume after stop', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
documents.push(docs[0].fullDocument) | ||
} | ||
const scanOptions: QueueOptions & ScanOptions = { | ||
batchSize: 50, | ||
pipeline: [ | ||
{ | ||
$addFields: { | ||
cityState: { $concat: ['$address.city', '-', '$address.state'] }, | ||
}, | ||
}, | ||
], | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(500) | ||
// Stop | ||
await initialScan.stop() | ||
assert.ok(documents[0].cityState) | ||
}) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(15) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 25 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
let completed = false | ||
sync.emitter.on('initialScanComplete', () => { | ||
completed = true | ||
}) | ||
let cursorError = false | ||
sync.emitter.on('cursorError', () => { | ||
cursorError = true | ||
}) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(500) | ||
// Stop the initial scan | ||
await initialScan.stop() | ||
// Only a subset of the documents were processed | ||
assert.ok(processed.length < numDocs) | ||
// Should not emit cursorError when stopping | ||
assert.equal(cursorError, false) | ||
// Wait for the initial scan to complete | ||
await initialScan.start() | ||
assert.ok(completed) | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('should process records via change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('initial scan should not be marked as completed if connection is closed', async () => { | ||
// Get a new connection since we're closing the connection in the test | ||
const { coll, redis, db, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
sync.emitter.on('stateChange', console.log) | ||
sync.emitter.on('cursorError', console.log) | ||
await initState(sync, db, coll) | ||
let cursorError = false | ||
sync.emitter.on('cursorError', () => { | ||
cursorError = true | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
processed.push(...docs) | ||
} | ||
const scanOptions = { batchSize: 50 } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(200) | ||
// Close the connection. | ||
await client.close() | ||
// Allow for some time | ||
await setTimeout(100) | ||
// Check if completed | ||
const completedAt = await redis.get(sync.keys.scanCompletedKey) | ||
assert.equal(completedAt, null) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
processed.push(doc) | ||
test('initial scan should support custom pipeline', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
sync.emitter.on('stateChange', console.log) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(10) | ||
documents.push(docs[0].fullDocument) | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('10s')) | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await changeStream.stop() | ||
// Should not emit cursorError when stopping | ||
assert.equal(cursorError, false) | ||
}) | ||
const scanOptions: QueueOptions & ScanOptions = { | ||
batchSize: 50, | ||
pipeline: [ | ||
{ | ||
$addFields: { | ||
cityState: { $concat: ['$address.city', '-', '$address.state'] }, | ||
}, | ||
}, | ||
], | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Start | ||
initialScan.start() | ||
// Allow for some records to be processed | ||
await setTimeout(500) | ||
// Stop | ||
await initialScan.stop() | ||
assert.ok(documents[0].cityState) | ||
}) | ||
test('should omit fields from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync({ omit: ['address.city'] }) | ||
await initState(sync, db, coll) | ||
test('should process records via change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
if (doc.operationType === 'update' && doc.fullDocument) { | ||
documents.push(doc) | ||
let cursorError = false | ||
sync.emitter.on('cursorError', () => { | ||
cursorError = true | ||
}) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
processed.push(doc) | ||
} | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany( | ||
{}, | ||
{ $set: { name: 'unknown', 'address.city': 'San Diego' } } | ||
) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.equal(documents[0].fullDocument.address.city, undefined) | ||
assert.equal( | ||
documents[0].updateDescription.updatedFields['address.city'], | ||
undefined | ||
) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('10s')) | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await changeStream.stop() | ||
// Should not emit cursorError when stopping | ||
assert.equal(cursorError, false) | ||
}) | ||
test('should omit nested fields when parent field is omitted from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync({ omit: ['address'] }) | ||
await initState(sync, db, coll) | ||
test('should omit fields from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync({ omit: ['address.city'] }) | ||
await initState(sync, db, coll) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
if (doc.operationType === 'update' && doc.fullDocument) { | ||
documents.push(doc.fullDocument) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
if (doc.operationType === 'update' && doc.fullDocument) { | ||
documents.push(doc) | ||
} | ||
} | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany({}, { $set: { 'address.zipCode': '90210' } }) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.equal(documents[0].address?.zipCode, undefined) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany( | ||
{}, | ||
{ $set: { name: 'unknown', 'address.city': 'San Diego' } } | ||
) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.equal(documents[0].fullDocument.address.city, undefined) | ||
assert.equal( | ||
documents[0].updateDescription.updatedFields['address.city'], | ||
undefined | ||
) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
test('should omit operation types from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should omit nested fields when parent field is omitted from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync({ omit: ['address'] }) | ||
await initState(sync, db, coll) | ||
const operations: string[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
operations.push(doc.operationType) | ||
const documents: Document[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
if (doc.operationType === 'update' && doc.fullDocument) { | ||
documents.push(doc.fullDocument) | ||
} | ||
} | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords, { | ||
operationTypes: ['insert'], | ||
// Short timeout since only event will be queued | ||
timeout: 500, | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany({}, { $set: { 'address.zipCode': '90210' } }) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.equal(documents[0].address?.zipCode, undefined) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
await coll.updateMany({}, { $set: { name: 'unknown' } }) | ||
// Insert record | ||
await coll.insertOne(genUser()) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.deepEqual(_.uniq(operations), ['insert']) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
test('change stream should resume properly', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should omit operation types from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const processed = [] | ||
// Change stream | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
processed.push(doc) | ||
const operations: string[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
operations.push(doc.operationType) | ||
} | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
// Change documents | ||
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-02') } }) | ||
// Wait for some change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
// Stop | ||
await changeStream.stop() | ||
// Resume change stream | ||
changeStream.start() | ||
// Wait for all documents to be processed | ||
await setTimeout(ms('10s')) | ||
// All change stream docs were processed | ||
assert.equal(processed.length, numDocs) | ||
await changeStream.stop() | ||
}) | ||
test('change stream handle missing oplog entry properly', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
let cursorError: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
cursorError = event | ||
const changeStream = await sync.processChangeStream(processRecords, { | ||
operationTypes: ['insert'], | ||
// Short timeout since only event will be queued | ||
timeout: 500, | ||
}) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
await coll.updateMany({}, { $set: { name: 'unknown' } }) | ||
// Insert record | ||
await coll.insertOne(genUser()) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.deepEqual(_.uniq(operations), ['insert']) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
await initState(sync, db, coll) | ||
test('change stream should resume properly', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
// Set missing token key | ||
await redis.set( | ||
sync.keys.changeStreamTokenKey, | ||
'{"_data":"8263F51B8F000000012B022C0100296E5A1004F852F6C89F924F0A8711460F0C1FBD8846645F6964006463F51B8FD1AACE003022EFC80004"}' | ||
) | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
assert.ok(missingOplogEntry(cursorError.error)) | ||
await changeStream.stop() | ||
}) | ||
test('change stream handle invalid oplog entry properly', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
let cursorError: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
cursorError = event | ||
const processed = [] | ||
// Change stream | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(8) | ||
processed.push(doc) | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
// Change documents | ||
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-02') } }) | ||
// Wait for some change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
// Only a subset of the documents were processed | ||
assert.ok(processed.length < numDocs) | ||
// Stop | ||
await changeStream.stop() | ||
// Resume change stream | ||
changeStream.start() | ||
// Wait for all documents to be processed | ||
await setTimeout(ms('8s')) | ||
// All change stream docs were processed | ||
assert.equal(processed.length, numDocs) | ||
await changeStream.stop() | ||
}) | ||
await initState(sync, db, coll) | ||
test('change stream handle missing oplog entry properly', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
let cursorError: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
cursorError = event | ||
}) | ||
// Set missing token key | ||
await redis.set(sync.keys.changeStreamTokenKey, '{"_data":"123"}') | ||
await initState(sync, db, coll) | ||
// Change stream | ||
const processRecord = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecord) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
// Set missing token key | ||
await redis.set( | ||
sync.keys.changeStreamTokenKey, | ||
'{"_data":"8263F51B8F000000012B022C0100296E5A1004F852F6C89F924F0A8711460F0C1FBD8846645F6964006463F51B8FD1AACE003022EFC80004"}' | ||
) | ||
assert.ok(missingOplogEntry(cursorError.error)) | ||
await changeStream.stop() | ||
}) | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
test('change stream should handle empty collection', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
let cursorError = false | ||
sync.emitter.on('cursorError', () => { | ||
cursorError = true | ||
assert.ok(missingOplogEntry(cursorError.error)) | ||
await changeStream.stop() | ||
}) | ||
await initState(sync, db, coll) | ||
// Delete all documents | ||
await coll.deleteMany({}) | ||
test('change stream handle invalid oplog entry properly', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
let cursorError: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
cursorError = event | ||
}) | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
await initState(sync, db, coll) | ||
assert.equal(cursorError, false) | ||
await changeStream.stop() | ||
}) | ||
// Set missing token key | ||
await redis.set(sync.keys.changeStreamTokenKey, '{"_data":"123"}') | ||
test('should emit cursorError if change stream is closed', async () => { | ||
// Get a new connection since we're closing the connection in the test | ||
const { redis, coll, db, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
// Change stream | ||
const processRecord = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecord) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
let error: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
console.log(event) | ||
error = event.error | ||
assert.ok(missingOplogEntry(cursorError.error)) | ||
await changeStream.stop() | ||
}) | ||
await initState(sync, db, coll) | ||
const processRecords = async () => { | ||
await setTimeout(500) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Close the connection. | ||
await client.close() | ||
await setTimeout(ms('8s')) | ||
assert.ok(error?.message) | ||
}) | ||
test('change stream should handle empty collection', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
let cursorError = false | ||
sync.emitter.on('cursorError', () => { | ||
cursorError = true | ||
}) | ||
test('starting change stream is idempotent', async () => { | ||
const sync = await getSync() | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start twice | ||
changeStream.start() | ||
changeStream.start() | ||
await changeStream.stop() | ||
}) | ||
await initState(sync, db, coll) | ||
// Delete all documents | ||
await coll.deleteMany({}) | ||
test('stopping change stream is idempotent', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Let change stream connect | ||
await setTimeout(ms('1s')) | ||
// Change stream | ||
const processRecords = async () => { | ||
await setTimeout(5) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
changeStream.start() | ||
// Change documents | ||
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-03') } }) | ||
// Stop twice | ||
await changeStream.stop() | ||
await changeStream.stop() | ||
}) | ||
assert.equal(cursorError, false) | ||
await changeStream.stop() | ||
}) | ||
test('starting initial scan is idempotent', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('should emit cursorError if change stream is closed', async () => { | ||
// Get a new connection since we're closing the connection in the test | ||
const { redis, coll, db, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
const processRecords = async () => { | ||
await setTimeout(50) | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
// Start twice | ||
initialScan.start() | ||
initialScan.start() | ||
await initialScan.stop() | ||
}) | ||
let error: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
console.log(event) | ||
error = event.error | ||
}) | ||
await initState(sync, db, coll) | ||
test('stopping initial scan is idempotent', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const processRecords = async () => { | ||
await setTimeout(500) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Close the connection. | ||
await client.close() | ||
await setTimeout(ms('8s')) | ||
assert.ok(error?.message) | ||
}) | ||
const processRecords = async () => { | ||
await setTimeout(50) | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
initialScan.start() | ||
await setTimeout(500) | ||
// Stop twice | ||
await initialScan.stop() | ||
await initialScan.stop() | ||
}) | ||
test('Should resync when resync flag is set', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
test('Should resync when resync flag is set', async () => { | ||
const { coll, db, redis } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
let resyncTriggered = false | ||
const processed = [] | ||
let resyncTriggered = false | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
const resync = sync.detectResync(250) | ||
sync.emitter.on('resync', async () => { | ||
// Stop checking for resync | ||
resync.stop() | ||
resyncTriggered = true | ||
// Stop the initial scan | ||
await initialScan.stop() | ||
// Reset keys | ||
await sync.reset() | ||
// Reset processed | ||
processed.length = 0 | ||
// Start initial scan | ||
initialScan.start() | ||
}) | ||
// Start initial scan | ||
initialScan.start() | ||
// Start resync detection | ||
resync.start() | ||
// Allow for initial scan to start | ||
await setTimeout(500) | ||
// Trigger resync | ||
await redis.set(sync.keys.resyncKey, 1) | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
const resync = sync.detectResync(250) | ||
sync.emitter.on('resync', async () => { | ||
// Stop checking for resync | ||
resync.stop() | ||
resyncTriggered = true | ||
// Stop the initial scan | ||
// Wait for initial scan to complete | ||
await setTimeout(ms('5s')) | ||
assert.ok(resyncTriggered) | ||
assert.equal(processed.length, numDocs) | ||
await initialScan.stop() | ||
// Reset keys | ||
await sync.reset() | ||
// Reset processed | ||
processed.length = 0 | ||
// Start initial scan | ||
initialScan.start() | ||
}) | ||
// Start initial scan | ||
initialScan.start() | ||
// Start resync detection | ||
resync.start() | ||
// Allow for initial scan to start | ||
await setTimeout(500) | ||
// Trigger resync | ||
await redis.set(sync.keys.resyncKey, 1) | ||
// Wait for initial scan to complete | ||
await setTimeout(ms('5s')) | ||
assert.ok(resyncTriggered) | ||
assert.equal(processed.length, numDocs) | ||
await initialScan.stop() | ||
}) | ||
test('Resync start/stop is idempotent', async () => { | ||
const sync = await getSync() | ||
test('Resync start/stop is idempotent', async () => { | ||
const sync = await getSync() | ||
const resync = sync.detectResync() | ||
resync.start() | ||
resync.start() | ||
resync.stop() | ||
resync.stop() | ||
}) | ||
const resync = sync.detectResync() | ||
resync.start() | ||
resync.start() | ||
resync.stop() | ||
resync.stop() | ||
}) | ||
test('Detect schema change', async () => { | ||
const { db, coll } = await getConns() | ||
const sync = await getSync() | ||
// Set schema | ||
await db.command({ | ||
collMod: coll.collectionName, | ||
validator: { $jsonSchema: schema }, | ||
test('Detect schema change', async () => { | ||
const { db, coll } = await getConns() | ||
const sync = await getSync() | ||
// Set schema | ||
await db.command({ | ||
collMod: coll.collectionName, | ||
validator: { $jsonSchema: schema }, | ||
}) | ||
// Look for a new schema every 250 ms | ||
const schemaChange = await sync.detectSchemaChange(db, { | ||
shouldRemoveMetadata: true, | ||
interval: 250, | ||
}) | ||
let newSchema: object = {} | ||
sync.emitter.on('schemaChange', ({ currentSchema }: SchemaChangeEvent) => { | ||
console.dir(currentSchema, { depth: 10 }) | ||
newSchema = currentSchema | ||
}) | ||
// Start detecting schema changes | ||
schemaChange.start() | ||
// Modify the schema | ||
const modifiedSchema = _.set( | ||
'properties.email', | ||
{ bsonType: 'string' }, | ||
schema | ||
) | ||
await db.command({ | ||
collMod: coll.collectionName, | ||
validator: { $jsonSchema: modifiedSchema }, | ||
}) | ||
await setTimeout(ms('1s')) | ||
assert.deepEqual(modifiedSchema, newSchema) | ||
schemaChange.stop() | ||
}) | ||
// Look for a new schema every 250 ms | ||
const schemaChange = await sync.detectSchemaChange(db, { | ||
shouldRemoveMetadata: true, | ||
interval: 250, | ||
}) | ||
let newSchema: object = {} | ||
sync.emitter.on('schemaChange', ({ currentSchema }: SchemaChangeEvent) => { | ||
console.dir(currentSchema, { depth: 10 }) | ||
newSchema = currentSchema | ||
}) | ||
// Start detecting schema changes | ||
schemaChange.start() | ||
// Modify the schema | ||
const modifiedSchema = _.set( | ||
'properties.email', | ||
{ bsonType: 'string' }, | ||
schema | ||
) | ||
await db.command({ | ||
collMod: coll.collectionName, | ||
validator: { $jsonSchema: modifiedSchema }, | ||
}) | ||
await setTimeout(ms('1s')) | ||
assert.deepEqual(modifiedSchema, newSchema) | ||
schemaChange.stop() | ||
}) | ||
test('Schema change start/stop is idempotent', async () => { | ||
const { db } = await getConns() | ||
const sync = await getSync() | ||
test('Schema change start/stop is idempotent', async () => { | ||
const { db } = await getConns() | ||
const sync = await getSync() | ||
const schemaChange = await sync.detectSchemaChange(db) | ||
schemaChange.start() | ||
schemaChange.start() | ||
schemaChange.stop() | ||
schemaChange.stop() | ||
}) | ||
const schemaChange = await sync.detectSchemaChange(db) | ||
schemaChange.start() | ||
schemaChange.start() | ||
schemaChange.stop() | ||
schemaChange.stop() | ||
}) | ||
test('can extend events', async () => { | ||
const { coll, redis } = await getConns({}) | ||
const sync = initSync<'foo' | 'bar'>(redis, coll) | ||
let emitted = '' | ||
sync.emitter.on('foo', (x: string) => { | ||
emitted = x | ||
test('can extend events', async () => { | ||
const { coll, redis } = await getConns({}) | ||
const sync = initSync<'foo' | 'bar'>(redis, coll) | ||
let emitted = '' | ||
sync.emitter.on('foo', (x: string) => { | ||
emitted = x | ||
}) | ||
sync.emitter.emit('foo', 'bar') | ||
assert.equal(emitted, 'bar') | ||
}) | ||
sync.emitter.emit('foo', 'bar') | ||
assert.equal(emitted, 'bar') | ||
}) |
@@ -97,3 +97,3 @@ import _ from 'lodash/fp.js' | ||
const detectResync = (resyncCheckInterval = ms('1m')) => { | ||
let resyncTimer: NodeJS.Timer | ||
let resyncTimer: NodeJS.Timeout | ||
const state = fsm(simpleStateTransistions, 'stopped', { | ||
@@ -462,3 +462,3 @@ name: 'detectResync', | ||
let timer: NodeJS.Timer | ||
let timer: NodeJS.Timeout | ||
// Check for a cached schema | ||
@@ -479,5 +479,4 @@ let previousSchema = await getCachedCollectionSchema().then((schema) => { | ||
const checkForSchemaChange = async () => { | ||
const currentSchema = await getCollectionSchema(db).then( | ||
maybeRemoveMetadata | ||
) | ||
const currentSchema = | ||
await getCollectionSchema(db).then(maybeRemoveMetadata) | ||
// Schemas are no longer the same | ||
@@ -484,0 +483,0 @@ if (!_.isEqual(currentSchema, previousSchema)) { |
@@ -1,47 +0,49 @@ | ||
import { test } from 'node:test' | ||
import { describe, test } from 'node:test' | ||
import assert from 'node:assert' | ||
import { generatePipelineFromOmit } from './util.js' | ||
test('should generate pipeline from omit with no dotted fields', async () => { | ||
const pipeline = generatePipelineFromOmit(['documents', 'createdAt']) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents', | ||
'updateDescription.updatedFields.documents', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
]) | ||
}) | ||
describe('util', () => { | ||
test('should generate pipeline from omit with no dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit(['documents', 'createdAt']) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents', | ||
'updateDescription.updatedFields.documents', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
]) | ||
}) | ||
test('should generate pipeline from omit with dotted fields', async () => { | ||
const pipeline = generatePipelineFromOmit([ | ||
'documents.agenda.parsedText', | ||
'documents.agenda.contentType', | ||
'createdAt', | ||
]) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents.agenda.parsedText', | ||
'updateDescription.updatedFields.documents.agenda.parsedText', | ||
'fullDocument.documents.agenda.contentType', | ||
'updateDescription.updatedFields.documents.agenda.contentType', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
{ | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: | ||
'^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)', | ||
test('should generate pipeline from omit with dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit([ | ||
'documents.agenda.parsedText', | ||
'documents.agenda.contentType', | ||
'createdAt', | ||
]) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents.agenda.parsedText', | ||
'updateDescription.updatedFields.documents.agenda.parsedText', | ||
'fullDocument.documents.agenda.contentType', | ||
'updateDescription.updatedFields.documents.agenda.contentType', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
{ | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: | ||
'^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)', | ||
}, | ||
}, | ||
@@ -53,4 +55,4 @@ }, | ||
}, | ||
}, | ||
]) | ||
]) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2386
97007
21
+ Addedobj-walker@2.4.0(transitive)
+ Addedprom-utils@0.8.0(transitive)
+ Addedsimple-machines@0.4.0(transitive)
- Removedasync-wait-until@2.0.12(transitive)
- Removedbase64-js@1.5.1(transitive)
- Removedbuffer@5.7.1(transitive)
- Removedieee754@1.2.1(transitive)
- Removedobj-walker@1.10.0(transitive)
- Removedobject-sizeof@1.6.3(transitive)
- Removedprom-utils@0.5.0(transitive)
- Removedsimple-machines@0.3.0(transitive)
Updatedobj-walker@^2.2.0
Updatedprom-utils@^0.8.0
Updatedsimple-machines@^0.4.0