Socket
Socket
Sign inDemoInstall

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.19.0 to 0.19.1

6

CHANGELOG.md

@@ -0,1 +1,7 @@

# 0.19.1
- Fix issue with `runInitialScan` where calling `stop` before the scan had finished
would incorrectly set the scan completed key in Redis. Also, `stop` now awaits flushing
the queue.
# 0.19.0

@@ -2,0 +8,0 @@

19

dist/mongoChangeStream.js

@@ -50,2 +50,3 @@ "use strict";

let cursor;
const abortController = new AbortController();
const start = async () => {

@@ -72,2 +73,3 @@ debug('Starting initial scan');

}
deferred.done();
};

@@ -97,16 +99,19 @@ // Lookup last id successfully processed

await queue.enqueue(changeStreamDoc);
deferred.done();
}
// Flush the queue
await queue.flush();
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString());
debug('Completed initial scan');
// Don't record scan complete if aborted
if (!abortController.signal.aborted) {
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString());
debug('Completed initial scan');
}
};
const stop = async () => {
debug('Stopping initial scan');
// Wait for event to be processed
await deferred?.promise;
// Close the cursor
await cursor?.close();
abortController.abort();
// Wait for the queue to be flushed
await deferred?.promise;
};

@@ -160,3 +165,3 @@ return { start, stop };

debug('Stopping change stream');
await changeStream.close();
await changeStream?.close();
// Wait for event to be processed

@@ -163,0 +168,0 @@ await deferred?.promise;

{
"name": "mongochangestream",
"version": "0.19.0",
"version": "0.19.1",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -5,0 +5,0 @@ "author": "GovSpend",

@@ -45,3 +45,4 @@ # Mongo Change Stream

const sync = initSync(redis, coll)
sync.syncCollection(processRecord)
const initialScan = await sync.runInitialScan(processRecords)
initialScan.start()
const changeStream = await sync.processChangeStream(processRecord)

@@ -48,0 +49,0 @@ changeStream.start()

@@ -75,2 +75,3 @@ import _ from 'lodash/fp.js'

let cursor: ReturnType<typeof collection.find>
const abortController = new AbortController()

@@ -98,2 +99,3 @@ const start = async () => {

}
deferred.done()
}

@@ -126,9 +128,11 @@ // Lookup last id successfully processed

await queue.enqueue(changeStreamDoc)
deferred.done()
}
// Flush the queue
await queue.flush()
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString())
debug('Completed initial scan')
// Don't record scan complete if aborted
if (!abortController.signal.aborted) {
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString())
debug('Completed initial scan')
}
}

@@ -138,6 +142,7 @@

debug('Stopping initial scan')
// Wait for event to be processed
await deferred?.promise
// Close the cursor
await cursor?.close()
abortController.abort()
// Wait for the queue to be flushed
await deferred?.promise
}

@@ -199,3 +204,3 @@

debug('Stopping change stream')
await changeStream.close()
await changeStream?.close()
// Wait for event to be processed

@@ -202,0 +207,0 @@ await deferred?.promise

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc