Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.18.0 to 0.19.0

7

CHANGELOG.md

@@ -0,1 +1,8 @@

# 0.19.0
- BREAKING CHANGE: Changed API for `runInitialScan`. You must explicitly call `start` now.
The change stream can be stopped by calling `stop`.
- It is now possible to cleanly stop `runInitialScan` and `processChangeStream`, allowing
for a smooth restarting behavior if a schema change is detected.
# 0.18.0

@@ -2,0 +9,0 @@

15

dist/changeStreamToIterator.d.ts

@@ -1,10 +0,17 @@

import { ChangeStreamOptions, Collection, Document } from 'mongodb';
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], options: ChangeStreamOptions) => {
import { ChangeStreamDocument, ChangeStream } from 'mongodb';
declare const toIterator: (changeStream: ChangeStream) => {
[Symbol.asyncIterator](): {
next(): Promise<{
value: import("mongodb").ChangeStreamDocument<Document>;
value: ChangeStreamDocument<import("bson").Document>;
done: boolean;
} | {
value: ChangeStreamDocument<import("bson").Document>;
done: boolean;
}>;
return(): Promise<{
value: ChangeStreamDocument<import("bson").Document>;
done: boolean;
}>;
};
};
export default changeStreamToIterator;
export default toIterator;
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const debug_1 = __importDefault(require("debug"));
const debug = (0, debug_1.default)('mongochangestream');
const changeStreamToIterator = (collection, pipeline, options) => {
const changeStream = collection.watch(pipeline, options);
debug('Started change stream - pipeline %O options %O', pipeline, options);
const prom_utils_1 = require("prom-utils");
const done = { value: {}, done: true };
const toIterator = (changeStream) => {
const deferred = (0, prom_utils_1.defer)();
changeStream.once('close', deferred.done);
return {

@@ -15,6 +12,11 @@ [Symbol.asyncIterator]() {

async next() {
return changeStream
.next()
.then((data) => ({ value: data, done: false }));
return Promise.race([
deferred.promise.then(() => done),
changeStream.next().then((data) => ({ value: data, done: false })),
]);
},
async return() {
await changeStream.close();
return done;
},
};

@@ -24,2 +26,2 @@ },

};
exports.default = changeStreamToIterator;
exports.default = toIterator;

@@ -14,3 +14,6 @@ /// <reference types="node" />

export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => {
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<{
start: () => Promise<void>;
stop: () => Promise<void>;
}>;
processChangeStream: (processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{

@@ -17,0 +20,0 @@ start: () => Promise<void>;

@@ -48,51 +48,65 @@ "use strict";

const runInitialScan = async (processRecords, options) => {
debug('Running initial scan');
const sortField = options?.sortField || exports.defaultSortField;
// Redis keys
const { scanCompletedKey, lastScanIdKey } = keys;
// Determine if initial scan has already completed
const scanCompleted = await redis.get(scanCompletedKey);
// Scan already completed so return
if (scanCompleted) {
debug(`Initial scan previously completed on %s`, scanCompleted);
return;
}
const _processRecords = async (records) => {
// Process batch of records
await processRecords(records);
const lastDocument = records[records.length - 1].fullDocument;
// Record last id of the batch
const lastId = fp_js_1.default.get(sortField.field, lastDocument);
if (lastId) {
await redis.set(lastScanIdKey, sortField.serialize(lastId));
let deferred;
let cursor;
const start = async () => {
debug('Starting initial scan');
const sortField = options?.sortField || exports.defaultSortField;
// Redis keys
const { scanCompletedKey, lastScanIdKey } = keys;
// Determine if initial scan has already completed
const scanCompleted = await redis.get(scanCompletedKey);
// Scan already completed so return
if (scanCompleted) {
debug(`Initial scan previously completed on %s`, scanCompleted);
return;
}
const _processRecords = async (records) => {
// Process batch of records
await processRecords(records);
const lastDocument = records[records.length - 1].fullDocument;
// Record last id of the batch
const lastId = fp_js_1.default.get(sortField.field, lastDocument);
if (lastId) {
await redis.set(lastScanIdKey, sortField.serialize(lastId));
}
};
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey);
debug('Last scan id %s', lastId);
// Create queue
const queue = (0, prom_utils_1.batchQueue)(_processRecords, options);
// Query collection
cursor = collection
// Skip ids already processed
.find(lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {})
.sort({ [sortField.field]: 1 });
const ns = { db: collection.dbName, coll: collection.collectionName };
// Process documents
for await (const doc of cursor) {
debug('Initial scan doc %O', doc);
deferred = (0, prom_utils_1.defer)();
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
};
await queue.enqueue(changeStreamDoc);
deferred.done();
}
// Flush the queue
await queue.flush();
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString());
debug('Completed initial scan');
};
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey);
debug('Last scan id %s', lastId);
// Create queue
const queue = (0, prom_utils_1.batchQueue)(_processRecords, options);
// Query collection
const cursor = collection
// Skip ids already processed
.find(lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {}, omit ? { projection: (0, util_js_1.setDefaults)(omit, 0) } : {})
.sort({ [sortField.field]: 1 });
const ns = { db: collection.dbName, coll: collection.collectionName };
// Process documents
for await (const doc of cursor) {
debug('Initial scan doc %O', doc);
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
};
await queue.enqueue(changeStreamDoc);
}
// Flush the queue
await queue.flush();
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString());
debug('Completed initial scan');
const stop = async () => {
debug('Stopping initial scan');
// Wait for event to be processed
await deferred?.promise;
// Close the cursor
await cursor?.close();
};
return { start, stop };
};

@@ -109,21 +123,20 @@ const defaultOptions = { fullDocument: 'updateLookup' };

const processChangeStream = async (processRecord, pipeline = []) => {
const abortController = new AbortController();
let deferred;
// Redis keys
const { changeStreamTokenKey } = keys;
// Lookup change stream token
const token = await redis.get(changeStreamTokenKey);
const options = token
? // Resume token found, so set change stream resume point
{ ...defaultOptions, resumeAfter: JSON.parse(token) }
: defaultOptions;
// Get the change stream as an async iterator
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], options);
let changeStream;
const start = async () => {
for await (let event of changeStream) {
debug('Starting change stream');
// Redis keys
const { changeStreamTokenKey } = keys;
// Lookup change stream token
const token = await redis.get(changeStreamTokenKey);
const options = token
? // Resume token found, so set change stream resume point
{ ...defaultOptions, resumeAfter: JSON.parse(token) }
: defaultOptions;
// Start the change stream
changeStream = collection.watch([...omitPipeline, ...pipeline], options);
const iterator = (0, changeStreamToIterator_js_1.default)(changeStream);
// Get the change stream as an async iterator
for await (let event of iterator) {
debug('Change stream event %O', event);
// Don't process event if stopping
if (abortController.signal.aborted) {
return;
}
deferred = (0, prom_utils_1.defer)();

@@ -144,6 +157,7 @@ // Get resume token

};
const stop = () => {
abortController.abort();
const stop = async () => {
debug('Stopping change stream');
await changeStream.close();
// Wait for event to be processed
return deferred?.promise;
await deferred?.promise;
};

@@ -206,5 +220,5 @@ return { start, stop };

const start = () => {
debug('Started polling for schema changes');
debug('Starting polling for schema changes');
checkForSchemaChange();
// Perform an inital check
checkForSchemaChange();
// Check for schema changes every interval

@@ -214,3 +228,3 @@ timer = setInterval(checkForSchemaChange, interval);

const stop = () => {
debug('Stopped polling for schema changes');
debug('Stopping polling for schema changes');
clearInterval(timer);

@@ -217,0 +231,0 @@ };

{
"name": "mongochangestream",
"version": "0.18.0",
"version": "0.19.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -5,0 +5,0 @@ "author": "GovSpend",

@@ -1,13 +0,9 @@

import { ChangeStreamOptions, Collection, Document } from 'mongodb'
import _debug from 'debug'
import { defer } from 'prom-utils'
import { ChangeStreamDocument, ChangeStream } from 'mongodb'
const debug = _debug('mongochangestream')
const done = { value: {} as ChangeStreamDocument, done: true }
const changeStreamToIterator = (
collection: Collection,
pipeline: Document[],
options: ChangeStreamOptions
) => {
const changeStream = collection.watch(pipeline, options)
debug('Started change stream - pipeline %O options %O', pipeline, options)
const toIterator = (changeStream: ChangeStream) => {
const deferred = defer()
changeStream.once('close', deferred.done)
return {

@@ -17,6 +13,11 @@ [Symbol.asyncIterator]() {

async next() {
return changeStream
.next()
.then((data) => ({ value: data, done: false }))
return Promise.race([
deferred.promise.then(() => done),
changeStream.next().then((data) => ({ value: data, done: false })),
])
},
async return() {
await changeStream.close()
return done
},
}

@@ -26,2 +27,2 @@ },

}
export default changeStreamToIterator
export default toIterator

@@ -8,4 +8,5 @@ import _ from 'lodash/fp.js'

Db,
ChangeStream,
} from 'mongodb'
import changeStreamToIterator from './changeStreamToIterator.js'
import toIterator from './changeStreamToIterator.js'
import {

@@ -73,54 +74,71 @@ SyncOptions,

) => {
debug('Running initial scan')
const sortField = options?.sortField || defaultSortField
// Redis keys
const { scanCompletedKey, lastScanIdKey } = keys
// Determine if initial scan has already completed
const scanCompleted = await redis.get(scanCompletedKey)
// Scan already completed so return
if (scanCompleted) {
debug(`Initial scan previously completed on %s`, scanCompleted)
return
}
const _processRecords = async (records: ChangeStreamInsertDocument[]) => {
// Process batch of records
await processRecords(records)
const lastDocument = records[records.length - 1].fullDocument
// Record last id of the batch
const lastId = _.get(sortField.field, lastDocument)
if (lastId) {
await redis.set(lastScanIdKey, sortField.serialize(lastId))
let deferred: Deferred
let cursor: ReturnType<typeof collection.find>
const start = async () => {
debug('Starting initial scan')
const sortField = options?.sortField || defaultSortField
// Redis keys
const { scanCompletedKey, lastScanIdKey } = keys
// Determine if initial scan has already completed
const scanCompleted = await redis.get(scanCompletedKey)
// Scan already completed so return
if (scanCompleted) {
debug(`Initial scan previously completed on %s`, scanCompleted)
return
}
const _processRecords = async (records: ChangeStreamInsertDocument[]) => {
// Process batch of records
await processRecords(records)
const lastDocument = records[records.length - 1].fullDocument
// Record last id of the batch
const lastId = _.get(sortField.field, lastDocument)
if (lastId) {
await redis.set(lastScanIdKey, sortField.serialize(lastId))
}
}
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey)
debug('Last scan id %s', lastId)
// Create queue
const queue = batchQueue(_processRecords, options)
// Query collection
cursor = collection
// Skip ids already processed
.find(
lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {},
omit ? { projection: setDefaults(omit, 0) } : {}
)
.sort({ [sortField.field]: 1 })
const ns = { db: collection.dbName, coll: collection.collectionName }
// Process documents
for await (const doc of cursor) {
debug('Initial scan doc %O', doc)
deferred = defer()
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
} as unknown as ChangeStreamInsertDocument
await queue.enqueue(changeStreamDoc)
deferred.done()
}
// Flush the queue
await queue.flush()
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString())
debug('Completed initial scan')
}
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey)
debug('Last scan id %s', lastId)
// Create queue
const queue = batchQueue(_processRecords, options)
// Query collection
const cursor = collection
// Skip ids already processed
.find(
lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {},
omit ? { projection: setDefaults(omit, 0) } : {}
)
.sort({ [sortField.field]: 1 })
const ns = { db: collection.dbName, coll: collection.collectionName }
// Process documents
for await (const doc of cursor) {
debug('Initial scan doc %O', doc)
const changeStreamDoc = {
fullDocument: doc,
operationType: 'insert',
ns,
} as unknown as ChangeStreamInsertDocument
await queue.enqueue(changeStreamDoc)
const stop = async () => {
debug('Stopping initial scan')
// Wait for event to be processed
await deferred?.promise
// Close the cursor
await cursor?.close()
}
// Flush the queue
await queue.flush()
// Record scan complete
await redis.set(scanCompletedKey, new Date().toString())
debug('Completed initial scan')
return { start, stop }
}

@@ -142,25 +160,21 @@

) => {
const abortController = new AbortController()
let deferred: Deferred
// Redis keys
const { changeStreamTokenKey } = keys
// Lookup change stream token
const token = await redis.get(changeStreamTokenKey)
const options = token
? // Resume token found, so set change stream resume point
{ ...defaultOptions, resumeAfter: JSON.parse(token) }
: defaultOptions
// Get the change stream as an async iterator
const changeStream = changeStreamToIterator(
collection,
[...omitPipeline, ...pipeline],
options
)
let changeStream: ChangeStream
const start = async () => {
for await (let event of changeStream) {
debug('Starting change stream')
// Redis keys
const { changeStreamTokenKey } = keys
// Lookup change stream token
const token = await redis.get(changeStreamTokenKey)
const options = token
? // Resume token found, so set change stream resume point
{ ...defaultOptions, resumeAfter: JSON.parse(token) }
: defaultOptions
// Start the change stream
changeStream = collection.watch([...omitPipeline, ...pipeline], options)
const iterator = toIterator(changeStream)
// Get the change stream as an async iterator
for await (let event of iterator) {
debug('Change stream event %O', event)
// Don't process event if stopping
if (abortController.signal.aborted) {
return
}
deferred = defer()

@@ -181,7 +195,10 @@ // Get resume token

}
const stop = () => {
abortController.abort()
const stop = async () => {
debug('Stopping change stream')
await changeStream.close()
// Wait for event to be processed
return deferred?.promise
await deferred?.promise
}
return { start, stop }

@@ -248,5 +265,5 @@ }

const start = () => {
debug('Started polling for schema changes')
debug('Starting polling for schema changes')
checkForSchemaChange()
// Perform an inital check
checkForSchemaChange()
// Check for schema changes every interval

@@ -256,3 +273,3 @@ timer = setInterval(checkForSchemaChange, interval)

const stop = () => {
debug('Stopped polling for schema changes')
debug('Stopping polling for schema changes')
clearInterval(timer)

@@ -259,0 +276,0 @@ }

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc