mongochangestream
Advanced tools
Comparing version 0.42.0 to 0.43.0
@@ -0,1 +1,7 @@ | ||
# 0.43.0 | ||
- Optionally pass an array of operation types (`insert`, `update`, ...) to `processChangeStream`. | ||
This allows you to skip operations you don't care about. For example, a `delete` operation | ||
due to a collection TTL index. | ||
# 0.42.0 | ||
@@ -2,0 +8,0 @@ |
@@ -253,2 +253,4 @@ "use strict"; | ||
const defaultOptions = { fullDocument: 'updateLookup' }; | ||
const operationTypes = options.operationTypes; | ||
debug('Operation types %o', operationTypes); | ||
/** | ||
@@ -309,2 +311,7 @@ * Get the change stream, resuming from a previous token if exists. | ||
debug('Change stream event %O', event); | ||
// Skip the event if the operation type is not one we care about | ||
if (operationTypes && !operationTypes.includes(event.operationType)) { | ||
debug('Skipping operation type: %s', event.operationType); | ||
continue; | ||
} | ||
// Omit nested fields that are not handled by $unset. | ||
@@ -311,0 +318,0 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted. |
@@ -27,2 +27,3 @@ import { ChangeStream, AggregationCursor, ChangeStreamDocument, ChangeStreamInsertDocument, Document, MongoServerError, MongoAPIError } from 'mongodb'; | ||
pipeline?: Document[]; | ||
operationTypes?: ChangeStreamDocument['operationType'][]; | ||
} | ||
@@ -29,0 +30,0 @@ export interface ChangeOptions { |
{ | ||
"name": "mongochangestream", | ||
"version": "0.42.0", | ||
"version": "0.43.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -5,0 +5,0 @@ "author": "GovSpend", |
@@ -396,2 +396,33 @@ /** | ||
test('should omit operation types from change stream', async () => { | ||
const { coll, db } = await getConns() | ||
const sync = await getSync() | ||
await initState(sync, db, coll) | ||
const operations: string[] = [] | ||
const processRecords = async (docs: ChangeStreamDocument[]) => { | ||
for (const doc of docs) { | ||
await setTimeout(5) | ||
operations.push(doc.operationType) | ||
} | ||
} | ||
const changeStream = await sync.processChangeStream(processRecords, { | ||
operationTypes: ['insert'], | ||
// Short timeout since only event will be queued | ||
timeout: 500, | ||
}) | ||
// Start | ||
changeStream.start() | ||
await setTimeout(ms('1s')) | ||
// Update records | ||
await coll.updateMany({}, { $set: { name: 'unknown' } }) | ||
// Insert record | ||
await coll.insertOne(genUser()) | ||
// Wait for the change stream events to be processed | ||
await setTimeout(ms('2s')) | ||
assert.deepEqual(_.uniq(operations), ['insert']) | ||
// Stop | ||
await changeStream.stop() | ||
}) | ||
test('change stream should resume properly', async () => { | ||
@@ -398,0 +429,0 @@ const { coll, db } = await getConns() |
@@ -313,2 +313,4 @@ import _ from 'lodash/fp.js' | ||
const defaultOptions = { fullDocument: 'updateLookup' } | ||
const operationTypes = options.operationTypes | ||
debug('Operation types %o', operationTypes) | ||
@@ -377,2 +379,7 @@ /** | ||
debug('Change stream event %O', event) | ||
// Skip the event if the operation type is not one we care about | ||
if (operationTypes && !operationTypes.includes(event.operationType)) { | ||
debug('Skipping operation type: %s', event.operationType) | ||
continue | ||
} | ||
// Omit nested fields that are not handled by $unset. | ||
@@ -379,0 +386,0 @@ // For example, if 'a' was omitted then 'a.b.c' should be omitted. |
@@ -49,2 +49,3 @@ import { | ||
pipeline?: Document[] | ||
operationTypes?: ChangeStreamDocument['operationType'][] | ||
} | ||
@@ -51,0 +52,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
91735
2271