Socket
Socket
Sign inDemoInstall

mongochangestream

Package Overview
Dependencies
Maintainers
0
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.47.0 to 0.48.0

dist/safelyCheckNext.d.ts

6

CHANGELOG.md

@@ -0,1 +1,7 @@

# 0.48.0
- Fixed bug where `processChangeStream` exits prematurely.
- Fixed bug when omitting an updated nested field.
- Omit fields from `updateDescription.removedFields` to prevent downstream issues.
# 0.47.0

@@ -2,0 +8,0 @@

10

dist/mongoChangeStream.js

@@ -14,2 +14,3 @@ "use strict";

const simple_machines_1 = require("simple-machines");
const safelyCheckNext_js_1 = require("./safelyCheckNext.js");
const util_js_1 = require("./util.js");

@@ -182,3 +183,3 @@ const debug = (0, debug_1.default)('mongochangestream');

const ns = { db: collection.dbName, coll: collection.collectionName };
const nextChecker = (0, util_js_1.safelyCheckNext)(cursor);
const nextChecker = (0, safelyCheckNext_js_1.safelyCheckNext)(cursor);
let doc;

@@ -308,6 +309,7 @@ // Process documents

state.change('started');
const nextChecker = (0, util_js_1.safelyCheckNext)(changeStream);
const nextChecker = (0, safelyCheckNext_js_1.safelyCheckNext)(changeStream);
let event;
// Consume change stream
while ((event = await nextChecker.getNext())) {
while (await nextChecker.hasNext()) {
event = await changeStream.next();
debug('Change stream event %O', event);

@@ -322,3 +324,3 @@ // Skip the event if the operation type is not one we care about

if (event.operationType === 'update' && omit) {
event = (0, util_js_1.omitFieldForUpdate)(omit)(event);
(0, util_js_1.omitFieldsForUpdate)(omit, event);
}

@@ -325,0 +327,0 @@ await queue.enqueue(event);

35

dist/util.d.ts

@@ -1,8 +0,22 @@

import _ from 'lodash/fp.js';
import { type Collection } from 'mongodb';
import type { Cursor, CursorError, JSONSchema } from './types.js';
import { type ChangeStreamUpdateDocument, type Collection } from 'mongodb';
import type { CursorError, JSONSchema } from './types.js';
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>;
export declare const generatePipelineFromOmit: (omit: string[]) => any[];
export declare const omitFields: (omitPaths: string[]) => _.LodashOmitBy1x1<unknown>;
export declare const omitFieldForUpdate: (omitPaths: string[]) => _.LodashUpdate1x3;
export declare const generatePipelineFromOmit: (omit: string[]) => {
$unset: string[];
}[];
/**
* Dotted path updates like { $set: {'a.b.c': 'foo'} } result in the following:
* ```ts
* {
* updatedDescription: {
* updateFields: {
* 'a.b.c': 'foo'
* }
* }
* }
* ```
* Therefore, to remove 'a.b' we have to walk the `updateFields` object
* and unset the omitted paths.
*/
export declare const omitFieldsForUpdate: (omittedPaths: string[], event: ChangeStreamUpdateDocument) => void;
export declare const getCollectionKey: (collection: Collection) => string;

@@ -16,11 +30,2 @@ export declare const traverseSchema: (x: JSONSchema) => any;

/**
* Get next record without throwing an exception.
* Get the last error safely via `getLastError`.
*/
export declare const safelyCheckNext: (cursor: Cursor) => {
getNext: () => Promise<any>;
errorExists: () => boolean;
getLastError: () => unknown;
};
/**
* Check if error message indicates a missing or invalid oplog entry.

@@ -27,0 +32,0 @@ */

"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.delayed = exports.missingOplogEntry = exports.safelyCheckNext = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0;
const debug_1 = __importDefault(require("debug"));
const fp_js_1 = __importDefault(require("lodash/fp.js"));
exports.delayed = exports.missingOplogEntry = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldsForUpdate = exports.generatePipelineFromOmit = exports.setDefaults = void 0;
const lodash_1 = require("lodash");
const mongodb_1 = require("mongodb");
const obj_walker_1 = require("obj-walker");
const debug = (0, debug_1.default)('mongochangestream');
const setDefaults = (keys, val) => {

@@ -20,2 +15,10 @@ const obj = {};

exports.setDefaults = setDefaults;
const generatePipelineFromOmit = (omit) => {
const fields = omit.flatMap((field) => [
`fullDocument.${field}`,
`updateDescription.updatedFields.${field}`,
]);
return [{ $unset: fields }];
};
exports.generatePipelineFromOmit = generatePipelineFromOmit;
/**

@@ -32,45 +35,21 @@ * Dotted path updates like { $set: {'a.b.c': 'foo'} } result in the following:

* ```
* Therefore, to remove 'a.b' we have to convert the `updateFields`
* object to an array, filter the array with a regex, and convert
* the array back to an object.
* Therefore, to remove 'a.b' we have to walk the `updateFields` object
* and unset the omitted paths.
*/
const removeDottedPaths = (omit) => {
const dottedFields = omit
.filter((x) => x.includes('.'))
// Escape periods
.map((x) => x.replaceAll('.', '\\.'));
if (dottedFields.length) {
return {
$set: {
'updateDescription.updatedFields': {
$arrayToObject: {
$filter: {
input: { $objectToArray: '$updateDescription.updatedFields' },
cond: {
$regexMatch: {
input: '$$this.k',
regex: `^(?!${dottedFields.join('|')})`,
},
},
},
},
},
},
};
const omitFieldsForUpdate = (omittedPaths, event) => {
const shouldOmit = (path) => omittedPaths.find((omittedPath) => path === omittedPath || path.startsWith(`${omittedPath}.`));
if (event.updateDescription.updatedFields) {
(0, obj_walker_1.map)(event.updateDescription.updatedFields, (node) => {
const fullPath = node.path.join('.');
if (!shouldOmit(fullPath)) {
return node.val;
}
}, { modifyInPlace: true });
}
if (event.updateDescription.removedFields) {
const removedFields = event.updateDescription.removedFields.filter((removedPath) => !shouldOmit(removedPath));
(0, lodash_1.set)(event, 'updateDescription.removedFields', removedFields);
}
};
const generatePipelineFromOmit = (omit) => {
const fields = omit.flatMap((field) => [
`fullDocument.${field}`,
`updateDescription.updatedFields.${field}`,
]);
const dottedPathsStage = removeDottedPaths(omit);
const pipeline = [{ $unset: fields }];
return dottedPathsStage ? pipeline.concat([dottedPathsStage]) : pipeline;
};
exports.generatePipelineFromOmit = generatePipelineFromOmit;
const omitFields = (omitPaths) => fp_js_1.default.omitBy((_val, key) => fp_js_1.default.find((omitPath) => fp_js_1.default.startsWith(`${omitPath}.`, key), omitPaths));
exports.omitFields = omitFields;
const omitFieldForUpdate = (omitPaths) => fp_js_1.default.update('updateDescription.updatedFields', (0, exports.omitFields)(omitPaths));
exports.omitFieldForUpdate = omitFieldForUpdate;
exports.omitFieldsForUpdate = omitFieldsForUpdate;
const getCollectionKey = (collection) => `${collection.dbName}:${collection.collectionName}`;

@@ -110,24 +89,2 @@ exports.getCollectionKey = getCollectionKey;

exports.when = when;
/**
* Get next record without throwing an exception.
* Get the last error safely via `getLastError`.
*/
const safelyCheckNext = (cursor) => {
let lastError;
const getNext = async () => {
debug('safelyCheckNext called');
try {
return await cursor.tryNext();
}
catch (e) {
debug('safelyCheckNext error: %o', e);
lastError = e;
return null;
}
};
const errorExists = () => Boolean(lastError);
const getLastError = () => lastError;
return { getNext, errorExists, getLastError };
};
exports.safelyCheckNext = safelyCheckNext;
const oplogErrorCodeNames = [

@@ -134,0 +91,0 @@ 'ChangeStreamHistoryLost',

{
"name": "mongochangestream",
"version": "0.47.0",
"version": "0.48.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -20,3 +20,3 @@ "author": "GovSpend",

"test": "node --env-file=.env --test --test-force-exit",
"test:only": "DEBUG=* node --env-file=.env --test --test-only --test-force-exit"
"test:only": "DEBUG=* DEBUG_DEPTH=10 node --env-file=.env --test --test-only --test-force-exit"
},

@@ -23,0 +23,0 @@ "keywords": [

@@ -53,2 +53,3 @@ /**

name: faker.person.fullName(),
likes: [faker.animal.dog(), faker.animal.cat()],
address: {

@@ -73,2 +74,8 @@ city: faker.location.city(),

name: { bsonType: 'string' },
likes: {
bsonType: 'array',
items: {
bsonType: 'string',
},
},
address: {

@@ -439,3 +446,3 @@ bsonType: 'object',

})
const processed = []
const processed: any[] = []
const processRecords = async (docs: ChangeStreamDocument[]) => {

@@ -452,3 +459,10 @@ for (const doc of docs) {

// Update records
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
coll.updateMany(
{},
{
$set: { createdAt: new Date('2022-01-01') },
$unset: { 'address.city': '', 'address.geo.lat': '' },
$pop: { likes: 1 },
}
)
// Wait for the change stream events to be processed

@@ -490,4 +504,6 @@ await setTimeout(ms('10s'))

'address.geo.lat': 24,
'address.geo.long': 25,
},
$unset: {
'address.geo.long': '',
},
}

@@ -500,3 +516,3 @@ )

assert.equal(documents[0].fullDocument.address.geo, undefined)
const fields = ['address.city', 'address.geo.lat', 'address.geo.long']
const fields = ['address.city', 'address.geo.lat']
for (const field of fields) {

@@ -508,2 +524,3 @@ assert.equal(

}
assert.deepEqual(documents[0].updateDescription.removedFields, [])
// Stop

@@ -513,2 +530,47 @@ await changeStream.stop()

test('should omit fields from change stream - nested dotted path', async () => {
const { coll, db } = await getConns()
// address.geo is a path prefix relative to the paths being updated below
const sync = await getSync({ omit: ['address.geo.lat'] })
await initState(sync, db, coll)
const documents: Document[] = []
const processRecords = async (docs: ChangeStreamDocument[]) => {
for (const doc of docs) {
await setTimeout(5)
if (doc.operationType === 'update' && doc.fullDocument) {
documents.push(doc)
}
}
}
const changeStream = await sync.processChangeStream(processRecords)
// Start
changeStream.start()
await setTimeout(ms('1s'))
// Update record
coll.updateMany({}, [
{
$set: {
'address.geo.lat': 24,
'address.geo.long': 25,
},
},
])
// Wait for the change stream events to be processed
await setTimeout(ms('2s'))
// Assertions
assert.equal(documents[0].fullDocument.address.geo.long, 25)
assert.equal(documents[0].fullDocument.address.geo.lat, undefined)
assert.equal(
documents[0].updateDescription.updatedFields['address.geo'].long,
25
)
assert.equal(
documents[0].updateDescription.updatedFields['address.geo'].lat,
undefined
)
// Stop
await changeStream.stop()
})
test('should omit fields from change stream - object', async () => {

@@ -585,3 +647,3 @@ const { coll, db } = await getConns()

test('change stream should resume properly', async () => {
test('change stream should resume after being stopped', async () => {
const { coll, db } = await getConns()

@@ -614,3 +676,3 @@ const sync = await getSync()

// Wait for all documents to be processed
await setTimeout(ms('8s'))
await setTimeout(ms('5s'))
// All change stream docs were processed

@@ -621,2 +683,36 @@ assert.equal(processed.length, numDocs)

test('change stream should resume after pause in events', async () => {
const { coll, db } = await getConns()
const sync = await getSync()
await initState(sync, db, coll)
let processed = []
// Change stream
const processRecords = async (docs: ChangeStreamDocument[]) => {
for (const doc of docs) {
await setTimeout(8)
processed.push(doc)
}
}
const changeStream = await sync.processChangeStream(processRecords)
changeStream.start()
// Let change stream connect
await setTimeout(ms('1s'))
// Change all documents
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-02') } })
// Wait for all documents to be processed
await setTimeout(ms('6s'))
// All change stream docs were processed
assert.equal(processed.length, numDocs)
// Reset processed
processed = []
// Change all documents
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-03') } })
// Wait for all documents to be processed
await setTimeout(ms('6s'))
// All change stream docs were processed
assert.equal(processed.length, numDocs)
await changeStream.stop()
})
test('change stream handle missing oplog entry properly', async () => {

@@ -623,0 +719,0 @@ const { coll, db, redis } = await getConns()

@@ -19,2 +19,3 @@ import _debug from 'debug'

import { safelyCheckNext } from './safelyCheckNext.js'
import {

@@ -36,5 +37,4 @@ ChangeOptions,

getCollectionKey,
omitFieldForUpdate,
omitFieldsForUpdate,
removeUnusedFields,
safelyCheckNext,
setDefaults,

@@ -381,3 +381,4 @@ when,

// Consume change stream
while ((event = await nextChecker.getNext())) {
while (await nextChecker.hasNext()) {
event = await changeStream.next()
debug('Change stream event %O', event)

@@ -392,3 +393,3 @@ // Skip the event if the operation type is not one we care about

if (event.operationType === 'update' && omit) {
event = omitFieldForUpdate(omit)(event) as ChangeStreamDocument
omitFieldsForUpdate(omit, event)
}

@@ -395,0 +396,0 @@ await queue.enqueue(event)

import assert from 'node:assert'
import { describe, test } from 'node:test'
import { generatePipelineFromOmit, removeUnusedFields } from './util.js'
import {
generatePipelineFromOmit,
omitFieldsForUpdate,
removeUnusedFields,
} from './util.js'

@@ -21,40 +25,2 @@ describe('util', () => {

})
test('should generate pipeline from omit with dotted fields', () => {
const pipeline = generatePipelineFromOmit([
'documents.agenda.parsedText',
'documents.agenda.contentType',
'createdAt',
])
assert.deepEqual(pipeline, [
{
$unset: [
'fullDocument.documents.agenda.parsedText',
'updateDescription.updatedFields.documents.agenda.parsedText',
'fullDocument.documents.agenda.contentType',
'updateDescription.updatedFields.documents.agenda.contentType',
'fullDocument.createdAt',
'updateDescription.updatedFields.createdAt',
],
},
{
$set: {
'updateDescription.updatedFields': {
$arrayToObject: {
$filter: {
input: { $objectToArray: '$updateDescription.updatedFields' },
cond: {
$regexMatch: {
input: '$$this.k',
regex:
'^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)',
},
},
},
},
},
},
},
])
})
})

@@ -125,2 +91,110 @@ describe('removeUnusedFields', () => {

})
describe('omitFieldsForUpdate', () => {
test('should remove omitted fields from removedFields - exact', () => {
const event: any = {
updateDescription: {
updatedFields: {},
removedFields: ['address.geo.long'],
truncatedArrays: [],
},
}
const expected = {
updateDescription: {
updatedFields: {},
removedFields: [],
truncatedArrays: [],
},
}
omitFieldsForUpdate(['address.geo.long'], event)
assert.deepEqual(event, expected)
})
test('should remove omitted fields from removedFields - prefix', () => {
const event: any = {
updateDescription: {
updatedFields: {},
removedFields: ['address.geo.long'],
truncatedArrays: [],
},
}
const expected = {
updateDescription: {
updatedFields: {},
removedFields: [],
truncatedArrays: [],
},
}
omitFieldsForUpdate(['address.geo'], event)
assert.deepEqual(event, expected)
})
test('should remove omitted fields from updatedFields - exact', () => {
const event: any = {
updateDescription: {
updatedFields: {
name: 'unknown',
'address.city': 'San Diego',
},
removedFields: [],
truncatedArrays: [],
},
}
const expected = {
updateDescription: {
updatedFields: {
name: 'unknown',
},
removedFields: [],
truncatedArrays: [],
},
}
omitFieldsForUpdate(['address.city'], event)
assert.deepEqual(event, expected)
})
test('should remove omitted fields from updatedFields - prefix', () => {
const event: any = {
updateDescription: {
updatedFields: {
name: 'unknown',
'address.geo.lat': 24,
},
removedFields: [],
truncatedArrays: [],
},
}
const expected = {
updateDescription: {
updatedFields: {
name: 'unknown',
},
removedFields: [],
truncatedArrays: [],
},
}
omitFieldsForUpdate(['address.geo'], event)
assert.deepEqual(event, expected)
})
test('should remove omitted fields from updatedFields - nested', () => {
const event: any = {
updateDescription: {
updatedFields: {
name: 'unknown',
'address.geo': { lat: 24, long: 25 },
},
removedFields: [],
truncatedArrays: [],
},
}
const expected = {
updateDescription: {
updatedFields: {
name: 'unknown',
'address.geo': { long: 25 },
},
removedFields: [],
truncatedArrays: [],
},
}
omitFieldsForUpdate(['address.geo.lat'], event)
assert.deepEqual(event, expected)
})
})
})

@@ -1,10 +0,11 @@

import _debug from 'debug'
import _ from 'lodash/fp.js'
import { type Collection, MongoServerError } from 'mongodb'
import { type Node, walkEach } from 'obj-walker'
import { set } from 'lodash'
import {
type ChangeStreamUpdateDocument,
type Collection,
MongoServerError,
} from 'mongodb'
import { map, type Node, walkEach } from 'obj-walker'
import type { Cursor, CursorError, JSONSchema } from './types.js'
import type { CursorError, JSONSchema } from './types.js'
const debug = _debug('mongochangestream')
export const setDefaults = (keys: string[], val: any) => {

@@ -18,2 +19,10 @@ const obj: Record<string, any> = {}

export const generatePipelineFromOmit = (omit: string[]) => {
const fields = omit.flatMap((field) => [
`fullDocument.${field}`,
`updateDescription.updatedFields.${field}`,
])
return [{ $unset: fields }]
}
/**

@@ -30,50 +39,35 @@ * Dotted path updates like { $set: {'a.b.c': 'foo'} } result in the following:

* ```
* Therefore, to remove 'a.b' we have to convert the `updateFields`
* object to an array, filter the array with a regex, and convert
* the array back to an object.
* Therefore, to remove 'a.b' we have to walk the `updateFields` object
* and unset the omitted paths.
*/
const removeDottedPaths = (omit: string[]) => {
const dottedFields = omit
.filter((x) => x.includes('.'))
// Escape periods
.map((x) => x.replaceAll('.', '\\.'))
if (dottedFields.length) {
return {
$set: {
'updateDescription.updatedFields': {
$arrayToObject: {
$filter: {
input: { $objectToArray: '$updateDescription.updatedFields' },
cond: {
$regexMatch: {
input: '$$this.k',
regex: `^(?!${dottedFields.join('|')})`,
},
},
},
},
},
export const omitFieldsForUpdate = (
omittedPaths: string[],
event: ChangeStreamUpdateDocument
) => {
const shouldOmit = (path: string) =>
omittedPaths.find(
(omittedPath) =>
path === omittedPath || path.startsWith(`${omittedPath}.`)
)
if (event.updateDescription.updatedFields) {
map(
event.updateDescription.updatedFields,
(node) => {
const fullPath = node.path.join('.')
if (!shouldOmit(fullPath)) {
return node.val
}
},
}
{ modifyInPlace: true }
)
}
if (event.updateDescription.removedFields) {
const removedFields = event.updateDescription.removedFields.filter(
(removedPath) => !shouldOmit(removedPath)
)
set(event, 'updateDescription.removedFields', removedFields)
}
}
export const generatePipelineFromOmit = (omit: string[]) => {
const fields = omit.flatMap((field) => [
`fullDocument.${field}`,
`updateDescription.updatedFields.${field}`,
])
const dottedPathsStage = removeDottedPaths(omit)
const pipeline: any[] = [{ $unset: fields }]
return dottedPathsStage ? pipeline.concat([dottedPathsStage]) : pipeline
}
export const omitFields = (omitPaths: string[]) =>
_.omitBy((_val, key) =>
_.find((omitPath) => _.startsWith(`${omitPath}.`, key), omitPaths)
)
export const omitFieldForUpdate = (omitPaths: string[]) =>
_.update('updateDescription.updatedFields', omitFields(omitPaths))
export const getCollectionKey = (collection: Collection) =>

@@ -116,26 +110,2 @@ `${collection.dbName}:${collection.collectionName}`

/**
* Get next record without throwing an exception.
* Get the last error safely via `getLastError`.
*/
export const safelyCheckNext = (cursor: Cursor) => {
let lastError: unknown
const getNext = async () => {
debug('safelyCheckNext called')
try {
return await cursor.tryNext()
} catch (e) {
debug('safelyCheckNext error: %o', e)
lastError = e
return null
}
}
const errorExists = () => Boolean(lastError)
const getLastError = () => lastError
return { getNext, errorExists, getLastError }
}
const oplogErrorCodeNames = [

@@ -142,0 +112,0 @@ 'ChangeStreamHistoryLost',

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc