mongochangestream
Advanced tools
Comparing version 0.47.0 to 0.48.0
@@ -0,1 +1,7 @@ | ||
# 0.48.0 | ||
- Fixed bug where `processChangeStream` exits prematurely. | ||
- Fixed bug when omitting an updated nested field. | ||
- Omit fields from `updateDescription.removedFields` to prevent downstream issues. | ||
# 0.47.0 | ||
@@ -2,0 +8,0 @@ |
@@ -14,2 +14,3 @@ "use strict"; | ||
const simple_machines_1 = require("simple-machines"); | ||
const safelyCheckNext_js_1 = require("./safelyCheckNext.js"); | ||
const util_js_1 = require("./util.js"); | ||
@@ -182,3 +183,3 @@ const debug = (0, debug_1.default)('mongochangestream'); | ||
const ns = { db: collection.dbName, coll: collection.collectionName }; | ||
const nextChecker = (0, util_js_1.safelyCheckNext)(cursor); | ||
const nextChecker = (0, safelyCheckNext_js_1.safelyCheckNext)(cursor); | ||
let doc; | ||
@@ -308,6 +309,7 @@ // Process documents | ||
state.change('started'); | ||
const nextChecker = (0, util_js_1.safelyCheckNext)(changeStream); | ||
const nextChecker = (0, safelyCheckNext_js_1.safelyCheckNext)(changeStream); | ||
let event; | ||
// Consume change stream | ||
while ((event = await nextChecker.getNext())) { | ||
while (await nextChecker.hasNext()) { | ||
event = await changeStream.next(); | ||
debug('Change stream event %O', event); | ||
@@ -322,3 +324,3 @@ // Skip the event if the operation type is not one we care about | ||
if (event.operationType === 'update' && omit) { | ||
event = (0, util_js_1.omitFieldForUpdate)(omit)(event); | ||
(0, util_js_1.omitFieldsForUpdate)(omit, event); | ||
} | ||
@@ -325,0 +327,0 @@ await queue.enqueue(event); |
@@ -1,8 +0,22 @@ | ||
import _ from 'lodash/fp.js'; | ||
import { type Collection } from 'mongodb'; | ||
import type { Cursor, CursorError, JSONSchema } from './types.js'; | ||
import { type ChangeStreamUpdateDocument, type Collection } from 'mongodb'; | ||
import type { CursorError, JSONSchema } from './types.js'; | ||
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>; | ||
export declare const generatePipelineFromOmit: (omit: string[]) => any[]; | ||
export declare const omitFields: (omitPaths: string[]) => _.LodashOmitBy1x1<unknown>; | ||
export declare const omitFieldForUpdate: (omitPaths: string[]) => _.LodashUpdate1x3; | ||
export declare const generatePipelineFromOmit: (omit: string[]) => { | ||
$unset: string[]; | ||
}[]; | ||
/** | ||
* Dotted path updates like { $set: {'a.b.c': 'foo'} } result in the following: | ||
* ```ts | ||
* { | ||
 * updateDescription: { | ||
 *   updatedFields: { | ||
* 'a.b.c': 'foo' | ||
* } | ||
* } | ||
* } | ||
* ``` | ||
 * Therefore, to remove 'a.b' we have to walk the `updatedFields` object | ||
* and unset the omitted paths. | ||
*/ | ||
export declare const omitFieldsForUpdate: (omittedPaths: string[], event: ChangeStreamUpdateDocument) => void; | ||
export declare const getCollectionKey: (collection: Collection) => string; | ||
@@ -16,11 +30,2 @@ export declare const traverseSchema: (x: JSONSchema) => any; | ||
/** | ||
* Get next record without throwing an exception. | ||
* Get the last error safely via `getLastError`. | ||
*/ | ||
export declare const safelyCheckNext: (cursor: Cursor) => { | ||
getNext: () => Promise<any>; | ||
errorExists: () => boolean; | ||
getLastError: () => unknown; | ||
}; | ||
/** | ||
* Check if error message indicates a missing or invalid oplog entry. | ||
@@ -27,0 +32,0 @@ */ |
"use strict"; | ||
// TypeScript CommonJS interop helper (compiler-emitted): wraps a plain
// CommonJS module in a `{ default: mod }` shape so default imports of
// non-ES modules behave like ES-module default imports.
var __importDefault = (this && this.__importDefault) || function (mod) {
    return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.delayed = exports.missingOplogEntry = exports.safelyCheckNext = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
const debug_1 = __importDefault(require("debug")); | ||
const fp_js_1 = __importDefault(require("lodash/fp.js")); | ||
exports.delayed = exports.missingOplogEntry = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldsForUpdate = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
const lodash_1 = require("lodash"); | ||
const mongodb_1 = require("mongodb"); | ||
const obj_walker_1 = require("obj-walker"); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
const setDefaults = (keys, val) => { | ||
@@ -20,2 +15,10 @@ const obj = {}; | ||
exports.setDefaults = setDefaults; | ||
/**
 * Build an aggregation pipeline that strips each omitted field from both
 * the full document and the update description of a change event.
 */
const generatePipelineFromOmit = (omit) => {
    const unsetPaths = [];
    for (const field of omit) {
        unsetPaths.push(`fullDocument.${field}`);
        unsetPaths.push(`updateDescription.updatedFields.${field}`);
    }
    return [{ $unset: unsetPaths }];
};
exports.generatePipelineFromOmit = generatePipelineFromOmit;
/** | ||
@@ -32,45 +35,21 @@ * Dotted path updates like { $set: {'a.b.c': 'foo'} } result in the following: | ||
* ``` | ||
* Therefore, to remove 'a.b' we have to convert the `updateFields` | ||
* object to an array, filter the array with a regex, and convert | ||
* the array back to an object. | ||
* Therefore, to remove 'a.b' we have to walk the `updateFields` object | ||
* and unset the omitted paths. | ||
*/ | ||
const removeDottedPaths = (omit) => { | ||
const dottedFields = omit | ||
.filter((x) => x.includes('.')) | ||
// Escape periods | ||
.map((x) => x.replaceAll('.', '\\.')); | ||
if (dottedFields.length) { | ||
return { | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: `^(?!${dottedFields.join('|')})`, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}; | ||
/**
 * Mutates an update event in place, removing every omitted path (or any
 * path nested below one) from `updateDescription.updatedFields` and
 * `updateDescription.removedFields`.
 */
const omitFieldsForUpdate = (omittedPaths, event) => {
    // True when `path` equals an omitted path or lies underneath one.
    const isOmitted = (path) => omittedPaths.some((omittedPath) => path === omittedPath || path.startsWith(`${omittedPath}.`));
    const { updatedFields, removedFields } = event.updateDescription;
    if (updatedFields) {
        // Walk updatedFields in place; returning undefined drops the node.
        (0, obj_walker_1.map)(updatedFields, (node) => {
            if (!isOmitted(node.path.join('.'))) {
                return node.val;
            }
        }, { modifyInPlace: true });
    }
    if (removedFields) {
        const kept = removedFields.filter((removedPath) => !isOmitted(removedPath));
        (0, lodash_1.set)(event, 'updateDescription.removedFields', kept);
    }
};
/**
 * Build an aggregation pipeline that strips omitted fields from change
 * events; dotted omit paths require an extra stage from removeDottedPaths.
 */
const generatePipelineFromOmit = (omit) => {
    const unsetPaths = [];
    for (const field of omit) {
        unsetPaths.push(`fullDocument.${field}`);
        unsetPaths.push(`updateDescription.updatedFields.${field}`);
    }
    const pipeline = [{ $unset: unsetPaths }];
    // Dotted paths cannot be removed by $unset alone — append the
    // filtering $set stage when any are present.
    const dottedPathsStage = removeDottedPaths(omit);
    if (dottedPathsStage) {
        pipeline.push(dottedPathsStage);
    }
    return pipeline;
};
exports.generatePipelineFromOmit = generatePipelineFromOmit;
// Returns a function that removes from an object every key nested strictly
// below one of `omitPaths` (i.e. key starts with "<omitPath>."). An exact
// top-level match is NOT removed here — only descendants of an omitted path.
const omitFields = (omitPaths) => fp_js_1.default.omitBy((_val, key) => fp_js_1.default.find((omitPath) => fp_js_1.default.startsWith(`${omitPath}.`, key), omitPaths));
exports.omitFields = omitFields;
// Returns a function that applies `omitFields` to an update event's
// `updateDescription.updatedFields`. lodash/fp `update` does not mutate its
// argument — callers must use the returned event.
const omitFieldForUpdate = (omitPaths) => fp_js_1.default.update('updateDescription.updatedFields', (0, exports.omitFields)(omitPaths));
exports.omitFieldForUpdate = omitFieldForUpdate;
exports.omitFieldsForUpdate = omitFieldsForUpdate; | ||
const getCollectionKey = (collection) => `${collection.dbName}:${collection.collectionName}`; | ||
@@ -110,24 +89,2 @@ exports.getCollectionKey = getCollectionKey; | ||
exports.when = when; | ||
/** | ||
* Get next record without throwing an exception. | ||
* Get the last error safely via `getLastError`. | ||
*/ | ||
const safelyCheckNext = (cursor) => {
    let lastError;
    return {
        // Advance the cursor; on failure, record the error and yield null.
        getNext: async () => {
            debug('safelyCheckNext called');
            try {
                return await cursor.tryNext();
            }
            catch (err) {
                debug('safelyCheckNext error: %o', err);
                lastError = err;
                return null;
            }
        },
        // Whether any getNext call has failed so far.
        errorExists: () => Boolean(lastError),
        // Most recent error thrown by the cursor, if any.
        getLastError: () => lastError,
    };
};
exports.safelyCheckNext = safelyCheckNext;
const oplogErrorCodeNames = [ | ||
@@ -134,0 +91,0 @@ 'ChangeStreamHistoryLost', |
{ | ||
"name": "mongochangestream", | ||
"version": "0.47.0", | ||
"version": "0.48.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -20,3 +20,3 @@ "author": "GovSpend", | ||
"test": "node --env-file=.env --test --test-force-exit", | ||
"test:only": "DEBUG=* node --env-file=.env --test --test-only --test-force-exit" | ||
"test:only": "DEBUG=* DEBUG_DEPTH=10 node --env-file=.env --test --test-only --test-force-exit" | ||
}, | ||
@@ -23,0 +23,0 @@ "keywords": [ |
@@ -53,2 +53,3 @@ /** | ||
name: faker.person.fullName(), | ||
likes: [faker.animal.dog(), faker.animal.cat()], | ||
address: { | ||
@@ -73,2 +74,8 @@ city: faker.location.city(), | ||
name: { bsonType: 'string' }, | ||
likes: { | ||
bsonType: 'array', | ||
items: { | ||
bsonType: 'string', | ||
}, | ||
}, | ||
address: { | ||
@@ -439,3 +446,3 @@ bsonType: 'object', | ||
}) | ||
const processed = [] | ||
const processed: any[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
@@ -452,3 +459,10 @@ for (const doc of docs) { | ||
// Update records | ||
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
coll.updateMany( | ||
{}, | ||
{ | ||
$set: { createdAt: new Date('2022-01-01') }, | ||
$unset: { 'address.city': '', 'address.geo.lat': '' }, | ||
$pop: { likes: 1 }, | ||
} | ||
) | ||
// Wait for the change stream events to be processed | ||
@@ -490,4 +504,6 @@ await setTimeout(ms('10s')) | ||
'address.geo.lat': 24, | ||
'address.geo.long': 25, | ||
}, | ||
$unset: { | ||
'address.geo.long': '', | ||
}, | ||
} | ||
@@ -500,3 +516,3 @@ ) | ||
assert.equal(documents[0].fullDocument.address.geo, undefined) | ||
const fields = ['address.city', 'address.geo.lat', 'address.geo.long'] | ||
const fields = ['address.city', 'address.geo.lat'] | ||
for (const field of fields) { | ||
@@ -508,2 +524,3 @@ assert.equal( | ||
} | ||
assert.deepEqual(documents[0].updateDescription.removedFields, []) | ||
// Stop | ||
@@ -513,2 +530,47 @@ await changeStream.stop() | ||
// Verifies that omitting 'address.geo.lat' removes that nested dotted path
// from both fullDocument and updateDescription.updatedFields, while the
// sibling path 'address.geo.long' is preserved.
test('should omit fields from change stream - nested dotted path', async () => {
  const { coll, db } = await getConns()
  // address.geo is a path prefix relative to the paths being updated below
  const sync = await getSync({ omit: ['address.geo.lat'] })
  await initState(sync, db, coll)
  const documents: Document[] = []
  // Collect only update events that carry the full document.
  const processRecords = async (docs: ChangeStreamDocument[]) => {
    for (const doc of docs) {
      await setTimeout(5)
      if (doc.operationType === 'update' && doc.fullDocument) {
        documents.push(doc)
      }
    }
  }
  const changeStream = await sync.processChangeStream(processRecords)
  // Start
  changeStream.start()
  await setTimeout(ms('1s'))
  // Update record
  // NOTE(review): pipeline-style update (array form) — the assertions below
  // imply it produces a nested object under the 'address.geo' key in
  // updatedFields rather than flat dotted keys; confirm against server docs.
  coll.updateMany({}, [
    {
      $set: {
        'address.geo.lat': 24,
        'address.geo.long': 25,
      },
    },
  ])
  // Wait for the change stream events to be processed
  await setTimeout(ms('2s'))
  // Assertions
  assert.equal(documents[0].fullDocument.address.geo.long, 25)
  assert.equal(documents[0].fullDocument.address.geo.lat, undefined)
  assert.equal(
    documents[0].updateDescription.updatedFields['address.geo'].long,
    25
  )
  assert.equal(
    documents[0].updateDescription.updatedFields['address.geo'].lat,
    undefined
  )
  // Stop
  await changeStream.stop()
})
test('should omit fields from change stream - object', async () => { | ||
@@ -585,3 +647,3 @@ const { coll, db } = await getConns() | ||
test('change stream should resume properly', async () => { | ||
test('change stream should resume after being stopped', async () => { | ||
const { coll, db } = await getConns() | ||
@@ -614,3 +676,3 @@ const sync = await getSync() | ||
// Wait for all documents to be processed | ||
await setTimeout(ms('8s')) | ||
await setTimeout(ms('5s')) | ||
// All change stream docs were processed | ||
@@ -621,2 +683,36 @@ assert.equal(processed.length, numDocs) | ||
// Verifies the change stream keeps delivering events after a quiet period:
// process one batch of updates, wait for it to drain, then process a second
// batch and expect every document to be seen again.
test('change stream should resume after pause in events', async () => {
  const { coll, db } = await getConns()
  const sync = await getSync()
  await initState(sync, db, coll)
  let processed = []
  // Change stream
  const processRecords = async (docs: ChangeStreamDocument[]) => {
    for (const doc of docs) {
      // Small per-doc delay to simulate real processing work.
      await setTimeout(8)
      processed.push(doc)
    }
  }
  const changeStream = await sync.processChangeStream(processRecords)
  changeStream.start()
  // Let change stream connect
  await setTimeout(ms('1s'))
  // Change all documents
  coll.updateMany({}, { $set: { createdAt: new Date('2022-01-02') } })
  // Wait for all documents to be processed
  await setTimeout(ms('6s'))
  // All change stream docs were processed
  // NOTE(review): numDocs is defined elsewhere in this test file.
  assert.equal(processed.length, numDocs)
  // Reset processed
  processed = []
  // Change all documents
  coll.updateMany({}, { $set: { createdAt: new Date('2022-01-03') } })
  // Wait for all documents to be processed
  await setTimeout(ms('6s'))
  // All change stream docs were processed
  assert.equal(processed.length, numDocs)
  await changeStream.stop()
})
test('change stream handle missing oplog entry properly', async () => { | ||
@@ -623,0 +719,0 @@ const { coll, db, redis } = await getConns() |
@@ -19,2 +19,3 @@ import _debug from 'debug' | ||
import { safelyCheckNext } from './safelyCheckNext.js' | ||
import { | ||
@@ -36,5 +37,4 @@ ChangeOptions, | ||
getCollectionKey, | ||
omitFieldForUpdate, | ||
omitFieldsForUpdate, | ||
removeUnusedFields, | ||
safelyCheckNext, | ||
setDefaults, | ||
@@ -381,3 +381,4 @@ when, | ||
// Consume change stream | ||
while ((event = await nextChecker.getNext())) { | ||
while (await nextChecker.hasNext()) { | ||
event = await changeStream.next() | ||
debug('Change stream event %O', event) | ||
@@ -392,3 +393,3 @@ // Skip the event if the operation type is not one we care about | ||
if (event.operationType === 'update' && omit) { | ||
event = omitFieldForUpdate(omit)(event) as ChangeStreamDocument | ||
omitFieldsForUpdate(omit, event) | ||
} | ||
@@ -395,0 +396,0 @@ await queue.enqueue(event) |
import assert from 'node:assert' | ||
import { describe, test } from 'node:test' | ||
import { generatePipelineFromOmit, removeUnusedFields } from './util.js' | ||
import { | ||
generatePipelineFromOmit, | ||
omitFieldsForUpdate, | ||
removeUnusedFields, | ||
} from './util.js' | ||
@@ -21,40 +25,2 @@ describe('util', () => { | ||
}) | ||
test('should generate pipeline from omit with dotted fields', () => { | ||
const pipeline = generatePipelineFromOmit([ | ||
'documents.agenda.parsedText', | ||
'documents.agenda.contentType', | ||
'createdAt', | ||
]) | ||
assert.deepEqual(pipeline, [ | ||
{ | ||
$unset: [ | ||
'fullDocument.documents.agenda.parsedText', | ||
'updateDescription.updatedFields.documents.agenda.parsedText', | ||
'fullDocument.documents.agenda.contentType', | ||
'updateDescription.updatedFields.documents.agenda.contentType', | ||
'fullDocument.createdAt', | ||
'updateDescription.updatedFields.createdAt', | ||
], | ||
}, | ||
{ | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: | ||
'^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)', | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
]) | ||
}) | ||
}) | ||
@@ -125,2 +91,110 @@ describe('removeUnusedFields', () => { | ||
}) | ||
describe('omitFieldsForUpdate', () => { | ||
test('should remove omitted fields from removedFields - exact', () => { | ||
const event: any = { | ||
updateDescription: { | ||
updatedFields: {}, | ||
removedFields: ['address.geo.long'], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
const expected = { | ||
updateDescription: { | ||
updatedFields: {}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
omitFieldsForUpdate(['address.geo.long'], event) | ||
assert.deepEqual(event, expected) | ||
}) | ||
test('should remove omitted fields from removedFields - prefix', () => { | ||
const event: any = { | ||
updateDescription: { | ||
updatedFields: {}, | ||
removedFields: ['address.geo.long'], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
const expected = { | ||
updateDescription: { | ||
updatedFields: {}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
omitFieldsForUpdate(['address.geo'], event) | ||
assert.deepEqual(event, expected) | ||
}) | ||
test('should remove omitted fields from updatedFields - exact', () => { | ||
const event: any = { | ||
updateDescription: { | ||
updatedFields: { | ||
name: 'unknown', | ||
'address.city': 'San Diego', | ||
}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
const expected = { | ||
updateDescription: { | ||
updatedFields: { | ||
name: 'unknown', | ||
}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
omitFieldsForUpdate(['address.city'], event) | ||
assert.deepEqual(event, expected) | ||
}) | ||
test('should remove omitted fields from updatedFields - prefix', () => { | ||
const event: any = { | ||
updateDescription: { | ||
updatedFields: { | ||
name: 'unknown', | ||
'address.geo.lat': 24, | ||
}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
const expected = { | ||
updateDescription: { | ||
updatedFields: { | ||
name: 'unknown', | ||
}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
omitFieldsForUpdate(['address.geo'], event) | ||
assert.deepEqual(event, expected) | ||
}) | ||
test('should remove omitted fields from updatedFields - nested', () => { | ||
const event: any = { | ||
updateDescription: { | ||
updatedFields: { | ||
name: 'unknown', | ||
'address.geo': { lat: 24, long: 25 }, | ||
}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
const expected = { | ||
updateDescription: { | ||
updatedFields: { | ||
name: 'unknown', | ||
'address.geo': { long: 25 }, | ||
}, | ||
removedFields: [], | ||
truncatedArrays: [], | ||
}, | ||
} | ||
omitFieldsForUpdate(['address.geo.lat'], event) | ||
assert.deepEqual(event, expected) | ||
}) | ||
}) | ||
}) |
118
src/util.ts
@@ -1,10 +0,11 @@ | ||
import _debug from 'debug' | ||
import _ from 'lodash/fp.js' | ||
import { type Collection, MongoServerError } from 'mongodb' | ||
import { type Node, walkEach } from 'obj-walker' | ||
import { set } from 'lodash' | ||
import { | ||
type ChangeStreamUpdateDocument, | ||
type Collection, | ||
MongoServerError, | ||
} from 'mongodb' | ||
import { map, type Node, walkEach } from 'obj-walker' | ||
import type { Cursor, CursorError, JSONSchema } from './types.js' | ||
import type { CursorError, JSONSchema } from './types.js' | ||
const debug = _debug('mongochangestream') | ||
export const setDefaults = (keys: string[], val: any) => { | ||
@@ -18,2 +19,10 @@ const obj: Record<string, any> = {} | ||
export const generatePipelineFromOmit = (omit: string[]) => { | ||
const fields = omit.flatMap((field) => [ | ||
`fullDocument.${field}`, | ||
`updateDescription.updatedFields.${field}`, | ||
]) | ||
return [{ $unset: fields }] | ||
} | ||
/** | ||
@@ -30,50 +39,35 @@ * Dotted path updates like { $set: {'a.b.c': 'foo'} } result in the following: | ||
* ``` | ||
* Therefore, to remove 'a.b' we have to convert the `updateFields` | ||
* object to an array, filter the array with a regex, and convert | ||
* the array back to an object. | ||
* Therefore, to remove 'a.b' we have to walk the `updateFields` object | ||
* and unset the omitted paths. | ||
*/ | ||
const removeDottedPaths = (omit: string[]) => { | ||
const dottedFields = omit | ||
.filter((x) => x.includes('.')) | ||
// Escape periods | ||
.map((x) => x.replaceAll('.', '\\.')) | ||
if (dottedFields.length) { | ||
return { | ||
$set: { | ||
'updateDescription.updatedFields': { | ||
$arrayToObject: { | ||
$filter: { | ||
input: { $objectToArray: '$updateDescription.updatedFields' }, | ||
cond: { | ||
$regexMatch: { | ||
input: '$$this.k', | ||
regex: `^(?!${dottedFields.join('|')})`, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
export const omitFieldsForUpdate = ( | ||
omittedPaths: string[], | ||
event: ChangeStreamUpdateDocument | ||
) => { | ||
const shouldOmit = (path: string) => | ||
omittedPaths.find( | ||
(omittedPath) => | ||
path === omittedPath || path.startsWith(`${omittedPath}.`) | ||
) | ||
if (event.updateDescription.updatedFields) { | ||
map( | ||
event.updateDescription.updatedFields, | ||
(node) => { | ||
const fullPath = node.path.join('.') | ||
if (!shouldOmit(fullPath)) { | ||
return node.val | ||
} | ||
}, | ||
} | ||
{ modifyInPlace: true } | ||
) | ||
} | ||
if (event.updateDescription.removedFields) { | ||
const removedFields = event.updateDescription.removedFields.filter( | ||
(removedPath) => !shouldOmit(removedPath) | ||
) | ||
set(event, 'updateDescription.removedFields', removedFields) | ||
} | ||
} | ||
export const generatePipelineFromOmit = (omit: string[]) => { | ||
const fields = omit.flatMap((field) => [ | ||
`fullDocument.${field}`, | ||
`updateDescription.updatedFields.${field}`, | ||
]) | ||
const dottedPathsStage = removeDottedPaths(omit) | ||
const pipeline: any[] = [{ $unset: fields }] | ||
return dottedPathsStage ? pipeline.concat([dottedPathsStage]) : pipeline | ||
} | ||
export const omitFields = (omitPaths: string[]) => | ||
_.omitBy((_val, key) => | ||
_.find((omitPath) => _.startsWith(`${omitPath}.`, key), omitPaths) | ||
) | ||
export const omitFieldForUpdate = (omitPaths: string[]) => | ||
_.update('updateDescription.updatedFields', omitFields(omitPaths)) | ||
export const getCollectionKey = (collection: Collection) => | ||
@@ -116,26 +110,2 @@ `${collection.dbName}:${collection.collectionName}` | ||
/** | ||
* Get next record without throwing an exception. | ||
* Get the last error safely via `getLastError`. | ||
*/ | ||
export const safelyCheckNext = (cursor: Cursor) => { | ||
let lastError: unknown | ||
const getNext = async () => { | ||
debug('safelyCheckNext called') | ||
try { | ||
return await cursor.tryNext() | ||
} catch (e) { | ||
debug('safelyCheckNext error: %o', e) | ||
lastError = e | ||
return null | ||
} | ||
} | ||
const errorExists = () => Boolean(lastError) | ||
const getLastError = () => lastError | ||
return { getNext, errorExists, getLastError } | ||
} | ||
const oplogErrorCodeNames = [ | ||
@@ -142,0 +112,0 @@ 'ChangeStreamHistoryLost', |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
108050
24
2776