Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.46.0 to 0.47.0

5

CHANGELOG.md

@@ -0,1 +1,6 @@

# 0.47.0
- Bumped peer dependencies for `mongodb`.
- Reworked `safelyCheckNext`.
# 0.46.0

@@ -2,0 +7,0 @@

54

dist/mongoChangeStream.js

@@ -182,33 +182,33 @@ "use strict";

const nextChecker = (0, util_js_1.safelyCheckNext)(cursor);
let doc;
// Process documents
while (await nextChecker.hasNext()) {
const doc = await cursor.next();
while ((doc = await nextChecker.getNext())) {
debug('Initial scan doc %O', doc);
// Doc can be null if cursor is closed
if (doc) {
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
};
await queue.enqueue(changeStreamDoc);
}
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
};
await queue.enqueue(changeStreamDoc);
}
// Flush the queue
await queue.flush();
// An error occurred getting next and we are not stopping
if (nextChecker.errorExists() && !state.is('stopping')) {
emit('cursorError', {
name: 'runInitialScan',
error: nextChecker.getLastError(),
});
// We are not stopping
if (!state.is('stopping')) {
// An error occurred getting next
if (nextChecker.errorExists()) {
emit('cursorError', {
name: 'runInitialScan',
error: nextChecker.getLastError(),
});
}
// Exited cleanly from the loop so we're done
else {
debug('Completed initial scan');
// Record scan complete
await redis.set(keys.scanCompletedKey, new Date().toString());
// Emit event
emit('initialScanComplete', {});
}
}
// Exited cleanly from the loop so we're done
if (!nextChecker.errorExists()) {
debug('Completed initial scan');
// Record scan complete
await redis.set(keys.scanCompletedKey, new Date().toString());
// Emit event
emit('initialScanComplete', {});
}
// Resolve deferred

@@ -308,5 +308,5 @@ deferred.done();

const nextChecker = (0, util_js_1.safelyCheckNext)(changeStream);
let event;
// Consume change stream
while (await nextChecker.hasNext()) {
let event = await changeStream.next();
while ((event = await nextChecker.getNext())) {
debug('Change stream event %O', event);

@@ -313,0 +313,0 @@ // Skip the event if the operation type is not one we care about

@@ -16,7 +16,7 @@ import _ from 'lodash/fp.js';

/**
* Check if the cursor has next without throwing an exception.
* Get next record without throwing an exception.
* Get the last error safely via `getLastError`.
*/
export declare const safelyCheckNext: (cursor: Cursor) => {
hasNext: () => Promise<boolean>;
getNext: () => Promise<any>;
errorExists: () => boolean;

@@ -23,0 +23,0 @@ getLastError: () => unknown;

@@ -109,3 +109,3 @@ "use strict";

/**
* Check if the cursor has next without throwing an exception.
* Get next record without throwing an exception.
* Get the last error safely via `getLastError`.

@@ -115,12 +115,6 @@ */

let lastError;
const hasNext = async () => {
const getNext = async () => {
debug('safelyCheckNext called');
try {
// Prevents hasNext from hanging when the cursor is already closed
if (cursor.closed) {
debug('safelyCheckNext cursor closed');
lastError = new Error('cursor closed');
return false;
}
return await cursor.hasNext();
return await cursor.tryNext();
}

@@ -130,3 +124,3 @@ catch (e) {

lastError = e;
return false;
return null;
}

@@ -136,3 +130,3 @@ };

const getLastError = () => lastError;
return { hasNext, errorExists, getLastError };
return { getNext, errorExists, getLastError };
};

@@ -139,0 +133,0 @@ exports.safelyCheckNext = safelyCheckNext;

{
"name": "mongochangestream",
"version": "0.46.0",
"version": "0.47.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -62,3 +62,3 @@ "author": "GovSpend",

"ioredis": ">= 5.4.1",
"mongodb": ">= 6.6.1"
"mongodb": ">= 6.8.0"
},

@@ -65,0 +65,0 @@ "prettier": {

@@ -330,3 +330,3 @@ /**

const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
await setTimeout(15)
await setTimeout(50)
processed.push(...docs)

@@ -347,3 +347,3 @@ }

// Allow for some records to be processed
await setTimeout(500)
await setTimeout(200)
// Stop the initial scan

@@ -367,4 +367,8 @@ await initialScan.stop()

const sync = initSync(redis, coll)
let cursorErrorEmitted = false
sync.emitter.on('stateChange', console.log)
sync.emitter.on('cursorError', console.log)
sync.emitter.on('cursorError', (e: unknown) => {
cursorErrorEmitted = true
console.log(e)
})
await initState(sync, db, coll)

@@ -390,2 +394,3 @@

assert.equal(completedAt, null)
assert.ok(cursorErrorEmitted)
// Stop

@@ -690,29 +695,2 @@ await initialScan.stop()

test('should emit cursorError if change stream is closed', async () => {
// Get a new connection since we're closing the connection in the test
const { redis, coll, db, client } = await getConns({})
const sync = initSync(redis, coll)
let error: any
sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
console.log(event)
error = event.error
})
await initState(sync, db, coll)
const processRecords = async () => {
await setTimeout(500)
}
const changeStream = await sync.processChangeStream(processRecords)
// Start
changeStream.start()
await setTimeout(ms('1s'))
// Update records
await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
// Close the connection.
await client.close()
await setTimeout(ms('8s'))
assert.ok(error?.message)
})
test('Should resync when resync flag is set', async () => {

@@ -719,0 +697,0 @@ const { coll, db, redis } = await getConns()

@@ -11,2 +11,3 @@ import _debug from 'debug'

type Db,
type Document,
ObjectId,

@@ -237,33 +238,33 @@ } from 'mongodb'

const nextChecker = safelyCheckNext(cursor)
let doc: Document | null
// Process documents
while (await nextChecker.hasNext()) {
const doc = await cursor.next()
while ((doc = await nextChecker.getNext())) {
debug('Initial scan doc %O', doc)
// Doc can be null if cursor is closed
if (doc) {
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
} as unknown as ChangeStreamInsertDocument
await queue.enqueue(changeStreamDoc)
}
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
} as unknown as ChangeStreamInsertDocument
await queue.enqueue(changeStreamDoc)
}
// Flush the queue
await queue.flush()
// An error occurred getting next and we are not stopping
if (nextChecker.errorExists() && !state.is('stopping')) {
emit('cursorError', {
name: 'runInitialScan',
error: nextChecker.getLastError(),
})
// We are not stopping
if (!state.is('stopping')) {
// An error occurred getting next
if (nextChecker.errorExists()) {
emit('cursorError', {
name: 'runInitialScan',
error: nextChecker.getLastError(),
})
}
// Exited cleanly from the loop so we're done
else {
debug('Completed initial scan')
// Record scan complete
await redis.set(keys.scanCompletedKey, new Date().toString())
// Emit event
emit('initialScanComplete', {})
}
}
// Exited cleanly from the loop so we're done
if (!nextChecker.errorExists()) {
debug('Completed initial scan')
// Record scan complete
await redis.set(keys.scanCompletedKey, new Date().toString())
// Emit event
emit('initialScanComplete', {})
}
// Resolve deferred

@@ -378,5 +379,5 @@ deferred.done()

const nextChecker = safelyCheckNext(changeStream)
let event: ChangeStreamDocument | null
// Consume change stream
while (await nextChecker.hasNext()) {
let event = await changeStream.next()
while ((event = await nextChecker.getNext())) {
debug('Change stream event %O', event)

@@ -391,3 +392,3 @@ // Skip the event if the operation type is not one we care about

if (event.operationType === 'update' && omit) {
event = omitFieldForUpdate(omit)(event)
event = omitFieldForUpdate(omit)(event) as ChangeStreamDocument
}

@@ -394,0 +395,0 @@ await queue.enqueue(event)

@@ -115,3 +115,3 @@ import _debug from 'debug'

/**
* Check if the cursor has next without throwing an exception.
* Get next record without throwing an exception.
* Get the last error safely via `getLastError`.

@@ -122,16 +122,10 @@ */

const hasNext = async () => {
const getNext = async () => {
debug('safelyCheckNext called')
try {
// Prevents hasNext from hanging when the cursor is already closed
if (cursor.closed) {
debug('safelyCheckNext cursor closed')
lastError = new Error('cursor closed')
return false
}
return await cursor.hasNext()
return await cursor.tryNext()
} catch (e) {
debug('safelyCheckNext error: %o', e)
lastError = e
return false
return null
}

@@ -143,3 +137,3 @@ }

return { hasNext, errorExists, getLastError }
return { getNext, errorExists, getLastError }
}

@@ -146,0 +140,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc