Socket
Socket
Sign inDemoInstall

mongochangestream

Package Overview
Dependencies
Maintainers
0
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.49.0 to 0.50.0

eslint.config.mjs

6

CHANGELOG.md

@@ -0,1 +1,7 @@

# 0.50.0
- Bump packages, including latest `prom-utils` which allows for throttling of items/sec and bytes/sec.
- New event: `stats` which emits stats for items/sec and bytes/sec when a batch of
records is processed for the initial scan or change stream.
# 0.49.0

@@ -2,0 +8,0 @@

14

dist/mongoChangeStream.js

@@ -6,3 +6,4 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.initSync = exports.getKeys = void 0;
exports.getKeys = void 0;
exports.initSync = initSync;
const debug_1 = __importDefault(require("debug"));

@@ -175,2 +176,7 @@ const eventemitter3_1 = __importDefault(require("eventemitter3"));

}
// Emit stats
emit('stats', {
name: 'runInitialScan',
stats: queue.getStats(),
});
};

@@ -298,2 +304,7 @@ // Create queue

}
// Emit stats
emit('stats', {
name: 'processChangeStream',
stats: queue.getStats(),
});
};

@@ -491,2 +502,1 @@ // New deferred

}
exports.initSync = initSync;
import { AggregationCursor, ChangeStream, ChangeStreamDocument, ChangeStreamInsertDocument, Document, MongoAPIError, MongoServerError } from 'mongodb';
import { QueueStats } from 'prom-utils';
export type Cursor = ChangeStream | AggregationCursor;

@@ -48,3 +49,3 @@ export type JSONSchema = Record<string, any>;

}
export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete';
export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete' | 'stats';
export interface ResyncEvent {

@@ -73,4 +74,13 @@ type: 'resync';

}
/**
* If `maxItemsPerSec` is not set, `stats.itemsPerSec` will be 0.
* If `maxBytesPerSec` is not set, `stats.bytesPerSec` will be 0.
*/
export interface StatsEvent {
type: 'stats';
name: 'runInitialScan' | 'processChangeStream';
stats: QueueStats;
}
export type State = 'starting' | 'started' | 'stopping' | 'stopped';
export type SimpleState = 'started' | 'stopped';
export {};

4

dist/util.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.delayed = exports.missingOplogEntry = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldsForUpdate = exports.generatePipelineFromOmit = exports.setDefaults = void 0;
exports.delayed = exports.missingOplogEntry = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldsForUpdate = exports.generatePipelineFromOmit = exports.setDefaults = void 0;
exports.when = when;
const lodash_1 = require("lodash");

@@ -86,3 +87,2 @@ const mongodb_1 = require("mongodb");

}
exports.when = when;
const oplogErrorCodeNames = [

@@ -89,0 +89,0 @@ 'ChangeStreamHistoryLost',

{
"name": "mongochangestream",
"version": "0.49.0",
"version": "0.50.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -42,13 +42,16 @@ "author": "GovSpend",

"devDependencies": {
"@eslint/eslintrc": "^3.1.0",
"@eslint/js": "^9.8.0",
"@faker-js/faker": "^8.4.1",
"@trivago/prettier-plugin-sort-imports": "^4.3.0",
"@types/debug": "^4.1.12",
"@types/lodash": "^4.17.5",
"@types/node": "^20.14.2",
"@typescript-eslint/eslint-plugin": "^7.13.0",
"prettier": "^3.3.2",
"typescript": "^5.4.5"
"@types/lodash": "^4.17.7",
"@types/node": "^22.1.0",
"@typescript-eslint/eslint-plugin": "^8.0.0",
"globals": "^15.9.0",
"prettier": "^3.3.3",
"typescript": "^5.5.4"
},
"dependencies": {
"debug": "^4.3.5",
"debug": "^4.3.6",
"eventemitter3": "^5.0.1",

@@ -58,3 +61,3 @@ "lodash": "^4.17.21",

"obj-walker": "^2.2.0",
"prom-utils": "^0.9.0",
"prom-utils": "^0.10.0",
"simple-machines": "^0.4.0"

@@ -61,0 +64,0 @@ },

@@ -21,3 +21,3 @@ /**

import { setTimeout } from 'node:timers/promises'
import { type QueueOptions } from 'prom-utils'
import type { QueueOptions, QueueStats } from 'prom-utils'

@@ -31,2 +31,3 @@ import { initSync } from './mongoChangeStream.js'

SortField,
StatsEvent,
SyncOptions,

@@ -212,2 +213,31 @@ } from './types.js'

test('initial scan should throttle', async () => {
const { coll, db } = await getConns()
const sync = await getSync()
await initState(sync, db, coll)
let stats: QueueStats = { itemsPerSec: 0, bytesPerSec: 0 }
sync.emitter.on('stats', (event: StatsEvent) => {
stats = event.stats
})
const processed = []
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
await setTimeout(50)
processed.push(...docs)
}
const initialScan = await sync.runInitialScan(processRecords, {
batchSize: 100,
maxItemsPerSec: 200,
maxBytesPerSec: 25000,
})
// Wait for initial scan to complete
await initialScan.start()
assert.equal(processed.length, numDocs)
assert.ok(stats.itemsPerSec > 0 && stats.itemsPerSec < 250)
assert.ok(stats.bytesPerSec > 0 && stats.bytesPerSec < 40000)
// Stop
await initialScan.stop()
})
test('should allow parallel syncing via uniqueId option', async () => {

@@ -459,2 +489,38 @@ const { coll, db } = await getConns()

// Update records
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
// Wait for the change stream events to be processed
await setTimeout(ms('6s'))
assert.equal(processed.length, numDocs)
// Stop
await changeStream.stop()
// Should not emit cursorError when stopping
assert.equal(cursorError, false)
})
test('change stream should throttle', async () => {
const { coll, db } = await getConns()
const sync = await getSync()
await initState(sync, db, coll)
let stats: QueueStats = { itemsPerSec: 0, bytesPerSec: 0 }
sync.emitter.on('stats', (event: StatsEvent) => {
stats = event.stats
})
const processed: any[] = []
const processRecords = async (docs: ChangeStreamDocument[]) => {
for (const doc of docs) {
await setTimeout(5)
processed.push(doc)
}
}
const changeStream = await sync.processChangeStream(processRecords, {
batchSize: 100,
maxItemsPerSec: 100,
maxBytesPerSec: 65000,
})
// Start
changeStream.start()
await setTimeout(ms('1s'))
// Update records
coll.updateMany(

@@ -469,8 +535,8 @@ {},

// Wait for the change stream events to be processed
await setTimeout(ms('6s'))
await setTimeout(ms('8s'))
assert.equal(processed.length, numDocs)
assert.ok(stats.itemsPerSec > 0 && stats.itemsPerSec < 200)
assert.ok(stats.bytesPerSec > 0 && stats.bytesPerSec < 75000)
// Stop
await changeStream.stop()
// Should not emit cursorError when stopping
assert.equal(cursorError, false)
})

@@ -477,0 +543,0 @@

@@ -227,2 +227,7 @@ import _debug from 'debug'

}
// Emit stats
emit('stats', {
name: 'runInitialScan',
stats: queue.getStats(),
})
}

@@ -366,2 +371,7 @@ // Create queue

}
// Emit stats
emit('stats', {
name: 'processChangeStream',
stats: queue.getStats(),
})
}

@@ -368,0 +378,0 @@ // New deferred

@@ -10,2 +10,3 @@ import {

} from 'mongodb'
import { QueueStats } from 'prom-utils'

@@ -81,2 +82,3 @@ export type Cursor = ChangeStream | AggregationCursor

| 'initialScanComplete'
| 'stats'

@@ -112,2 +114,12 @@ export interface ResyncEvent {

/**
* If `maxItemsPerSec` is not set, `stats.itemsPerSec` will be 0.
* If `maxBytesPerSec` is not set, `stats.bytesPerSec` will be 0.
*/
export interface StatsEvent {
type: 'stats'
name: 'runInitialScan' | 'processChangeStream'
stats: QueueStats
}
// State

@@ -114,0 +126,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc