Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.5.0 to 0.6.0

4

CHANGELOG.md

@@ -0,1 +1,5 @@

# 0.6.0
* `sortField` option for overriding the default sorting field of `_id`.
# 0.5.0

@@ -2,0 +6,0 @@

12

dist/mongoChangeStream.d.ts

@@ -1,9 +0,15 @@

import { Collection } from 'mongodb';
import { ProcessRecord, ProcessRecords } from './types.js';
import _ from 'lodash/fp.js';
import { Collection, ObjectId } from 'mongodb';
import { ProcessRecord, ProcessRecords, SyncOptions } from './types.js';
import type { default as Redis } from 'ioredis';
import { QueueOptions } from 'prom-utils';
export declare const defaultSortField: {
field: string;
serialize: _.LodashToString;
deserialize: (x: string) => ObjectId;
};
export declare const initSync: (redis: Redis) => {
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions) => Promise<void>;
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & SyncOptions) => Promise<void>;
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>;
reset: (collection: Collection) => Promise<void>;
};

@@ -6,3 +6,4 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.initSync = void 0;
exports.initSync = exports.defaultSortField = void 0;
const fp_js_1 = __importDefault(require("lodash/fp.js"));
const mongodb_1 = require("mongodb");

@@ -30,2 +31,7 @@ const changeStreamToIterator_js_1 = __importDefault(require("./changeStreamToIterator.js"));

};
exports.defaultSortField = {
field: '_id',
serialize: fp_js_1.default.toString,
deserialize: (x) => new mongodb_1.ObjectId(x),
};
const initSync = (redis) => {

@@ -37,2 +43,3 @@ /**

debug('Running initial scan');
const sortField = options?.sortField || exports.defaultSortField;
// Redis keys

@@ -47,12 +54,15 @@ const { scanCompletedKey, lastScanIdKey } = getKeys(collection);

}
// Lookup last _id successfully processed
const lastId = await redis.get(lastScanIdKey);
debug('Last scan _id %s', lastId);
const _processRecords = async (records) => {
// Process batch of records
await processRecords(records);
const lastDocument = records[records.length - 1].fullDocument;
// Record last id of the batch
const lastId = records[records.length - 1].fullDocument._id.toString();
await redis.set(lastScanIdKey, lastId);
const lastId = fp_js_1.default.get(sortField.field, lastDocument);
if (lastId) {
await redis.set(lastScanIdKey, sortField.serialize(lastId));
}
};
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey);
debug('Last scan id %s', lastId);
// Create queue

@@ -63,4 +73,6 @@ const queue = (0, prom_utils_1.batchQueue)(_processRecords, options);

// Skip ids already processed
.find(lastId ? { _id: { $gt: new mongodb_1.ObjectId(lastId) } } : {})
.sort({ _id: 1 });
.find(lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {})
.sort({ [sortField.field]: 1 });
const ns = { db: collection.dbName, coll: collection.collectionName };

@@ -67,0 +79,0 @@ // Process documents

import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb';
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>;
export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>;
export interface SyncOptions<T = any> {
sortField?: {
field: string;
serialize: (x: T) => string;
deserialize: (x: string) => T;
};
}
{
"name": "mongochangestream",
"version": "0.5.0",
"version": "0.6.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -36,2 +36,3 @@ "author": "GovSpend",

"@types/debug": "^4.1.7",
"@types/lodash": "^4.14.182",
"@typescript-eslint/eslint-plugin": "^5.32.0",

@@ -46,4 +47,5 @@ "eslint": "^8.21.0",

"debug": "^4.3.4",
"lodash": "^4.17.21",
"prom-utils": "^0.2.0"
}
}

@@ -0,4 +1,5 @@

import _ from 'lodash/fp.js'
import { ChangeStreamInsertDocument, Collection, ObjectId } from 'mongodb'
import changeStreamToIterator from './changeStreamToIterator.js'
import { ProcessRecord, ProcessRecords } from './types.js'
import { ProcessRecord, ProcessRecords, SyncOptions } from './types.js'
import _debug from 'debug'

@@ -31,2 +32,8 @@ import type { default as Redis } from 'ioredis'

export const defaultSortField = {
field: '_id',
serialize: _.toString,
deserialize: (x: string) => new ObjectId(x),
}
export const initSync = (redis: Redis) => {

@@ -39,5 +46,6 @@ /**

processRecords: ProcessRecords,
options?: QueueOptions
options?: QueueOptions & SyncOptions
) => {
debug('Running initial scan')
const sortField = options?.sortField || defaultSortField
// Redis keys

@@ -52,12 +60,15 @@ const { scanCompletedKey, lastScanIdKey } = getKeys(collection)

}
// Lookup last _id successfully processed
const lastId = await redis.get(lastScanIdKey)
debug('Last scan _id %s', lastId)
const _processRecords = async (records: ChangeStreamInsertDocument[]) => {
// Process batch of records
await processRecords(records)
const lastDocument = records[records.length - 1].fullDocument
// Record last id of the batch
const lastId = records[records.length - 1].fullDocument._id.toString()
await redis.set(lastScanIdKey, lastId)
const lastId = _.get(sortField.field, lastDocument)
if (lastId) {
await redis.set(lastScanIdKey, sortField.serialize(lastId))
}
}
// Lookup last id successfully processed
const lastId = await redis.get(lastScanIdKey)
debug('Last scan id %s', lastId)
// Create queue

@@ -68,4 +79,8 @@ const queue = batchQueue(_processRecords, options)

// Skip ids already processed
.find(lastId ? { _id: { $gt: new ObjectId(lastId) } } : {})
.sort({ _id: 1 })
.find(
lastId
? { [sortField.field]: { $gt: sortField.deserialize(lastId) } }
: {}
)
.sort({ [sortField.field]: 1 })
const ns = { db: collection.dbName, coll: collection.collectionName }

@@ -72,0 +87,0 @@ // Process documents

@@ -8,1 +8,9 @@ import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'

) => void | Promise<void>
export interface SyncOptions<T = any> {
sortField?: {
field: string
serialize: (x: T) => string
deserialize: (x: string) => T
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc