Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.2.0 to 0.3.0

4

CHANGELOG.md

@@ -0,1 +1,5 @@

# 0.3.0
* Batch `runInitialScan`.
# 0.2.0

@@ -2,0 +6,0 @@

5

dist/mongoChangeStream.d.ts
import { Collection } from 'mongodb';
import { ProcessRecord } from './types.js';
import { ProcessRecord, ProcessRecords } from './types.js';
import type { default as Redis } from 'ioredis';
export declare const initSync: (redis: Redis) => {
runInitialScan: (collection: Collection, processRecord: ProcessRecord) => Promise<void>;
runInitialScan: (collection: Collection, processRecords: ProcessRecords, batchSize?: number) => Promise<void>;
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>;
syncCollection: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => void;
reset: (collection: Collection) => Promise<void>;
};

@@ -10,2 +10,3 @@ "use strict";

const debug_1 = __importDefault(require("debug"));
const prom_utils_1 = require("prom-utils");
const debug = (0, debug_1.default)('connectors:mongodbChangeStream');

@@ -33,3 +34,3 @@ const keyPrefix = 'mongodbChangeStream';

*/
const runInitialScan = async (collection, processRecord) => {
const runInitialScan = async (collection, processRecords, batchSize = 100) => {
debug('Running initial scan');

@@ -48,2 +49,11 @@ // Redis keys

debug('Last scan _id %s', lastId);
const _processRecords = async (records) => {
// Process batch of records
await processRecords(records);
// Record last id of the batch
const lastId = records[records.length - 1].fullDocument._id.toString();
await redis.set(lastScanIdKey, lastId);
};
// Create queue
const queue = (0, prom_utils_1.batchQueue)(_processRecords, batchSize);
// Query collection

@@ -63,7 +73,6 @@ const cursor = collection

};
// Process record
await processRecord(changeStreamDoc);
// Record that this document was successfully processed
await redis.set(lastScanIdKey, doc._id.toString());
await queue.enqueue(changeStreamDoc);
}
// Flush the queue
await queue.flush();
// Record scan complete

@@ -99,11 +108,2 @@ await redis.set(scanCompletedKey, new Date().toString());

/**
* Sync a MongoDB collection.
*/
const syncCollection = (collection, processRecord, pipeline = []) => {
// Process the change stream
processChangeStream(collection, processRecord, pipeline);
// Run the initial scan
runInitialScan(collection, processRecord);
};
/**
* Reset Redis state.

@@ -118,3 +118,2 @@ */

processChangeStream,
syncCollection,
reset,

@@ -121,0 +120,0 @@ };

@@ -1,2 +0,3 @@

import { ChangeStreamDocument } from 'mongodb';
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb';
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>;
export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>;
{
"name": "mongochangestream",
"version": "0.2.0",
"version": "0.3.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -44,4 +44,5 @@ "author": "GovSpend",

"dependencies": {
"debug": "^4.3.4"
"debug": "^4.3.4",
"prom-utils": "^0.1.0"
}
}

@@ -39,8 +39,7 @@ # Mongo Change Stream

Below are the available methods. You can call `runInitialScan` and `processChangeStream`
separately, but the most straightforward way is to call `syncCollection` which combines
both functions.
Below are the available methods.
The `processChangeStream` method will never complete, but `runInitialScan` will complete
once it has scanned all documents in the collection.
once it has scanned all documents in the collection. `runInitialScan` batches records for
efficiency.

@@ -52,7 +51,10 @@ The `reset` method will delete all relevant keys for a given collection in Redis.

export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>
export type ProcessRecord = (
doc: ChangeStreamDocument | ChangeStreamDocument[]
) => void | Promise<void>
const runInitialScan = async (
collection: Collection,
processRecord: ProcessRecord
processRecord: ProcessRecord,
batchSize = 100
): Promise<void> => ...

@@ -66,8 +68,2 @@

const syncCollection = (
collection: Collection,
processRecord: ProcessRecord,
pipeline: Document[] = []
): void => ...
const reset = async (collection: Collection): Promise<void> => ...

@@ -74,0 +70,0 @@ ```

import { ChangeStreamInsertDocument, Collection, ObjectId } from 'mongodb'
import changeStreamToIterator from './changeStreamToIterator.js'
import { ProcessRecord } from './types.js'
import { ProcessRecord, ProcessRecords } from './types.js'
import _debug from 'debug'
import type { default as Redis } from 'ioredis'
import { batchQueue } from 'prom-utils'

@@ -36,3 +37,4 @@ const debug = _debug('connectors:mongodbChangeStream')

collection: Collection,
processRecord: ProcessRecord
processRecords: ProcessRecords,
batchSize = 100
) => {

@@ -52,2 +54,11 @@ debug('Running initial scan')

debug('Last scan _id %s', lastId)
const _processRecords = async (records: ChangeStreamInsertDocument[]) => {
// Process batch of records
await processRecords(records)
// Record last id of the batch
const lastId = records[records.length - 1].fullDocument._id.toString()
await redis.set(lastScanIdKey, lastId)
}
// Create queue
const queue = batchQueue(_processRecords, batchSize)
// Query collection

@@ -67,7 +78,6 @@ const cursor = collection

} as unknown as ChangeStreamInsertDocument
// Process record
await processRecord(changeStreamDoc)
// Record that this document was successfully processed
await redis.set(lastScanIdKey, doc._id.toString())
await queue.enqueue(changeStreamDoc)
}
// Flush the queue
await queue.flush()
// Record scan complete

@@ -108,15 +118,2 @@ await redis.set(scanCompletedKey, new Date().toString())

}
/**
* Sync a MongoDB collection.
*/
const syncCollection = (
collection: Collection,
processRecord: ProcessRecord,
pipeline: Document[] = []
) => {
// Process the change stream
processChangeStream(collection, processRecord, pipeline)
// Run the initial scan
runInitialScan(collection, processRecord)
}

@@ -134,5 +131,4 @@ /**

processChangeStream,
syncCollection,
reset,
}
}

@@ -1,3 +0,7 @@

import { ChangeStreamDocument } from 'mongodb'
import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'
export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>
export type ProcessRecords = (
doc: ChangeStreamInsertDocument[]
) => void | Promise<void>
Socket · SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc