mongochangestream
Comparing version 0.37.0 to 0.38.0
@@ -0,1 +1,7 @@ | ||
# 0.38.0 | ||
- Remove all health check code in favor of using the `cursorError` event. | ||
- Simplify some code. | ||
- Add support for sort order (asc, desc) on initial scan. | ||
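A minimal sketch of the new sort-order option, modeled on the test added in this release. The database name, collection name, and `MONGO_URL` variable are illustrative, and exporting `initSync` and `SortField` from the package root is an assumption (the test imports `SortField` from `./types.js`):

```ts
import Redis from 'ioredis'
import _ from 'lodash/fp.js'
import { MongoClient, ObjectId, ChangeStreamInsertDocument } from 'mongodb'
import { initSync, SortField } from 'mongochangestream' // root exports assumed

const redis = new Redis()
const client = await MongoClient.connect(process.env.MONGO_URL!) // hypothetical env var
const collection = client.db('mydb').collection('mycoll') // hypothetical names

const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
  // Write the batch to your destination here
  console.log('processing %d records', docs.length)
}

// New in 0.38.0: scan newest-first; order defaults to 'asc' when omitted
const sortField: SortField<ObjectId> = {
  field: '_id',
  serialize: _.toString,
  deserialize: (x: string) => new ObjectId(x),
  order: 'desc',
}

const sync = initSync(redis, collection)
const initialScan = await sync.runInitialScan(processRecords, {
  batchSize: 100,
  sortField,
})
await initialScan.start() // resolves once the scan completes
await initialScan.stop()
```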
# 0.37.0 | ||
@@ -2,0 +8,0 @@ |
@@ -63,3 +63,3 @@ "use strict"; | ||
const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', { | ||
name: 'Resync', | ||
name: 'detectResync', | ||
onStateChange: emitStateChange, | ||
@@ -92,10 +92,2 @@ }); | ||
}; | ||
/** | ||
* Retrieve value from Redis and parse as int if possible | ||
*/ | ||
const getLastSyncedAt = (key) => redis.get(key).then((x) => { | ||
if (x) { | ||
return parseInt(x, 10); | ||
} | ||
}); | ||
async function runInitialScan(processRecords, options = {}) { | ||
@@ -105,3 +97,3 @@ let deferred; | ||
const state = (0, simple_machines_1.fsm)(stateTransitions, 'stopped', { | ||
name: 'Initial scan', | ||
name: 'runInitialScan', | ||
onStateChange: emitStateChange, | ||
@@ -113,14 +105,6 @@ }); | ||
deserialize: (x) => new mongodb_1.ObjectId(x), | ||
order: 'asc', | ||
}; | ||
const sortField = options.sortField || defaultSortField; | ||
/** Get the last id inserted */ | ||
const getLastIdInserted = () => collection | ||
.find() | ||
.project({ [sortField.field]: 1 }) | ||
.sort({ [sortField.field]: -1 }) | ||
.limit(1) | ||
.toArray() | ||
.then((x) => { | ||
return x[0]?.[sortField.field]; | ||
}); | ||
const sortOrderToNum = { asc: 1, desc: -1 }; | ||
const getCursor = async () => { | ||
@@ -146,3 +130,7 @@ // Lookup last id successfully processed | ||
: []), | ||
{ $sort: { [sortField.field]: 1 } }, | ||
{ | ||
$sort: { | ||
[sortField.field]: sortOrderToNum[sortField.order ?? 'asc'], | ||
}, | ||
}, | ||
...omitPipeline, | ||
@@ -154,43 +142,2 @@ ...extendedPipeline, | ||
}; | ||
/** | ||
* Periodically check that records are being processed. | ||
*/ | ||
const healthChecker = () => { | ||
const { interval = (0, ms_1.default)('1m') } = options.healthCheck || {}; | ||
let timer; | ||
const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', { | ||
name: 'Initial scan health check', | ||
onStateChange: emitStateChange, | ||
}); | ||
const runHealthCheck = async () => { | ||
debug('Checking health - initial scan'); | ||
const lastHealthCheck = new Date().getTime() - interval; | ||
const withinHealthCheck = (x) => x && x > lastHealthCheck; | ||
const lastSyncedAt = await getLastSyncedAt(keys.lastScanProcessedAtKey); | ||
debug('Last scan processed at %d', lastSyncedAt); | ||
// Records were not synced within the health check window | ||
if (!withinHealthCheck(lastSyncedAt) && !state.is('stopped')) { | ||
debug('Health check failed - initial scan'); | ||
emit('healthCheckFail', { failureType: 'initialScan', lastSyncedAt }); | ||
} | ||
}; | ||
const start = () => { | ||
debug('Starting health check - initial scan'); | ||
if (state.is('started')) { | ||
return; | ||
} | ||
timer = setInterval(runHealthCheck, interval); | ||
state.change('started'); | ||
}; | ||
const stop = () => { | ||
debug('Stopping health check - initial scan'); | ||
if (state.is('stopped')) { | ||
return; | ||
} | ||
clearInterval(timer); | ||
state.change('stopped'); | ||
}; | ||
return { start, stop }; | ||
}; | ||
const healthCheck = healthChecker(); | ||
/** Start the initial scan */ | ||
@@ -216,10 +163,5 @@ const start = async () => { | ||
// We're done | ||
deferred.done(); | ||
state.change('started'); | ||
state.change('stopped'); | ||
return; | ||
} | ||
// Start the health check | ||
if (options.healthCheck?.enabled) { | ||
healthCheck.start(); | ||
} | ||
const _processRecords = async (records) => { | ||
@@ -243,5 +185,2 @@ // Process batch of records | ||
state.change('started'); | ||
// Take a snapshot of the last id inserted into the collection | ||
const lastIdInserted = await getLastIdInserted(); | ||
debug('Last id inserted %s', lastIdInserted); | ||
const ns = { db: collection.dbName, coll: collection.collectionName }; | ||
@@ -265,23 +204,16 @@ const nextChecker = (0, util_js_1.safelyCheckNext)(cursor); | ||
await queue.flush(); | ||
// Emit | ||
// An error occurred checking for next | ||
if (nextChecker.errorExists()) { | ||
emit('cursorError', nextChecker.getLastError()); | ||
emit('cursorError', { | ||
name: 'runInitialScan', | ||
error: nextChecker.getLastError(), | ||
}); | ||
} | ||
// Final id processed | ||
const finalIdProcessed = await redis.get(keys.lastScanIdKey); | ||
debug('Final id processed %s', finalIdProcessed); | ||
// Did we complete the initial scan? | ||
if ( | ||
// No records in the collection | ||
!lastIdInserted || | ||
// Final id processed was at least the last id inserted as of start | ||
(finalIdProcessed && | ||
sortField.deserialize(finalIdProcessed) >= lastIdInserted)) { | ||
// We're all done | ||
else { | ||
debug('Completed initial scan'); | ||
// Stop the health check | ||
healthCheck.stop(); | ||
// Record scan complete | ||
await redis.set(keys.scanCompletedKey, new Date().toString()); | ||
// Emit event | ||
emit('initialScanComplete', { lastId: finalIdProcessed }); | ||
emit('initialScanComplete', {}); | ||
} | ||
@@ -305,4 +237,2 @@ // Resolve deferred | ||
state.change('stopping'); | ||
// Stop the health check | ||
healthCheck.stop(); | ||
// Close the cursor | ||
@@ -328,12 +258,9 @@ await cursor?.close(); | ||
const state = (0, simple_machines_1.fsm)(stateTransitions, 'stopped', { | ||
name: 'Change stream', | ||
name: 'processChangeStream', | ||
onStateChange: emitStateChange, | ||
}); | ||
const defaultOptions = { fullDocument: 'updateLookup' }; | ||
const healthCheck = { | ||
enabled: false, | ||
maxSyncDelay: (0, ms_1.default)('5m'), | ||
interval: (0, ms_1.default)('1m'), | ||
...options.healthCheck, | ||
}; | ||
/** | ||
* Get the change stream, resuming from a previous token if exists. | ||
*/ | ||
const getChangeStream = async () => { | ||
@@ -352,18 +279,2 @@ const omitPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : []; | ||
}; | ||
const runHealthCheck = async (eventReceivedAt) => { | ||
debug('Checking health - change stream'); | ||
const lastSyncedAt = await getLastSyncedAt(keys.lastChangeProcessedAtKey); | ||
debug('Event received at %d', eventReceivedAt); | ||
debug('Last change processed at %d', lastSyncedAt); | ||
// Change stream event not synced | ||
if (!lastSyncedAt || lastSyncedAt < eventReceivedAt) { | ||
debug('Health check failed - change stream'); | ||
emit('healthCheckFail', { | ||
failureType: 'changeStream', | ||
eventReceivedAt, | ||
lastSyncedAt, | ||
}); | ||
} | ||
}; | ||
const healthChecker = (0, util_js_1.delayed)(runHealthCheck, healthCheck.maxSyncDelay); | ||
/** Start processing change stream */ | ||
@@ -390,6 +301,2 @@ const start = async () => { | ||
while (await nextChecker.hasNext()) { | ||
// Schedule health check | ||
if (healthCheck.enabled) { | ||
healthChecker(new Date().getTime()); | ||
} | ||
let event = await changeStream.next(); | ||
@@ -407,2 +314,3 @@ debug('Change stream event %O', event); | ||
await processRecord(event); | ||
// Persist state | ||
await redis.mset(keys.changeStreamTokenKey, JSON.stringify(token), keys.lastChangeProcessedAtKey, new Date().getTime()); | ||
@@ -412,3 +320,6 @@ } | ||
if (nextChecker.errorExists()) { | ||
emit('cursorError', nextChecker.getLastError()); | ||
emit('cursorError', { | ||
name: 'processChangeStream', | ||
error: nextChecker.getLastError(), | ||
}); | ||
} | ||
@@ -431,4 +342,2 @@ deferred.done(); | ||
state.change('stopping'); | ||
// Cancel health check | ||
healthChecker.cancel(); | ||
// Close the change stream | ||
@@ -469,3 +378,3 @@ await changeStream?.close(); | ||
const state = (0, simple_machines_1.fsm)(simpleStateTransistions, 'stopped', { | ||
name: 'Schema change', | ||
name: 'detectSchemaChange', | ||
onStateChange: emitStateChange, | ||
@@ -472,0 +381,0 @@ }); |
@@ -15,9 +15,6 @@ import { ChangeStream, AggregationCursor, ChangeStreamDocument, ChangeStreamInsertDocument, Document } from 'mongodb'; | ||
deserialize: (x: string) => T; | ||
/** Sort order: asc or desc. Defaults to asc */ | ||
order?: 'asc' | 'desc'; | ||
} | ||
export interface ScanOptions<T = any> { | ||
healthCheck?: { | ||
enabled: boolean; | ||
/** How often to run the health check. */ | ||
interval?: number; | ||
}; | ||
/** Defaults to _id */ | ||
@@ -29,7 +26,2 @@ sortField?: SortField<T>; | ||
export interface ChangeStreamOptions { | ||
healthCheck?: { | ||
enabled: boolean; | ||
/** The max allowed time for a change stream event to be processed. */ | ||
maxSyncDelay?: number; | ||
}; | ||
pipeline?: Document[]; | ||
@@ -43,15 +35,3 @@ } | ||
} | ||
export type Events = 'cursorError' | 'healthCheckFail' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete'; | ||
interface InitialScanFailEvent { | ||
type: 'healthCheckFail'; | ||
failureType: 'initialScan'; | ||
lastSyncedAt: number; | ||
} | ||
interface ChangeStreamFailEvent { | ||
type: 'healthCheckFail'; | ||
failureType: 'changeStream'; | ||
lastRecordUpdatedAt: number; | ||
lastSyncedAt: number; | ||
} | ||
export type HealthCheckFailEvent = InitialScanFailEvent | ChangeStreamFailEvent; | ||
export type Events = 'cursorError' | 'resync' | 'schemaChange' | 'stateChange' | 'initialScanComplete'; | ||
export interface ResyncEvent { | ||
@@ -72,10 +52,9 @@ type: 'resync'; | ||
type: 'initialScanComplete'; | ||
lastId: string; | ||
} | ||
export interface CursorErrorEvent { | ||
type: 'cursorError'; | ||
error: unknown; | ||
name: 'runInitialScan' | 'processChangeStream'; | ||
error: Error; | ||
} | ||
export type State = 'starting' | 'started' | 'stopping' | 'stopped'; | ||
export type SimpleState = 'started' | 'stopped'; | ||
export {}; |
@@ -24,5 +24,3 @@ import { Collection } from 'mongodb'; | ||
errorExists: () => boolean; | ||
getLastError: () => { | ||
error: unknown; | ||
}; | ||
getLastError: () => unknown; | ||
}; | ||
@@ -29,0 +27,0 @@ /** |
@@ -80,3 +80,3 @@ "use strict"; | ||
const errorExists = () => Boolean(lastError); | ||
const getLastError = () => ({ error: lastError }); | ||
const getLastError = () => lastError; | ||
return { hasNext, errorExists, getLastError }; | ||
@@ -83,0 +83,0 @@ }; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.37.0", | ||
"version": "0.38.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -29,4 +29,3 @@ "author": "GovSpend", | ||
"sql", | ||
"cratedb", | ||
"health" | ||
"cratedb" | ||
], | ||
@@ -33,0 +32,0 @@ "license": "ISC", |
@@ -94,7 +94,6 @@ # Mongo Change Stream | ||
Sometimes things stop working and a restart seems to fix the issue. In order | ||
to automate this process you can pass `{healthCheck: {enabled: true}}` in the options | ||
for `runInitialScan` and `processChangeStream`. This will run a health check | ||
every `{healthCheck: {interval}}` (defaults to 1m). You can restart syncing | ||
by checking for the `healthCheckFail` event. | ||
Look for the `cursorError` event and restart the process or resync as needed. | ||
See also the `missingOplogEntry` utility function, which helps determine whether an
oplog entry is no longer present, meaning the change stream cannot be resumed from
its previous point.
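A hedged sketch of the replacement pattern: instead of `healthCheckFail`, subscribe to `cursorError` and decide how to recover. The event shape (`name`, `error`) matches the new `CursorErrorEvent` typing; the `sync` handle is assumed from earlier setup, and re-exporting `CursorErrorEvent` from the package root is an assumption.

```ts
import type { CursorErrorEvent } from 'mongochangestream' // root export assumed

sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
  // name is 'runInitialScan' or 'processChangeStream', telling you which cursor failed
  console.error('cursor error in %s:', event.name, event.error)
  // Decide whether to restart the failed process or trigger a full resync.
  // The missingOplogEntry utility (see above) can help determine whether the
  // resume point is gone; its exact signature is not shown in this diff.
})
```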
@@ -132,4 +131,4 @@ ## Companion Libraries | ||
In this scenario, you will need to enable health checks and restart the | ||
initial scan and/or change stream when a health check failure occurs. | ||
In this scenario, you will need to subscribe to the `cursorError` event and
restart the process or otherwise handle the failure, as in the sketch below.
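A minimal restart sketch, assuming `initialScan` and `changeStream` were created earlier via `runInitialScan` and `processChangeStream`, and assuming `start()` may be called again after `stop()` (verify against the library's state machine):

```ts
sync.emitter.on('cursorError', async (event: CursorErrorEvent) => {
  if (event.name === 'runInitialScan') {
    await initialScan.stop()
    initialScan.start() // fire-and-forget restart
  } else {
    await changeStream.stop()
    changeStream.start()
  }
})
```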
@@ -136,0 +135,0 @@ ## Change Stream Strategies |
@@ -13,3 +13,4 @@ /** | ||
SyncOptions, | ||
ChangeStreamOptions, | ||
SortField, | ||
CursorErrorEvent, | ||
} from './types.js' | ||
@@ -22,2 +23,3 @@ import { | ||
Collection, | ||
ObjectId, | ||
} from 'mongodb' | ||
@@ -111,2 +113,27 @@ import Redis from 'ioredis' | ||
test('should run initial scan in reverse sort order', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
const processed = [] | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
} | ||
const sortField: SortField<ObjectId> = { | ||
field: '_id', | ||
serialize: _.toString, | ||
deserialize: (x: string) => new ObjectId(x), | ||
order: 'desc', | ||
} | ||
const scanOptions = { batchSize: 100, sortField } | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.equal(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
}) | ||
test('should omit fields from initial scan', async () => { | ||
@@ -193,3 +220,3 @@ const { coll } = await getConns() | ||
test('initial scan should not be marked as completed if connection is closed', async () => { | ||
// Memoize hack | ||
// Get a new connection since we're closing the connection in the test | ||
const { coll, redis, client } = await getConns({}) | ||
@@ -516,51 +543,28 @@ const sync = initSync(redis, coll) | ||
test('should fail health check - initial scan', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, coll) | ||
let healthCheckFailed = false | ||
const processed = [] | ||
let counter = 0 | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(counter++ === 1 ? 1000 : 100) | ||
processed.push(...docs) | ||
} | ||
const scanOptions: QueueOptions & ScanOptions = { | ||
batchSize: 100, | ||
healthCheck: { enabled: true, interval: 500 }, | ||
} | ||
const initialScan = await sync.runInitialScan(processRecords, scanOptions) | ||
sync.emitter.on('healthCheckFail', () => { | ||
healthCheckFailed = true | ||
initialScan.stop() | ||
test('can extend events', async () => { | ||
const { coll, redis } = await getConns({}) | ||
const sync = initSync<'foo' | 'bar'>(redis, coll) | ||
let emitted = '' | ||
sync.emitter.on('foo', (x: string) => { | ||
emitted = x | ||
}) | ||
// Wait for initial scan to complete | ||
await initialScan.start() | ||
assert.ok(healthCheckFailed) | ||
assert.notEqual(processed.length, numDocs) | ||
// Stop | ||
await initialScan.stop() | ||
sync.emitter.emit('foo', 'bar') | ||
assert.equal(emitted, 'bar') | ||
}) | ||
test('should fail health check - change stream', async () => { | ||
const { coll } = await getConns() | ||
const sync = await getSync() | ||
test('should emit cursorError if change stream is closed', async () => { | ||
// Get a new connection since we're closing the connection in the test | ||
const { redis, coll, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
let error: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
console.log(event) | ||
error = event.error | ||
}) | ||
await initState(sync, coll) | ||
let healthCheckFailed = false | ||
const processed = [] | ||
const processRecord = async (doc: ChangeStreamDocument) => { | ||
await setTimeout(ms('2s')) | ||
processed.push(doc) | ||
const processRecord = async () => { | ||
await setTimeout(500) | ||
} | ||
const options: ChangeStreamOptions = { | ||
healthCheck: { enabled: true, maxSyncDelay: ms('1s') }, | ||
} | ||
const changeStream = await sync.processChangeStream(processRecord, options) | ||
sync.emitter.on('healthCheckFail', () => { | ||
healthCheckFailed = true | ||
changeStream.stop() | ||
}) | ||
sync.emitter.on('cursorError', console.log) | ||
const changeStream = await sync.processChangeStream(processRecord) | ||
// Start | ||
@@ -570,20 +574,7 @@ changeStream.start() | ||
// Update records | ||
await coll.updateOne({}, { $set: { name: 'Tom' } }) | ||
// Wait for health checker to pick up failure | ||
await setTimeout(ms('2s')) | ||
assert.ok(healthCheckFailed) | ||
assert.notEqual(processed.length, numDocs) | ||
// Stop | ||
await changeStream.stop() | ||
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Close the connection. | ||
await client.close() | ||
await setTimeout(ms('8s')) | ||
assert.ok(error?.message) | ||
}) | ||
test('can extend events', async () => { | ||
const { coll, redis } = await getConns({}) | ||
const sync = initSync<'foo' | 'bar'>(redis, coll) | ||
let emitted = '' | ||
sync.emitter.on('foo', (x: string) => { | ||
emitted = x | ||
}) | ||
sync.emitter.emit('foo', 'bar') | ||
assert.equal(emitted, 'bar') | ||
}) |
@@ -34,3 +34,2 @@ import _ from 'lodash/fp.js' | ||
when, | ||
delayed, | ||
} from './util.js' | ||
@@ -98,3 +97,3 @@ import EventEmitter from 'eventemitter3' | ||
const state = fsm(simpleStateTransistions, 'stopped', { | ||
name: 'Resync', | ||
name: 'detectResync', | ||
onStateChange: emitStateChange, | ||
@@ -132,12 +131,2 @@ }) | ||
/** | ||
* Retrieve value from Redis and parse as int if possible | ||
*/ | ||
const getLastSyncedAt = (key: string) => | ||
redis.get(key).then((x) => { | ||
if (x) { | ||
return parseInt(x, 10) | ||
} | ||
}) | ||
async function runInitialScan<T = any>( | ||
@@ -150,3 +139,3 @@ processRecords: ProcessRecords, | ||
const state = fsm(stateTransitions, 'stopped', { | ||
name: 'Initial scan', | ||
name: 'runInitialScan', | ||
onStateChange: emitStateChange, | ||
@@ -158,2 +147,3 @@ }) | ||
deserialize: (x: string) => new ObjectId(x), | ||
order: 'asc', | ||
} | ||
@@ -163,14 +153,3 @@ | ||
/** Get the last id inserted */ | ||
const getLastIdInserted = () => | ||
collection | ||
.find() | ||
.project({ [sortField.field]: 1 }) | ||
.sort({ [sortField.field]: -1 }) | ||
.limit(1) | ||
.toArray() | ||
.then((x) => { | ||
return x[0]?.[sortField.field] | ||
}) | ||
const sortOrderToNum = { asc: 1, desc: -1 } | ||
const getCursor = async () => { | ||
@@ -196,3 +175,7 @@ // Lookup last id successfully processed | ||
: []), | ||
{ $sort: { [sortField.field]: 1 } }, | ||
{ | ||
$sort: { | ||
[sortField.field]: sortOrderToNum[sortField.order ?? 'asc'], | ||
}, | ||
}, | ||
...omitPipeline, | ||
@@ -205,46 +188,2 @@ ...extendedPipeline, | ||
/** | ||
* Periodically check that records are being processed. | ||
*/ | ||
const healthChecker = () => { | ||
const { interval = ms('1m') } = options.healthCheck || {} | ||
let timer: NodeJS.Timer | ||
const state = fsm(simpleStateTransistions, 'stopped', { | ||
name: 'Initial scan health check', | ||
onStateChange: emitStateChange, | ||
}) | ||
const runHealthCheck = async () => { | ||
debug('Checking health - initial scan') | ||
const lastHealthCheck = new Date().getTime() - interval | ||
const withinHealthCheck = (x?: number) => x && x > lastHealthCheck | ||
const lastSyncedAt = await getLastSyncedAt(keys.lastScanProcessedAtKey) | ||
debug('Last scan processed at %d', lastSyncedAt) | ||
// Records were not synced within the health check window | ||
if (!withinHealthCheck(lastSyncedAt) && !state.is('stopped')) { | ||
debug('Health check failed - initial scan') | ||
emit('healthCheckFail', { failureType: 'initialScan', lastSyncedAt }) | ||
} | ||
} | ||
const start = () => { | ||
debug('Starting health check - initial scan') | ||
if (state.is('started')) { | ||
return | ||
} | ||
timer = setInterval(runHealthCheck, interval) | ||
state.change('started') | ||
} | ||
const stop = () => { | ||
debug('Stopping health check - initial scan') | ||
if (state.is('stopped')) { | ||
return | ||
} | ||
clearInterval(timer) | ||
state.change('stopped') | ||
} | ||
return { start, stop } | ||
} | ||
const healthCheck = healthChecker() | ||
/** Start the initial scan */ | ||
@@ -270,10 +209,5 @@ const start = async () => { | ||
// We're done | ||
deferred.done() | ||
state.change('started') | ||
state.change('stopped') | ||
return | ||
} | ||
// Start the health check | ||
if (options.healthCheck?.enabled) { | ||
healthCheck.start() | ||
} | ||
@@ -303,5 +237,2 @@ const _processRecords = async (records: ChangeStreamInsertDocument[]) => { | ||
state.change('started') | ||
// Take a snapshot of the last id inserted into the collection | ||
const lastIdInserted = await getLastIdInserted() | ||
debug('Last id inserted %s', lastIdInserted) | ||
@@ -326,24 +257,16 @@ const ns = { db: collection.dbName, coll: collection.collectionName } | ||
await queue.flush() | ||
// Emit | ||
// An error occurred checking for next | ||
if (nextChecker.errorExists()) { | ||
emit('cursorError', nextChecker.getLastError()) | ||
emit('cursorError', { | ||
name: 'runInitialScan', | ||
error: nextChecker.getLastError(), | ||
}) | ||
} | ||
// Final id processed | ||
const finalIdProcessed = await redis.get(keys.lastScanIdKey) | ||
debug('Final id processed %s', finalIdProcessed) | ||
// Did we complete the initial scan? | ||
if ( | ||
// No records in the collection | ||
!lastIdInserted || | ||
// Final id processed was at least the last id inserted as of start | ||
(finalIdProcessed && | ||
sortField.deserialize(finalIdProcessed) >= lastIdInserted) | ||
) { | ||
// We're all done | ||
else { | ||
debug('Completed initial scan') | ||
// Stop the health check | ||
healthCheck.stop() | ||
// Record scan complete | ||
await redis.set(keys.scanCompletedKey, new Date().toString()) | ||
// Emit event | ||
emit('initialScanComplete', { lastId: finalIdProcessed }) | ||
emit('initialScanComplete', {}) | ||
} | ||
@@ -368,4 +291,2 @@ // Resolve deferred | ||
state.change('stopping') | ||
// Stop the health check | ||
healthCheck.stop() | ||
// Close the cursor | ||
@@ -397,3 +318,3 @@ await cursor?.close() | ||
const state = fsm(stateTransitions, 'stopped', { | ||
name: 'Change stream', | ||
name: 'processChangeStream', | ||
onStateChange: emitStateChange, | ||
@@ -403,9 +324,5 @@ }) | ||
const healthCheck = { | ||
enabled: false, | ||
maxSyncDelay: ms('5m'), | ||
interval: ms('1m'), | ||
...options.healthCheck, | ||
} | ||
/** | ||
* Get the change stream, resuming from a previous token if exists. | ||
*/ | ||
const getChangeStream = async () => { | ||
@@ -425,20 +342,2 @@ const omitPipeline = omit ? generatePipelineFromOmit(omit) : [] | ||
const runHealthCheck = async (eventReceivedAt: number) => { | ||
debug('Checking health - change stream') | ||
const lastSyncedAt = await getLastSyncedAt(keys.lastChangeProcessedAtKey) | ||
debug('Event received at %d', eventReceivedAt) | ||
debug('Last change processed at %d', lastSyncedAt) | ||
// Change stream event not synced | ||
if (!lastSyncedAt || lastSyncedAt < eventReceivedAt) { | ||
debug('Health check failed - change stream') | ||
emit('healthCheckFail', { | ||
failureType: 'changeStream', | ||
eventReceivedAt, | ||
lastSyncedAt, | ||
}) | ||
} | ||
} | ||
const healthChecker = delayed(runHealthCheck, healthCheck.maxSyncDelay) | ||
/** Start processing change stream */ | ||
@@ -465,6 +364,2 @@ const start = async () => { | ||
while (await nextChecker.hasNext()) { | ||
// Schedule health check | ||
if (healthCheck.enabled) { | ||
healthChecker(new Date().getTime()) | ||
} | ||
let event = await changeStream.next() | ||
@@ -482,3 +377,3 @@ debug('Change stream event %O', event) | ||
await processRecord(event) | ||
// Persist state | ||
await redis.mset( | ||
@@ -493,3 +388,6 @@ keys.changeStreamTokenKey, | ||
if (nextChecker.errorExists()) { | ||
emit('cursorError', nextChecker.getLastError()) | ||
emit('cursorError', { | ||
name: 'processChangeStream', | ||
error: nextChecker.getLastError(), | ||
}) | ||
} | ||
@@ -513,4 +411,2 @@ deferred.done() | ||
state.change('stopping') | ||
// Cancel health check | ||
healthChecker.cancel() | ||
// Close the change stream | ||
@@ -558,3 +454,3 @@ await changeStream?.close() | ||
const state = fsm(simpleStateTransistions, 'stopped', { | ||
name: 'Schema change', | ||
name: 'detectSchemaChange', | ||
onStateChange: emitStateChange, | ||
@@ -561,0 +457,0 @@ }) |
@@ -30,10 +30,7 @@ import { | ||
deserialize: (x: string) => T | ||
/** Sort order: asc or desc. Defaults to asc */ | ||
order?: 'asc' | 'desc' | ||
} | ||
export interface ScanOptions<T = any> { | ||
healthCheck?: { | ||
enabled: boolean | ||
/** How often to run the health check. */ | ||
interval?: number | ||
} | ||
/** Defaults to _id */ | ||
@@ -46,7 +43,2 @@ sortField?: SortField<T> | ||
export interface ChangeStreamOptions { | ||
healthCheck?: { | ||
enabled: boolean | ||
/** The max allowed time for a change stream event to be processed. */ | ||
maxSyncDelay?: number | ||
} | ||
pipeline?: Document[] | ||
@@ -66,3 +58,2 @@ } | ||
| 'cursorError' | ||
| 'healthCheckFail' | ||
| 'resync' | ||
@@ -73,17 +64,2 @@ | 'schemaChange' | ||
interface InitialScanFailEvent { | ||
type: 'healthCheckFail' | ||
failureType: 'initialScan' | ||
lastSyncedAt: number | ||
} | ||
interface ChangeStreamFailEvent { | ||
type: 'healthCheckFail' | ||
failureType: 'changeStream' | ||
lastRecordUpdatedAt: number | ||
lastSyncedAt: number | ||
} | ||
export type HealthCheckFailEvent = InitialScanFailEvent | ChangeStreamFailEvent | ||
export interface ResyncEvent { | ||
@@ -107,3 +83,2 @@ type: 'resync' | ||
type: 'initialScanComplete' | ||
lastId: string | ||
} | ||
@@ -113,3 +88,4 @@ | ||
type: 'cursorError' | ||
error: unknown | ||
name: 'runInitialScan' | 'processChangeStream' | ||
error: Error | ||
} | ||
@@ -116,0 +92,0 @@ |
@@ -85,3 +85,3 @@ import { Collection } from 'mongodb' | ||
const errorExists = () => Boolean(lastError) | ||
const getLastError = () => ({ error: lastError }) | ||
const getLastError = () => lastError | ||
@@ -88,0 +88,0 @@ return { hasNext, errorExists, getLastError } |