Socket
Socket
Sign inDemoInstall

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.42.0 to 0.43.0

6

CHANGELOG.md

@@ -0,1 +1,7 @@

# 0.43.0
- Optionally pass an array of operation types (`insert`, `update`, ...) to `processChangeStream`.
This allows you to skip operations you don't care about. For example, a `delete` operation
due to a collection TTL index.
# 0.42.0

@@ -2,0 +8,0 @@

@@ -253,2 +253,4 @@ "use strict";

const defaultOptions = { fullDocument: 'updateLookup' };
const operationTypes = options.operationTypes;
debug('Operation types %o', operationTypes);
/**

@@ -309,2 +311,7 @@ * Get the change stream, resuming from a previous token if exists.

debug('Change stream event %O', event);
// Skip the event if the operation type is not one we care about
if (operationTypes && !operationTypes.includes(event.operationType)) {
debug('Skipping operation type: %s', event.operationType);
continue;
}
// Omit nested fields that are not handled by $unset.

@@ -311,0 +318,0 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted.

1

dist/types.d.ts

@@ -27,2 +27,3 @@ import { ChangeStream, AggregationCursor, ChangeStreamDocument, ChangeStreamInsertDocument, Document, MongoServerError, MongoAPIError } from 'mongodb';

pipeline?: Document[];
operationTypes?: ChangeStreamDocument['operationType'][];
}

@@ -29,0 +30,0 @@ export interface ChangeOptions {

2

package.json
{
"name": "mongochangestream",
"version": "0.42.0",
"version": "0.43.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -5,0 +5,0 @@ "author": "GovSpend",

@@ -396,2 +396,33 @@ /**

test('should omit operation types from change stream', async () => {
const { coll, db } = await getConns()
const sync = await getSync()
await initState(sync, db, coll)
const operations: string[] = []
const processRecords = async (docs: ChangeStreamDocument[]) => {
for (const doc of docs) {
await setTimeout(5)
operations.push(doc.operationType)
}
}
const changeStream = await sync.processChangeStream(processRecords, {
operationTypes: ['insert'],
// Short timeout since only event will be queued
timeout: 500,
})
// Start
changeStream.start()
await setTimeout(ms('1s'))
// Update records
await coll.updateMany({}, { $set: { name: 'unknown' } })
// Insert record
await coll.insertOne(genUser())
// Wait for the change stream events to be processed
await setTimeout(ms('2s'))
assert.deepEqual(_.uniq(operations), ['insert'])
// Stop
await changeStream.stop()
})
test('change stream should resume properly', async () => {

@@ -398,0 +429,0 @@ const { coll, db } = await getConns()

@@ -313,2 +313,4 @@ import _ from 'lodash/fp.js'

const defaultOptions = { fullDocument: 'updateLookup' }
const operationTypes = options.operationTypes
debug('Operation types %o', operationTypes)

@@ -377,2 +379,7 @@ /**

debug('Change stream event %O', event)
// Skip the event if the operation type is not one we care about
if (operationTypes && !operationTypes.includes(event.operationType)) {
debug('Skipping operation type: %s', event.operationType)
continue
}
// Omit nested fields that are not handled by $unset.

@@ -379,0 +386,0 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted.

@@ -49,2 +49,3 @@ import {

pipeline?: Document[]
operationTypes?: ChangeStreamDocument['operationType'][]
}

@@ -51,0 +52,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc