mongochangestream
Advanced tools
Comparing version 0.46.0 to 0.47.0
@@ -0,1 +1,6 @@ | ||
# 0.47.0 | ||
- Bumped peer dependencies for `mongodb`. | ||
- Reworked `safelyCheckNext`. | ||
# 0.46.0 | ||
@@ -2,0 +7,0 @@ |
@@ -182,33 +182,33 @@ "use strict"; | ||
const nextChecker = (0, util_js_1.safelyCheckNext)(cursor); | ||
let doc; | ||
// Process documents | ||
while (await nextChecker.hasNext()) { | ||
const doc = await cursor.next(); | ||
while ((doc = await nextChecker.getNext())) { | ||
debug('Initial scan doc %O', doc); | ||
// Doc can be null if cursor is closed | ||
if (doc) { | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
}; | ||
await queue.enqueue(changeStreamDoc); | ||
} | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
}; | ||
await queue.enqueue(changeStreamDoc); | ||
} | ||
// Flush the queue | ||
await queue.flush(); | ||
// An error occurred getting next and we are not stopping | ||
if (nextChecker.errorExists() && !state.is('stopping')) { | ||
emit('cursorError', { | ||
name: 'runInitialScan', | ||
error: nextChecker.getLastError(), | ||
}); | ||
// We are not stopping | ||
if (!state.is('stopping')) { | ||
// An error occurred getting next | ||
if (nextChecker.errorExists()) { | ||
emit('cursorError', { | ||
name: 'runInitialScan', | ||
error: nextChecker.getLastError(), | ||
}); | ||
} | ||
// Exited cleanly from the loop so we're done | ||
else { | ||
debug('Completed initial scan'); | ||
// Record scan complete | ||
await redis.set(keys.scanCompletedKey, new Date().toString()); | ||
// Emit event | ||
emit('initialScanComplete', {}); | ||
} | ||
} | ||
// Exited cleanly from the loop so we're done | ||
if (!nextChecker.errorExists()) { | ||
debug('Completed initial scan'); | ||
// Record scan complete | ||
await redis.set(keys.scanCompletedKey, new Date().toString()); | ||
// Emit event | ||
emit('initialScanComplete', {}); | ||
} | ||
// Resolve deferred | ||
@@ -308,5 +308,5 @@ deferred.done(); | ||
const nextChecker = (0, util_js_1.safelyCheckNext)(changeStream); | ||
let event; | ||
// Consume change stream | ||
while (await nextChecker.hasNext()) { | ||
let event = await changeStream.next(); | ||
while ((event = await nextChecker.getNext())) { | ||
debug('Change stream event %O', event); | ||
@@ -313,0 +313,0 @@ // Skip the event if the operation type is not one we care about |
@@ -16,7 +16,7 @@ import _ from 'lodash/fp.js'; | ||
/** | ||
* Check if the cursor has next without throwing an exception. | ||
* Get next record without throwing an exception. | ||
* Get the last error safely via `getLastError`. | ||
*/ | ||
export declare const safelyCheckNext: (cursor: Cursor) => { | ||
hasNext: () => Promise<boolean>; | ||
getNext: () => Promise<any>; | ||
errorExists: () => boolean; | ||
@@ -23,0 +23,0 @@ getLastError: () => unknown; |
@@ -109,3 +109,3 @@ "use strict"; | ||
/** | ||
* Check if the cursor has next without throwing an exception. | ||
* Get next record without throwing an exception. | ||
* Get the last error safely via `getLastError`. | ||
@@ -115,12 +115,6 @@ */ | ||
let lastError; | ||
const hasNext = async () => { | ||
const getNext = async () => { | ||
debug('safelyCheckNext called'); | ||
try { | ||
// Prevents hasNext from hanging when the cursor is already closed | ||
if (cursor.closed) { | ||
debug('safelyCheckNext cursor closed'); | ||
lastError = new Error('cursor closed'); | ||
return false; | ||
} | ||
return await cursor.hasNext(); | ||
return await cursor.tryNext(); | ||
} | ||
@@ -130,3 +124,3 @@ catch (e) { | ||
lastError = e; | ||
return false; | ||
return null; | ||
} | ||
@@ -136,3 +130,3 @@ }; | ||
const getLastError = () => lastError; | ||
return { hasNext, errorExists, getLastError }; | ||
return { getNext, errorExists, getLastError }; | ||
}; | ||
@@ -139,0 +133,0 @@ exports.safelyCheckNext = safelyCheckNext; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.46.0", | ||
"version": "0.47.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -62,3 +62,3 @@ "author": "GovSpend", | ||
"ioredis": ">= 5.4.1", | ||
"mongodb": ">= 6.6.1" | ||
"mongodb": ">= 6.8.0" | ||
}, | ||
@@ -65,0 +65,0 @@ "prettier": { |
@@ -330,3 +330,3 @@ /** | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
await setTimeout(15) | ||
await setTimeout(50) | ||
processed.push(...docs) | ||
@@ -347,3 +347,3 @@ } | ||
// Allow for some records to be processed | ||
await setTimeout(500) | ||
await setTimeout(200) | ||
// Stop the initial scan | ||
@@ -367,4 +367,8 @@ await initialScan.stop() | ||
const sync = initSync(redis, coll) | ||
let cursorErrorEmitted = false | ||
sync.emitter.on('stateChange', console.log) | ||
sync.emitter.on('cursorError', console.log) | ||
sync.emitter.on('cursorError', (e: unknown) => { | ||
cursorErrorEmitted = true | ||
console.log(e) | ||
}) | ||
await initState(sync, db, coll) | ||
@@ -390,2 +394,3 @@ | ||
assert.equal(completedAt, null) | ||
assert.ok(cursorErrorEmitted) | ||
// Stop | ||
@@ -690,29 +695,2 @@ await initialScan.stop() | ||
test('should emit cursorError if change stream is closed', async () => { | ||
// Get a new connection since we're closing the connection in the test | ||
const { redis, coll, db, client } = await getConns({}) | ||
const sync = initSync(redis, coll) | ||
let error: any | ||
sync.emitter.on('cursorError', (event: CursorErrorEvent) => { | ||
console.log(event) | ||
error = event.error | ||
}) | ||
await initState(sync, db, coll) | ||
const processRecords = async () => { | ||
await setTimeout(500) | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } }) | ||
// Close the connection. | ||
await client.close() | ||
await setTimeout(ms('8s')) | ||
assert.ok(error?.message) | ||
}) | ||
test('Should resync when resync flag is set', async () => { | ||
@@ -719,0 +697,0 @@ const { coll, db, redis } = await getConns() |
@@ -11,2 +11,3 @@ import _debug from 'debug' | ||
type Db, | ||
type Document, | ||
ObjectId, | ||
@@ -237,33 +238,33 @@ } from 'mongodb' | ||
const nextChecker = safelyCheckNext(cursor) | ||
let doc: Document | null | ||
// Process documents | ||
while (await nextChecker.hasNext()) { | ||
const doc = await cursor.next() | ||
while ((doc = await nextChecker.getNext())) { | ||
debug('Initial scan doc %O', doc) | ||
// Doc can be null if cursor is closed | ||
if (doc) { | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
} as unknown as ChangeStreamInsertDocument | ||
await queue.enqueue(changeStreamDoc) | ||
} | ||
const changeStreamDoc = { | ||
fullDocument: doc, | ||
operationType: 'insert', | ||
ns, | ||
} as unknown as ChangeStreamInsertDocument | ||
await queue.enqueue(changeStreamDoc) | ||
} | ||
// Flush the queue | ||
await queue.flush() | ||
// An error occurred getting next and we are not stopping | ||
if (nextChecker.errorExists() && !state.is('stopping')) { | ||
emit('cursorError', { | ||
name: 'runInitialScan', | ||
error: nextChecker.getLastError(), | ||
}) | ||
// We are not stopping | ||
if (!state.is('stopping')) { | ||
// An error occurred getting next | ||
if (nextChecker.errorExists()) { | ||
emit('cursorError', { | ||
name: 'runInitialScan', | ||
error: nextChecker.getLastError(), | ||
}) | ||
} | ||
// Exited cleanly from the loop so we're done | ||
else { | ||
debug('Completed initial scan') | ||
// Record scan complete | ||
await redis.set(keys.scanCompletedKey, new Date().toString()) | ||
// Emit event | ||
emit('initialScanComplete', {}) | ||
} | ||
} | ||
// Exited cleanly from the loop so we're done | ||
if (!nextChecker.errorExists()) { | ||
debug('Completed initial scan') | ||
// Record scan complete | ||
await redis.set(keys.scanCompletedKey, new Date().toString()) | ||
// Emit event | ||
emit('initialScanComplete', {}) | ||
} | ||
// Resolve deferred | ||
@@ -378,5 +379,5 @@ deferred.done() | ||
const nextChecker = safelyCheckNext(changeStream) | ||
let event: ChangeStreamDocument | null | ||
// Consume change stream | ||
while (await nextChecker.hasNext()) { | ||
let event = await changeStream.next() | ||
while ((event = await nextChecker.getNext())) { | ||
debug('Change stream event %O', event) | ||
@@ -391,3 +392,3 @@ // Skip the event if the operation type is not one we care about | ||
if (event.operationType === 'update' && omit) { | ||
event = omitFieldForUpdate(omit)(event) | ||
event = omitFieldForUpdate(omit)(event) as ChangeStreamDocument | ||
} | ||
@@ -394,0 +395,0 @@ await queue.enqueue(event) |
@@ -115,3 +115,3 @@ import _debug from 'debug' | ||
/** | ||
* Check if the cursor has next without throwing an exception. | ||
* Get next record without throwing an exception. | ||
* Get the last error safely via `getLastError`. | ||
@@ -122,16 +122,10 @@ */ | ||
const hasNext = async () => { | ||
const getNext = async () => { | ||
debug('safelyCheckNext called') | ||
try { | ||
// Prevents hasNext from hanging when the cursor is already closed | ||
if (cursor.closed) { | ||
debug('safelyCheckNext cursor closed') | ||
lastError = new Error('cursor closed') | ||
return false | ||
} | ||
return await cursor.hasNext() | ||
return await cursor.tryNext() | ||
} catch (e) { | ||
debug('safelyCheckNext error: %o', e) | ||
lastError = e | ||
return false | ||
return null | ||
} | ||
@@ -143,3 +137,3 @@ } | ||
return { hasNext, errorExists, getLastError } | ||
return { getNext, errorExists, getLastError } | ||
} | ||
@@ -146,0 +140,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
102815
2578