Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.37.0 to 0.38.0

6

CHANGELOG.md

@@ -0,1 +1,7 @@

# 0.38.0
- Remove all health check code in favor of using the `cursorError` event.
- Simplify some code.
- Add support for sort order (asc, desc) on initial scan.
# 0.37.0

@@ -2,0 +8,0 @@

147

dist/mongoChangeStream.js

@@ -63,3 +63,3 @@ "use strict";

const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', {
name: 'Resync',
name: 'detectResync',
onStateChange: emitStateChange,

@@ -92,10 +92,2 @@ });

};
/**
* Retrieve value from Redis and parse as int if possible
*/
const getLastSyncedAt = (key) => redis.get(key).then((x) => {
if (x) {
return parseInt(x, 10);
}
});
async function runInitialScan(processRecords, options = {}) {

@@ -105,3 +97,3 @@ let deferred;

const state = (0, simple_machines_1.fsm)(stateTransitions, 'stopped', {
name: 'Initial scan',
name: 'runInitialScan',
onStateChange: emitStateChange,

@@ -113,14 +105,6 @@ });

deserialize: (x) => new mongodb_1.ObjectId(x),
order: 'asc',
};
const sortField = options.sortField || defaultSortField;
/** Get the last id inserted */
const getLastIdInserted = () => collection
.find()
.project({ [sortField.field]: 1 })
.sort({ [sortField.field]: -1 })
.limit(1)
.toArray()
.then((x) => {
return x[0]?.[sortField.field];
});
const sortOrderToNum = { asc: 1, desc: -1 };
const getCursor = async () => {

@@ -146,3 +130,7 @@ // Lookup last id successfully processed

: []),
{ $sort: { [sortField.field]: 1 } },
{
$sort: {
[sortField.field]: sortOrderToNum[sortField.order ?? 'asc'],
},
},
...omitPipeline,

@@ -154,43 +142,2 @@ ...extendedPipeline,

};
/**
* Periodically check that records are being processed.
*/
const healthChecker = () => {
const { interval = (0, ms_1.default)('1m') } = options.healthCheck || {};
let timer;
const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', {
name: 'Initial scan health check',
onStateChange: emitStateChange,
});
const runHealthCheck = async () => {
debug('Checking health - initial scan');
const lastHealthCheck = new Date().getTime() - interval;
const withinHealthCheck = (x) => x && x > lastHealthCheck;
const lastSyncedAt = await getLastSyncedAt(keys.lastScanProcessedAtKey);
debug('Last scan processed at %d', lastSyncedAt);
// Records were not synced within the health check window
if (!withinHealthCheck(lastSyncedAt) && !state.is('stopped')) {
debug('Health check failed - initial scan');
emit('healthCheckFail', { failureType: 'initialScan', lastSyncedAt });
}
};
const start = () => {
debug('Starting health check - initial scan');
if (state.is('started')) {
return;
}
timer = setInterval(runHealthCheck, interval);
state.change('started');
};
const stop = () => {
debug('Stopping health check - initial scan');
if (state.is('stopped')) {
return;
}
clearInterval(timer);
state.change('stopped');
};
return { start, stop };
};
const healthCheck = healthChecker();
/** Start the initial scan */

@@ -216,10 +163,5 @@ const start = async () => {

// We're done
deferred.done();
state.change('started');
state.change('stopped');
return;
}
// Start the health check
if (options.healthCheck?.enabled) {
healthCheck.start();
}
const _processRecords = async (records) => {

@@ -243,5 +185,2 @@ // Process batch of records

state.change('started');
// Take a snapshot of the last id inserted into the collection
const lastIdInserted = await getLastIdInserted();
debug('Last id inserted %s', lastIdInserted);
const ns = { db: collection.dbName, coll: collection.collectionName };

@@ -265,23 +204,16 @@ const nextChecker = (0, util_js_1.safelyCheckNext)(cursor);

await queue.flush();
// Emit
// An error occurred checking for next
if (nextChecker.errorExists()) {
emit('cursorError', nextChecker.getLastError());
emit('cursorError', {
name: 'runInitialScan',
error: nextChecker.getLastError(),
});
}
// Final id processed
const finalIdProcessed = await redis.get(keys.lastScanIdKey);
debug('Final id processed %s', finalIdProcessed);
// Did we complete the initial scan?
if (
// No records in the collection
!lastIdInserted ||
// Final id processed was at least the last id inserted as of start
(finalIdProcessed &&
sortField.deserialize(finalIdProcessed) >= lastIdInserted)) {
// We're all done
else {
debug('Completed initial scan');
// Stop the health check
healthCheck.stop();
// Record scan complete
await redis.set(keys.scanCompletedKey, new Date().toString());
// Emit event
emit('initialScanComplete', { lastId: finalIdProcessed });
emit('initialScanComplete', {});
}

@@ -305,4 +237,2 @@ // Resolve deferred

state.change('stopping');
// Stop the health check
healthCheck.stop();
// Close the cursor

@@ -328,12 +258,9 @@ await cursor?.close();

const state = (0, simple_machines_1.fsm)(stateTransitions, 'stopped', {
name: 'Change stream',
name: 'processChangeStream',
onStateChange: emitStateChange,
});
const defaultOptions = { fullDocument: 'updateLookup' };
const healthCheck = {
enabled: false,
maxSyncDelay: (0, ms_1.default)('5m'),
interval: (0, ms_1.default)('1m'),
...options.healthCheck,
};
/**
* Get the change stream, resuming from a previous token if exists.
*/
const getChangeStream = async () => {

@@ -352,18 +279,2 @@ const omitPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : [];

};
const runHealthCheck = async (eventReceivedAt) => {
debug('Checking health - change stream');
const lastSyncedAt = await getLastSyncedAt(keys.lastChangeProcessedAtKey);
debug('Event received at %d', eventReceivedAt);
debug('Last change processed at %d', lastSyncedAt);
// Change stream event not synced
if (!lastSyncedAt || lastSyncedAt < eventReceivedAt) {
debug('Health check failed - change stream');
emit('healthCheckFail', {
failureType: 'changeStream',
eventReceivedAt,
lastSyncedAt,
});
}
};
const healthChecker = (0, util_js_1.delayed)(runHealthCheck, healthCheck.maxSyncDelay);
/** Start processing change stream */

@@ -390,6 +301,2 @@ const start = async () => {

while (await nextChecker.hasNext()) {
// Schedule health check
if (healthCheck.enabled) {
healthChecker(new Date().getTime());
}
let event = await changeStream.next();

@@ -407,2 +314,3 @@ debug('Change stream event %O', event);

await processRecord(event);
// Persist state
await redis.mset(keys.changeStreamTokenKey, JSON.stringify(token), keys.lastChangeProcessedAtKey, new Date().getTime());

@@ -412,3 +320,6 @@ }

if (nextChecker.errorExists()) {
emit('cursorError', nextChecker.getLastError());
emit('cursorError', {
name: 'processChangeStream',
error: nextChecker.getLastError(),
});
}

@@ -431,4 +342,2 @@ deferred.done();

state.change('stopping');
// Cancel health check
healthChecker.cancel();
// Close the change stream

@@ -469,3 +378,3 @@ await changeStream?.close();

const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', {
name: 'Schema change',
name: 'detectSchemaChange',
onStateChange: emitStateChange,

@@ -472,0 +381,0 @@ });

@@ -15,9 +15,6 @@ import { ChangeStream, AggregationCursor, ChangeStreamDocument, ChangeStreamInsertDocument, Document } from 'mongodb';

deserialize: (x: string) => T;
/** Sort order: asc or desc. Defaults to asc */
order?: 'asc' | 'desc';
}
export interface ScanOptions<T = any> {
healthCheck?: {
enabled: boolean;
/** How often to run the health check. */
interval?: number;
};
/** Defaults to _id */

@@ -29,7 +26,2 @@ sortField?: SortField<T>;

export interface ChangeStreamOptions {
healthCheck?: {
enabled: boolean;
/** The max allowed time for a change stream event to be processed. */
maxSyncDelay?: number;
};
pipeline?: Document[];

@@ -43,15 +35,3 @@ }

}
export type Events = 'cursorError' | 'healthCheckFail' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete';
interface InitialScanFailEvent {
type: 'healthCheckFail';
failureType: 'initialScan';
lastSyncedAt: number;
}
interface ChangeStreamFailEvent {
type: 'healthCheckFail';
failureType: 'changeStream';
lastRecordUpdatedAt: number;
lastSyncedAt: number;
}
export type HealthCheckFailEvent = InitialScanFailEvent | ChangeStreamFailEvent;
export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete';
export interface ResyncEvent {

@@ -72,10 +52,9 @@ type: 'resync';

type: 'initialScanComplete';
lastId: string;
}
export interface CursorErrorEvent {
type: 'cursorError';
error: unknown;
name: 'runInitialScan' | 'processChangeStream';
error: Error;
}
export type State = 'starting' | 'started' | 'stopping' | 'stopped';
export type SimpleState = 'started' | 'stopped';
export {};

@@ -24,5 +24,3 @@ import { Collection } from 'mongodb';

errorExists: () => boolean;
getLastError: () => {
error: unknown;
};
getLastError: () => unknown;
};

@@ -29,0 +27,0 @@ /**

@@ -80,3 +80,3 @@ "use strict";

const errorExists = () => Boolean(lastError);
const getLastError = () => ({ error: lastError });
const getLastError = () => lastError;
return { hasNext, errorExists, getLastError };

@@ -83,0 +83,0 @@ };

{
"name": "mongochangestream",
"version": "0.37.0",
"version": "0.38.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -29,4 +29,3 @@ "author": "GovSpend",

"sql",
"cratedb",
"health"
"cratedb"
],

@@ -33,0 +32,0 @@ "license": "ISC",

@@ -94,7 +94,6 @@ # Mongo Change Stream

Sometimes things stop working and a restart seems to fix the issue. In order
to automate this process you can pass `{healthCheck: {enabled: true}}` in the options
for `runInitialScan` and `processChangeStream`. This will run a health check
every `{healthCheck: {interval}}` (defaults to 1m). You can restart syncing
by checking for the `healthCheckFail` event.
Look for the `cursorError` event and restart the process or resync as needed.
See also the `missingOplogEntry` utility function that helps determine if an
oplog entry is no longer present and resumption of a change stream from a previous
point is not possible.

@@ -132,4 +131,4 @@ ## Companion Libraries

In this scenario, you will need to enable health checks and restart the
initial scan and/or change stream when a health check failure occurs.
In this scenario, you will need to subscribe to the `cursorError` event and
restart the process or handle otherwise.

@@ -136,0 +135,0 @@ ## Change Stream Strategies

@@ -13,3 +13,4 @@ /**

SyncOptions,
ChangeStreamOptions,
SortField,
CursorErrorEvent,
} from './types.js'

@@ -22,2 +23,3 @@ import {

Collection,
ObjectId,
} from 'mongodb'

@@ -111,2 +113,27 @@ import Redis from 'ioredis'

test('should run initial scan in reverse sort order', async () => {
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)
const processed = []
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
await setTimeout(50)
processed.push(...docs)
}
const sortField: SortField<ObjectId> = {
field: '_id',
serialize: _.toString,
deserialize: (x: string) => new ObjectId(x),
order: 'desc',
}
const scanOptions = { batchSize: 100, sortField }
const initialScan = await sync.runInitialScan(processRecords, scanOptions)
// Wait for initial scan to complete
await initialScan.start()
assert.equal(processed.length, numDocs)
// Stop
await initialScan.stop()
})
test('should omit fields from initial scan', async () => {

@@ -193,3 +220,3 @@ const { coll } = await getConns()

test('initial scan should not be marked as completed if connection is closed', async () => {
// Memoize hack
// Get a new connection since we're closing the connection in the test
const { coll, redis, client } = await getConns({})

@@ -516,51 +543,28 @@ const sync = initSync(redis, coll)

test('should fail health check - initial scan', async () => {
const { coll } = await getConns()
const sync = await getSync()
await initState(sync, coll)
let healthCheckFailed = false
const processed = []
let counter = 0
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
await setTimeout(counter++ === 1 ? 1000 : 100)
processed.push(...docs)
}
const scanOptions: QueueOptions & ScanOptions = {
batchSize: 100,
healthCheck: { enabled: true, interval: 500 },
}
const initialScan = await sync.runInitialScan(processRecords, scanOptions)
sync.emitter.on('healthCheckFail', () => {
healthCheckFailed = true
initialScan.stop()
test('can extend events', async () => {
const { coll, redis } = await getConns({})
const sync = initSync<'foo' | 'bar'>(redis, coll)
let emitted = ''
sync.emitter.on('foo', (x: string) => {
emitted = x
})
// Wait for initial scan to complete
await initialScan.start()
assert.ok(healthCheckFailed)
assert.notEqual(processed.length, numDocs)
// Stop
await initialScan.stop()
sync.emitter.emit('foo', 'bar')
assert.equal(emitted, 'bar')
})
test('should fail health check - change stream', async () => {
const { coll } = await getConns()
const sync = await getSync()
test('should emit cursorError if change stream is closed', async () => {
// Get a new connection since we're closing the connection in the test
const { redis, coll, client } = await getConns({})
const sync = initSync(redis, coll)
let error: any
sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
console.log(event)
error = event.error
})
await initState(sync, coll)
let healthCheckFailed = false
const processed = []
const processRecord = async (doc: ChangeStreamDocument) => {
await setTimeout(ms('2s'))
processed.push(doc)
const processRecord = async () => {
await setTimeout(500)
}
const options: ChangeStreamOptions = {
healthCheck: { enabled: true, maxSyncDelay: ms('1s') },
}
const changeStream = await sync.processChangeStream(processRecord, options)
sync.emitter.on('healthCheckFail', () => {
healthCheckFailed = true
changeStream.stop()
})
sync.emitter.on('cursorError', console.log)
const changeStream = await sync.processChangeStream(processRecord)
// Start

@@ -570,20 +574,7 @@ changeStream.start()

// Update records
await coll.updateOne({}, { $set: { name: 'Tom' } })
// Wait for health checker to pick up failure
await setTimeout(ms('2s'))
assert.ok(healthCheckFailed)
assert.notEqual(processed.length, numDocs)
// Stop
await changeStream.stop()
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
// Close the connection.
await client.close()
await setTimeout(ms('8s'))
assert.ok(error?.message)
})
test('can extend events', async () => {
const { coll, redis } = await getConns({})
const sync = initSync<'foo' | 'bar'>(redis, coll)
let emitted = ''
sync.emitter.on('foo', (x: string) => {
emitted = x
})
sync.emitter.emit('foo', 'bar')
assert.equal(emitted, 'bar')
})

@@ -34,3 +34,2 @@ import _ from 'lodash/fp.js'

when,
delayed,
} from './util.js'

@@ -98,3 +97,3 @@ import EventEmitter from 'eventemitter3'

const state = fsm(simpleStateTransistions, 'stopped', {
name: 'Resync',
name: 'detectResync',
onStateChange: emitStateChange,

@@ -132,12 +131,2 @@ })

/**
* Retrieve value from Redis and parse as int if possible
*/
const getLastSyncedAt = (key: string) =>
redis.get(key).then((x) => {
if (x) {
return parseInt(x, 10)
}
})
async function runInitialScan<T = any>(

@@ -150,3 +139,3 @@ processRecords: ProcessRecords,

const state = fsm(stateTransitions, 'stopped', {
name: 'Initial scan',
name: 'runInitialScan',
onStateChange: emitStateChange,

@@ -158,2 +147,3 @@ })

deserialize: (x: string) => new ObjectId(x),
order: 'asc',
}

@@ -163,14 +153,3 @@

/** Get the last id inserted */
const getLastIdInserted = () =>
collection
.find()
.project({ [sortField.field]: 1 })
.sort({ [sortField.field]: -1 })
.limit(1)
.toArray()
.then((x) => {
return x[0]?.[sortField.field]
})
const sortOrderToNum = { asc: 1, desc: -1 }
const getCursor = async () => {

@@ -196,3 +175,7 @@ // Lookup last id successfully processed

: []),
{ $sort: { [sortField.field]: 1 } },
{
$sort: {
[sortField.field]: sortOrderToNum[sortField.order ?? 'asc'],
},
},
...omitPipeline,

@@ -205,46 +188,2 @@ ...extendedPipeline,

/**
* Periodically check that records are being processed.
*/
const healthChecker = () => {
const { interval = ms('1m') } = options.healthCheck || {}
let timer: NodeJS.Timer
const state = fsm(simpleStateTransistions, 'stopped', {
name: 'Initial scan health check',
onStateChange: emitStateChange,
})
const runHealthCheck = async () => {
debug('Checking health - initial scan')
const lastHealthCheck = new Date().getTime() - interval
const withinHealthCheck = (x?: number) => x && x > lastHealthCheck
const lastSyncedAt = await getLastSyncedAt(keys.lastScanProcessedAtKey)
debug('Last scan processed at %d', lastSyncedAt)
// Records were not synced within the health check window
if (!withinHealthCheck(lastSyncedAt) && !state.is('stopped')) {
debug('Health check failed - initial scan')
emit('healthCheckFail', { failureType: 'initialScan', lastSyncedAt })
}
}
const start = () => {
debug('Starting health check - initial scan')
if (state.is('started')) {
return
}
timer = setInterval(runHealthCheck, interval)
state.change('started')
}
const stop = () => {
debug('Stopping health check - initial scan')
if (state.is('stopped')) {
return
}
clearInterval(timer)
state.change('stopped')
}
return { start, stop }
}
const healthCheck = healthChecker()
/** Start the initial scan */

