mongochangestream
Advanced tools
Comparing version 0.49.0 to 0.50.0
@@ -0,1 +1,7 @@ | ||
# 0.50.0 | ||
- Bump packages, including latest `prom-utils` which allows for throttling of items/sec and bytes/sec. | ||
- New event: `stats` which emits stats for items/sec and bytes/sec when a batch of | ||
records is processed for the initial scan or change stream. | ||
# 0.49.0 | ||
@@ -2,0 +8,0 @@ |
@@ -6,3 +6,4 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.initSync = exports.getKeys = void 0; | ||
exports.getKeys = void 0; | ||
exports.initSync = initSync; | ||
const debug_1 = __importDefault(require("debug")); | ||
@@ -175,2 +176,7 @@ const eventemitter3_1 = __importDefault(require("eventemitter3")); | ||
} | ||
// Emit stats | ||
emit('stats', { | ||
name: 'runInitialScan', | ||
stats: queue.getStats(), | ||
}); | ||
}; | ||
@@ -298,2 +304,7 @@ // Create queue | ||
} | ||
// Emit stats | ||
emit('stats', { | ||
name: 'processChangeStream', | ||
stats: queue.getStats(), | ||
}); | ||
}; | ||
@@ -491,2 +502,1 @@ // New deferred | ||
} | ||
exports.initSync = initSync; |
import { AggregationCursor, ChangeStream, ChangeStreamDocument, ChangeStreamInsertDocument, Document, MongoAPIError, MongoServerError } from 'mongodb'; | ||
import { QueueStats } from 'prom-utils'; | ||
export type Cursor = ChangeStream | AggregationCursor; | ||
@@ -48,3 +49,3 @@ export type JSONSchema = Record<string, any>; | ||
} | ||
export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete'; | ||
export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete' | 'stats'; | ||
export interface ResyncEvent { | ||
@@ -73,4 +74,13 @@ type: 'resync'; | ||
} | ||
/** | ||
* If `maxItemsPerSec` is not set, `stats.itemsPerSec` will be 0. | ||
* If `maxBytesPerSec` is not set, `stats.bytesPerSec` will be 0. | ||
*/ | ||
export interface StatsEvent { | ||
type: 'stats'; | ||
name: 'runInitialScan' | 'processChangeStream'; | ||
stats: QueueStats; | ||
} | ||
export type State = 'starting' | 'started' | 'stopping' | 'stopped'; | ||
export type SimpleState = 'started' | 'stopped'; | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.delayed = exports.missingOplogEntry = exports.when = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldsForUpdate = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
exports.delayed = exports.missingOplogEntry = exports.removeUnusedFields = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldsForUpdate = exports.generatePipelineFromOmit = exports.setDefaults = void 0; | ||
exports.when = when; | ||
const lodash_1 = require("lodash"); | ||
@@ -86,3 +87,2 @@ const mongodb_1 = require("mongodb"); | ||
} | ||
exports.when = when; | ||
const oplogErrorCodeNames = [ | ||
@@ -89,0 +89,0 @@ 'ChangeStreamHistoryLost', |
{ | ||
"name": "mongochangestream", | ||
"version": "0.49.0", | ||
"version": "0.50.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -42,13 +42,16 @@ "author": "GovSpend", | ||
"devDependencies": { | ||
"@eslint/eslintrc": "^3.1.0", | ||
"@eslint/js": "^9.8.0", | ||
"@faker-js/faker": "^8.4.1", | ||
"@trivago/prettier-plugin-sort-imports": "^4.3.0", | ||
"@types/debug": "^4.1.12", | ||
"@types/lodash": "^4.17.5", | ||
"@types/node": "^20.14.2", | ||
"@typescript-eslint/eslint-plugin": "^7.13.0", | ||
"prettier": "^3.3.2", | ||
"typescript": "^5.4.5" | ||
"@types/lodash": "^4.17.7", | ||
"@types/node": "^22.1.0", | ||
"@typescript-eslint/eslint-plugin": "^8.0.0", | ||
"globals": "^15.9.0", | ||
"prettier": "^3.3.3", | ||
"typescript": "^5.5.4" | ||
}, | ||
"dependencies": { | ||
"debug": "^4.3.5", | ||
"debug": "^4.3.6", | ||
"eventemitter3": "^5.0.1", | ||
@@ -58,3 +61,3 @@ "lodash": "^4.17.21", | ||
"obj-walker": "^2.2.0", | ||
"prom-utils": "^0.9.0", | ||
"prom-utils": "^0.10.0", | ||
"simple-machines": "^0.4.0" | ||
@@ -61,0 +64,0 @@ }, |
@@ -21,3 +21,3 @@ /** | ||
import { setTimeout } from 'node:timers/promises' | ||
import { type QueueOptions } from 'prom-utils' | ||
import type { QueueOptions, QueueStats } from 'prom-utils' | ||
@@ -31,2 +31,3 @@ import { initSync } from './mongoChangeStream.js' | ||
SortField, | ||
StatsEvent, | ||
SyncOptions, | ||
@@ -212,2 +213,31 @@ } from './types.js' | ||
test('initial scan should throttle', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
let stats: QueueStats = { itemsPerSec: 0, bytesPerSec: 0 } | ||
sync.emitter.on('stats', (event: StatsEvent) => { | ||
stats = event.stats | ||
}) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords, { | ||
batchSize: 100, | ||
maxItemsPerSec: 200, | ||
maxBytesPerSec: 25000, | ||
}) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
assert.ok(stats.itemsPerSec > 0 && stats.itemsPerSec < 250) | ||
assert.ok(stats.bytesPerSec > 0 && stats.bytesPerSec < 40000) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('should allow parallel syncing via uniqueId option', async () => { | ||
@@ -459,2 +489,38 @@ const { coll, db } = await getConns() | ||
// Update records | ||
coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('6s')) | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await changeStream.stop() | ||
// Should not emit cursorError when stopping | ||
assert.equal(cursorError, false) | ||
}) | ||
test('change stream should throttle', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
let stats: QueueStats = { itemsPerSec: 0, bytesPerSec: 0 } | ||
sync.emitter.on('stats', (event: StatsEvent) => { | ||
stats = event.stats | ||
}) | ||
const processed: any[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
processed.push(doc) | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords, { | ||
batchSize: 100, | ||
maxItemsPerSec: 100, | ||
maxBytesPerSec: 65000, | ||
}) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
coll.updateMany( | ||
@@ -469,8 +535,8 @@ {}, | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('6s')) | ||
await setTimeout(ms('8s')) | ||
assert.equal(processed.length, numDocs) | ||
assert.ok(stats.itemsPerSec > 0 && stats.itemsPerSec < 200) | ||
assert.ok(stats.bytesPerSec > 0 && stats.bytesPerSec < 75000) | ||
// Stop | ||
await changeStream.stop() | ||
// Should not emit cursorError when stopping | ||
assert.equal(cursorError, false) | ||
}) | ||
@@ -477,0 +543,0 @@ |
@@ -227,2 +227,7 @@ import _debug from 'debug' | ||
} | ||
// Emit stats | ||
emit('stats', { | ||
name: 'runInitialScan', | ||
stats: queue.getStats(), | ||
}) | ||
} | ||
@@ -366,2 +371,7 @@ // Create queue | ||
} | ||
// Emit stats | ||
emit('stats', { | ||
name: 'processChangeStream', | ||
stats: queue.getStats(), | ||
}) | ||
} | ||
@@ -368,0 +378,0 @@ // New deferred |
@@ -10,2 +10,3 @@ import { | ||
} from 'mongodb' | ||
import { QueueStats } from 'prom-utils' | ||
@@ -81,2 +82,3 @@ export type Cursor = ChangeStream | AggregationCursor | ||
| 'initialScanComplete' | ||
| 'stats' | ||
@@ -112,2 +114,12 @@ export interface ResyncEvent { | ||
/** | ||
* If `maxItemsPerSec` is not set, `stats.itemsPerSec` will be 0. | ||
* If `maxBytesPerSec` is not set, `stats.bytesPerSec` will be 0. | ||
*/ | ||
export interface StatsEvent { | ||
type: 'stats' | ||
name: 'runInitialScan' | 'processChangeStream' | ||
stats: QueueStats | ||
} | ||
// State | ||
@@ -114,0 +126,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
113682
2931
11
+ Added@esbuild/aix-ppc64@0.21.5(transitive)
+ Added@esbuild/android-arm@0.21.5(transitive)
+ Added@esbuild/android-arm64@0.21.5(transitive)
+ Added@esbuild/android-x64@0.21.5(transitive)
+ Added@esbuild/darwin-arm64@0.21.5(transitive)
+ Added@esbuild/darwin-x64@0.21.5(transitive)
+ Added@esbuild/freebsd-arm64@0.21.5(transitive)
+ Added@esbuild/freebsd-x64@0.21.5(transitive)
+ Added@esbuild/linux-arm@0.21.5(transitive)
+ Added@esbuild/linux-arm64@0.21.5(transitive)
+ Added@esbuild/linux-ia32@0.21.5(transitive)
+ Added@esbuild/linux-loong64@0.21.5(transitive)
+ Added@esbuild/linux-mips64el@0.21.5(transitive)
+ Added@esbuild/linux-ppc64@0.21.5(transitive)
+ Added@esbuild/linux-riscv64@0.21.5(transitive)
+ Added@esbuild/linux-s390x@0.21.5(transitive)
+ Added@esbuild/linux-x64@0.21.5(transitive)
+ Added@esbuild/netbsd-x64@0.21.5(transitive)
+ Added@esbuild/openbsd-x64@0.21.5(transitive)
+ Added@esbuild/sunos-x64@0.21.5(transitive)
+ Added@esbuild/win32-arm64@0.21.5(transitive)
+ Added@esbuild/win32-ia32@0.21.5(transitive)
+ Added@esbuild/win32-x64@0.21.5(transitive)
+ Added@jridgewell/sourcemap-codec@1.5.0(transitive)
+ Added@rollup/rollup-android-arm-eabi@4.22.4(transitive)
+ Added@rollup/rollup-android-arm64@4.22.4(transitive)
+ Added@rollup/rollup-darwin-arm64@4.22.4(transitive)
+ Added@rollup/rollup-darwin-x64@4.22.4(transitive)
+ Added@rollup/rollup-linux-arm-gnueabihf@4.22.4(transitive)
+ Added@rollup/rollup-linux-arm-musleabihf@4.22.4(transitive)
+ Added@rollup/rollup-linux-arm64-gnu@4.22.4(transitive)
+ Added@rollup/rollup-linux-arm64-musl@4.22.4(transitive)
+ Added@rollup/rollup-linux-powerpc64le-gnu@4.22.4(transitive)
+ Added@rollup/rollup-linux-riscv64-gnu@4.22.4(transitive)
+ Added@rollup/rollup-linux-s390x-gnu@4.22.4(transitive)
+ Added@rollup/rollup-linux-x64-gnu@4.22.4(transitive)
+ Added@rollup/rollup-linux-x64-musl@4.22.4(transitive)
+ Added@rollup/rollup-win32-arm64-msvc@4.22.4(transitive)
+ Added@rollup/rollup-win32-ia32-msvc@4.22.4(transitive)
+ Added@rollup/rollup-win32-x64-msvc@4.22.4(transitive)
+ Added@types/estree@1.0.51.0.6(transitive)
+ Added@vitest/expect@2.1.1(transitive)
+ Added@vitest/mocker@2.1.1(transitive)
+ Added@vitest/pretty-format@2.1.1(transitive)
+ Added@vitest/runner@2.1.1(transitive)
+ Added@vitest/snapshot@2.1.1(transitive)
+ Added@vitest/spy@2.1.1(transitive)
+ Added@vitest/utils@2.1.1(transitive)
+ Addedassertion-error@2.0.1(transitive)
+ Addedcac@6.7.14(transitive)
+ Addedchai@5.1.1(transitive)
+ Addedcheck-error@2.1.1(transitive)
+ Addeddeep-eql@5.0.2(transitive)
+ Addedesbuild@0.21.5(transitive)
+ Addedestree-walker@3.0.3(transitive)
+ Addedfsevents@2.3.3(transitive)
+ Addedget-func-name@2.0.2(transitive)
+ Addedloupe@3.1.1(transitive)
+ Addedmagic-string@0.30.11(transitive)
+ Addednanoid@3.3.7(transitive)
+ Addedpathe@1.1.2(transitive)
+ Addedpathval@2.0.0(transitive)
+ Addedpicocolors@1.1.0(transitive)
+ Addedpostcss@8.4.47(transitive)
+ Addedprom-utils@0.10.0(transitive)
+ Addedrollup@4.22.4(transitive)
+ Addedsiginfo@2.0.0(transitive)
+ Addedsource-map-js@1.2.1(transitive)
+ Addedstackback@0.0.2(transitive)
+ Addedstd-env@3.7.0(transitive)
+ Addedtinybench@2.9.0(transitive)
+ Addedtinyexec@0.3.0(transitive)
+ Addedtinypool@1.0.1(transitive)
+ Addedtinyrainbow@1.2.0(transitive)
+ Addedtinyspy@3.0.2(transitive)
+ Addedvite@5.4.7(transitive)
+ Addedvite-node@2.1.1(transitive)
+ Addedvitest@2.1.1(transitive)
+ Addedwhy-is-node-running@2.3.0(transitive)
- Removedprom-utils@0.9.0(transitive)
Updateddebug@^4.3.6
Updatedprom-utils@^0.10.0