mongochangestream
Advanced tools
Comparing version 0.19.0 to 0.19.1
@@ -0,1 +1,7 @@ | ||
# 0.19.1 | ||
- Fix issue with `runInitialScan` where calling `stop` before the scan had finished | ||
would incorrectly set the scan completed key in Redis. Also, `stop` now awaits flushing | ||
the queue. | ||
# 0.19.0 | ||
@@ -2,0 +8,0 @@ |
@@ -50,2 +50,3 @@ "use strict"; | ||
let cursor; | ||
const abortController = new AbortController(); | ||
const start = async () => { | ||
@@ -72,2 +73,3 @@ debug('Starting initial scan'); | ||
} | ||
deferred.done(); | ||
}; | ||
@@ -97,16 +99,19 @@ // Lookup last id successfully processed | ||
await queue.enqueue(changeStreamDoc); | ||
deferred.done(); | ||
} | ||
// Flush the queue | ||
await queue.flush(); | ||
// Record scan complete | ||
await redis.set(scanCompletedKey, new Date().toString()); | ||
debug('Completed initial scan'); | ||
// Don't record scan complete if aborted | ||
if (!abortController.signal.aborted) { | ||
// Record scan complete | ||
await redis.set(scanCompletedKey, new Date().toString()); | ||
debug('Completed initial scan'); | ||
} | ||
}; | ||
const stop = async () => { | ||
debug('Stopping initial scan'); | ||
// Wait for event to be processed | ||
await deferred?.promise; | ||
// Close the cursor | ||
await cursor?.close(); | ||
abortController.abort(); | ||
// Wait for the queue to be flushed | ||
await deferred?.promise; | ||
}; | ||
@@ -160,3 +165,3 @@ return { start, stop }; | ||
debug('Stopping change stream'); | ||
await changeStream.close(); | ||
await changeStream?.close(); | ||
// Wait for event to be processed | ||
@@ -163,0 +168,0 @@ await deferred?.promise; |
{ | ||
"name": "mongochangestream", | ||
"version": "0.19.0", | ||
"version": "0.19.1", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -5,0 +5,0 @@ "author": "GovSpend", |
@@ -45,3 +45,4 @@ # Mongo Change Stream | ||
const sync = initSync(redis, coll) | ||
sync.syncCollection(processRecord) | ||
const initialScan = await sync.runInitialScan(processRecords) | ||
initialScan.start() | ||
const changeStream = await sync.processChangeStream(processRecord) | ||
@@ -48,0 +49,0 @@ changeStream.start() |
@@ -75,2 +75,3 @@ import _ from 'lodash/fp.js' | ||
let cursor: ReturnType<typeof collection.find> | ||
const abortController = new AbortController() | ||
@@ -98,2 +99,3 @@ const start = async () => { | ||
} | ||
deferred.done() | ||
} | ||
@@ -126,9 +128,11 @@ // Lookup last id successfully processed | ||
await queue.enqueue(changeStreamDoc) | ||
deferred.done() | ||
} | ||
// Flush the queue | ||
await queue.flush() | ||
// Record scan complete | ||
await redis.set(scanCompletedKey, new Date().toString()) | ||
debug('Completed initial scan') | ||
// Don't record scan complete if aborted | ||
if (!abortController.signal.aborted) { | ||
// Record scan complete | ||
await redis.set(scanCompletedKey, new Date().toString()) | ||
debug('Completed initial scan') | ||
} | ||
} | ||
@@ -138,6 +142,7 @@ | ||
debug('Stopping initial scan') | ||
// Wait for event to be processed | ||
await deferred?.promise | ||
// Close the cursor | ||
await cursor?.close() | ||
abortController.abort() | ||
// Wait for the queue to be flushed | ||
await deferred?.promise | ||
} | ||
@@ -199,3 +204,3 @@ | ||
debug('Stopping change stream') | ||
await changeStream.close() | ||
await changeStream?.close() | ||
// Wait for event to be processed | ||
@@ -202,0 +207,0 @@ await deferred?.promise |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
35224
757
165