@@ -270,10 +209,5 @@ const start = async () => {

// We're done
deferred.done()
state.change('started')
state.change('stopped')
return
}
// Start the health check
if (options.healthCheck?.enabled) {
healthCheck.start()
}

@@ -303,5 +237,2 @@ const _processRecords = async (records: ChangeStreamInsertDocument[]) => {

state.change('started')
// Take a snapshot of the last id inserted into the collection
const lastIdInserted = await getLastIdInserted()
debug('Last id inserted %s', lastIdInserted)

@@ -326,24 +257,16 @@ const ns = { db: collection.dbName, coll: collection.collectionName }

await queue.flush()
// Emit
// An error occurred checking for next
if (nextChecker.errorExists()) {
emit('cursorError', nextChecker.getLastError())
emit('cursorError', {
name: 'runInitialScan',
error: nextChecker.getLastError(),
})
}
// Final id processed
const finalIdProcessed = await redis.get(keys.lastScanIdKey)
debug('Final id processed %s', finalIdProcessed)
// Did we complete the initial scan?
if (
// No records in the collection
!lastIdInserted ||
// Final id processed was at least the last id inserted as of start
(finalIdProcessed &&
sortField.deserialize(finalIdProcessed) >= lastIdInserted)
) {
// We're all done
else {
debug('Completed initial scan')
// Stop the health check
healthCheck.stop()
// Record scan complete
await redis.set(keys.scanCompletedKey, new Date().toString())
// Emit event
emit('initialScanComplete', { lastId: finalIdProcessed })
emit('initialScanComplete', {})
}

@@ -368,4 +291,2 @@ // Resolve deferred

state.change('stopping')
// Stop the health check
healthCheck.stop()
// Close the cursor

@@ -397,3 +318,3 @@ await cursor?.close()

const state = fsm(stateTransitions, 'stopped', {
name: 'Change stream',
name: 'processChangeStream',
onStateChange: emitStateChange,

@@ -403,9 +324,5 @@ })

const healthCheck = {
enabled: false,
maxSyncDelay: ms('5m'),
interval: ms('1m'),
...options.healthCheck,
}
/**
* Get the change stream, resuming from a previous token if exists.
*/
const getChangeStream = async () => {

@@ -425,20 +342,2 @@ const omitPipeline = omit ? generatePipelineFromOmit(omit) : []

const runHealthCheck = async (eventReceivedAt: number) => {
debug('Checking health - change stream')
const lastSyncedAt = await getLastSyncedAt(keys.lastChangeProcessedAtKey)
debug('Event received at %d', eventReceivedAt)
debug('Last change processed at %d', lastSyncedAt)
// Change stream event not synced
if (!lastSyncedAt || lastSyncedAt < eventReceivedAt) {
debug('Health check failed - change stream')
emit('healthCheckFail', {
failureType: 'changeStream',
eventReceivedAt,
lastSyncedAt,
})
}
}
const healthChecker = delayed(runHealthCheck, healthCheck.maxSyncDelay)
/** Start processing change stream */

@@ -465,6 +364,2 @@ const start = async () => {

while (await nextChecker.hasNext()) {
// Schedule health check
if (healthCheck.enabled) {
healthChecker(new Date().getTime())
}
let event = await changeStream.next()

@@ -482,3 +377,3 @@ debug('Change stream event %O', event)

await processRecord(event)
// Persist state
await redis.mset(

@@ -493,3 +388,6 @@ keys.changeStreamTokenKey,

if (nextChecker.errorExists()) {
emit('cursorError', nextChecker.getLastError())
emit('cursorError', {
name: 'processChangeStream',
error: nextChecker.getLastError(),
})
}

@@ -513,4 +411,2 @@ deferred.done()

state.change('stopping')
// Cancel health check
healthChecker.cancel()
// Close the change stream

@@ -558,3 +454,3 @@ await changeStream?.close()

const state = fsm(simpleStateTransistions, 'stopped', {
name: 'Schema change',
name: 'detectSchemaChange',
onStateChange: emitStateChange,

@@ -561,0 +457,0 @@ })

@@ -30,10 +30,7 @@ import {

deserialize: (x: string) => T
/** Sort order: asc or desc. Defaults to asc */
order?: 'asc' | 'desc'
}
export interface ScanOptions<T = any> {
healthCheck?: {
enabled: boolean
/** How often to run the health check. */
interval?: number
}
/** Defaults to _id */

@@ -46,7 +43,2 @@ sortField?: SortField<T>

export interface ChangeStreamOptions {
healthCheck?: {
enabled: boolean
/** The max allowed time for a change stream event to be processed. */
maxSyncDelay?: number
}
pipeline?: Document[]

@@ -66,3 +58,2 @@ }

| 'cursorError'
| 'healthCheckFail'
| 'resync'

@@ -73,17 +64,2 @@ | 'schemaChange'

interface InitialScanFailEvent {
type: 'healthCheckFail'
failureType: 'initialScan'
lastSyncedAt: number
}
interface ChangeStreamFailEvent {
type: 'healthCheckFail'
failureType: 'changeStream'
lastRecordUpdatedAt: number
lastSyncedAt: number
}
export type HealthCheckFailEvent = InitialScanFailEvent | ChangeStreamFailEvent
export interface ResyncEvent {

@@ -107,3 +83,2 @@ type: 'resync'

type: 'initialScanComplete'
lastId: string
}

@@ -113,3 +88,4 @@

type: 'cursorError'
error: unknown
name: 'runInitialScan' | 'processChangeStream'
error: Error
}

@@ -116,0 +92,0 @@

@@ -85,3 +85,3 @@ import { Collection } from 'mongodb'

const errorExists = () => Boolean(lastError)
const getLastError = () => ({ error: lastError })
const getLastError = () => lastError

@@ -88,0 +88,0 @@ return { hasNext, errorExists, getLastError }

Socket — SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc