Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.16.0 to 0.17.0

5

CHANGELOG.md

@@ -0,1 +1,6 @@

# 0.17.0
- BREAKING CHANGE: `initSync` now takes `collection`.
- NEW: `detectSchemaChange`.
# 0.16.0

@@ -2,0 +7,0 @@

34

dist/mongoChangeStream.d.ts

@@ -0,14 +1,8 @@

/// <reference types="node" />
import _ from 'lodash/fp.js';
import { Collection, Document, ObjectId } from 'mongodb';
import { Collection, Document, ObjectId, Db } from 'mongodb';
import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions } from './types.js';
import type { default as Redis } from 'ioredis';
import { QueueOptions } from 'prom-utils';
/**
* Get Redis keys used for the given collection.
*/
export declare const getKeys: (collection: Collection) => {
scanCompletedKey: string;
lastScanIdKey: string;
changeStreamTokenKey: string;
};
import events from 'node:events';
export declare const defaultSortField: {

@@ -19,10 +13,22 @@ field: string;

};
export declare const initSync: (redis: Redis, options?: SyncOptions) => {
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{
export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => {
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
processChangeStream: (processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{
start: () => Promise<void>;
stop: () => void;
}>;
reset: (collection: Collection) => Promise<void>;
clearCompletedOn: (collection: Collection) => Promise<void>;
reset: () => Promise<void>;
clearCompletedOn: () => Promise<void>;
getCollectionSchema: (db: Db) => Promise<object | undefined>;
detectSchemaChange: (db: Db, interval?: number) => Promise<{
start: () => void;
stop: () => void;
emitter: events;
}>;
keys: {
scanCompletedKey: string;
lastScanIdKey: string;
changeStreamTokenKey: string;
schemaKey: string;
};
};

@@ -6,3 +6,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.initSync = exports.defaultSortField = exports.getKeys = void 0;
exports.initSync = exports.defaultSortField = void 0;
const fp_js_1 = __importDefault(require("lodash/fp.js"));

@@ -14,2 +14,4 @@ const mongodb_1 = require("mongodb");

const util_js_1 = require("./util.js");
const node_events_1 = __importDefault(require("node:events"));
const ms_1 = __importDefault(require("ms"));
const debug = (0, debug_1.default)('mongochangestream');

@@ -26,2 +28,3 @@ const keyPrefix = 'mongoChangeStream';

const changeStreamTokenKey = `${keyPrefix}:${collectionKey}:changeStreamToken`;
const schemaKey = `${keyPrefix}:${collectionKey}:schema`;
return {

@@ -31,5 +34,5 @@ scanCompletedKey,

changeStreamTokenKey,
schemaKey,
};
};
exports.getKeys = getKeys;
exports.defaultSortField = {

@@ -40,3 +43,4 @@ field: '_id',

};
const initSync = (redis, options) => {
const initSync = (redis, collection, options) => {
const keys = getKeys(collection);
const omit = options?.omit;

@@ -48,7 +52,7 @@ const omitPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : [];

*/
const runInitialScan = async (collection, processRecords, options) => {
const runInitialScan = async (processRecords, options) => {
debug('Running initial scan');
const sortField = options?.sortField || exports.defaultSortField;
// Redis keys
const { scanCompletedKey, lastScanIdKey } = (0, exports.getKeys)(collection);
const { scanCompletedKey, lastScanIdKey } = keys;
// Determine if initial scan has already completed

@@ -109,6 +113,6 @@ const scanCompleted = await redis.get(scanCompletedKey);

*/
const processChangeStream = async (collection, processRecord, pipeline = []) => {
const processChangeStream = async (processRecord, pipeline = []) => {
const abortController = new AbortController();
// Redis keys
const { changeStreamTokenKey } = (0, exports.getKeys)(collection);
const { changeStreamTokenKey } = keys;
// Lookup change stream token

@@ -146,5 +150,4 @@ const token = await redis.get(changeStreamTokenKey);

*/
const reset = async (collection) => {
const keys = Object.values((0, exports.getKeys)(collection));
await redis.del(...keys);
const reset = async () => {
await redis.del(...Object.values(keys));
};

@@ -154,6 +157,57 @@ /**

*/
const clearCompletedOn = async (collection) => {
const keys = (0, exports.getKeys)(collection);
const clearCompletedOn = async () => {
await redis.del(keys.scanCompletedKey);
};
/**
* Get the existing JSON schema for the collection.
*/
const getCollectionSchema = async (db) => {
const colls = await db
.listCollections({ name: collection.collectionName })
.toArray();
return fp_js_1.default.get('0.options.validator.$jsonSchema', colls);
};
/**
* Get cached collection schema
*/
const getCachedCollectionSchema = () => redis.get(keys.schemaKey).then((val) => val && JSON.parse(val));
/**
* Check for schema changes every interval and emit 'change' event if found.
*/
const detectSchemaChange = async (db, interval = (0, ms_1.default)('10s')) => {
const emitter = new node_events_1.default.EventEmitter();
let timer;
// Check for a cached schema
let previousSchema = await getCachedCollectionSchema();
if (!previousSchema) {
const schema = await getCollectionSchema(db);
// Persist schema
await redis.setnx(keys.schemaKey, JSON.stringify(schema));
previousSchema = schema;
}
// Check for a schema change
const checkForSchemaChange = async () => {
const currentSchema = await getCollectionSchema(db);
// Schemas are no longer the same
if (!fp_js_1.default.isEqual(currentSchema, previousSchema)) {
debug('Schema change detected %O', currentSchema);
emitter.emit('change', { initialSchema: previousSchema, currentSchema });
// Persist schema
await redis.set(keys.schemaKey, JSON.stringify(currentSchema));
previousSchema = currentSchema;
}
};
const start = () => {
debug('Started polling for schema changes');
// Perform an inital check
checkForSchemaChange();
// Check for schema changes every interval
timer = setInterval(checkForSchemaChange, interval);
};
const stop = () => {
debug('Stopped polling for schema changes');
clearInterval(timer);
};
return { start, stop, emitter };
};
return {

@@ -164,4 +218,7 @@ runInitialScan,

clearCompletedOn,
getCollectionSchema,
detectSchemaChange,
keys,
};
};
exports.initSync = initSync;
{
"name": "mongochangestream",
"version": "0.16.0",
"version": "0.17.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -45,2 +45,3 @@ "author": "GovSpend",

"lodash": "^4.17.21",
"ms": "^2.1.3",
"prom-utils": "^0.3.0"

@@ -47,0 +48,0 @@ },

@@ -44,5 +44,5 @@ # Mongo Change Stream

// Sync collection
const sync = initSync(redis)
sync.syncCollection(coll, processRecord)
const changeStream = await sync.processChangeStream(coll, processRecord)
const sync = initSync(redis, coll)
sync.syncCollection(processRecord)
const changeStream = await sync.processChangeStream(processRecord)
changeStream.start()

@@ -70,3 +70,2 @@ setTimeout(changeStream.stop, 30000)

const runInitialScan = async (
collection: Collection,
processRecords: ProcessRecords,

@@ -77,3 +76,2 @@ options?: QueueOptions & ScanOptions

const processChangeStream = async (
collection: Collection,
processRecord: ProcessRecord,

@@ -80,0 +78,0 @@ pipeline?: Document[]

@@ -7,2 +7,3 @@ import _ from 'lodash/fp.js'

ObjectId,
Db,
} from 'mongodb'

@@ -25,2 +26,4 @@ import changeStreamToIterator from './changeStreamToIterator.js'

} from './util.js'
import events from 'node:events'
import ms from 'ms'

@@ -34,3 +37,3 @@ const debug = _debug('mongochangestream')

*/
export const getKeys = (collection: Collection) => {
const getKeys = (collection: Collection) => {
const collectionKey = getCollectionKey(collection)

@@ -41,2 +44,3 @@ const scanPrefix = `${keyPrefix}:${collectionKey}`

const changeStreamTokenKey = `${keyPrefix}:${collectionKey}:changeStreamToken`
const schemaKey = `${keyPrefix}:${collectionKey}:schema`
return {

@@ -46,2 +50,3 @@ scanCompletedKey,

changeStreamTokenKey,
schemaKey,
}

@@ -56,3 +61,8 @@ }

export const initSync = (redis: Redis, options?: SyncOptions) => {
export const initSync = (
redis: Redis,
collection: Collection,
options?: SyncOptions
) => {
const keys = getKeys(collection)
const omit = options?.omit

@@ -65,3 +75,2 @@ const omitPipeline = omit ? generatePipelineFromOmit(omit) : []

const runInitialScan = async (
collection: Collection,
processRecords: ProcessRecords,

@@ -73,3 +82,3 @@ options?: QueueOptions & ScanOptions

// Redis keys
const { scanCompletedKey, lastScanIdKey } = getKeys(collection)
const { scanCompletedKey, lastScanIdKey } = keys
// Determine if initial scan has already completed

@@ -136,3 +145,2 @@ const scanCompleted = await redis.get(scanCompletedKey)

const processChangeStream = async (
collection: Collection,
processRecord: ProcessRecord,

@@ -143,3 +151,3 @@ pipeline: Document[] = []

// Redis keys
const { changeStreamTokenKey } = getKeys(collection)
const { changeStreamTokenKey } = keys
// Lookup change stream token

@@ -183,5 +191,4 @@ const token = await redis.get(changeStreamTokenKey)

*/
const reset = async (collection: Collection) => {
const keys = Object.values(getKeys(collection))
await redis.del(...keys)
const reset = async () => {
await redis.del(...Object.values(keys))
}

@@ -192,7 +199,61 @@

*/
const clearCompletedOn = async (collection: Collection) => {
const keys = getKeys(collection)
const clearCompletedOn = async () => {
await redis.del(keys.scanCompletedKey)
}
/**
* Get the existing JSON schema for the collection.
*/
const getCollectionSchema = async (db: Db): Promise<object | undefined> => {
const colls = await db
.listCollections({ name: collection.collectionName })
.toArray()
return _.get('0.options.validator.$jsonSchema', colls)
}
/**
* Get cached collection schema
*/
const getCachedCollectionSchema = () =>
redis.get(keys.schemaKey).then((val: any) => val && JSON.parse(val))
/**
* Check for schema changes every interval and emit 'change' event if found.
*/
const detectSchemaChange = async (db: Db, interval = ms('10s')) => {
const emitter = new events.EventEmitter()
let timer: NodeJS.Timer
// Check for a cached schema
let previousSchema = await getCachedCollectionSchema()
if (!previousSchema) {
const schema = await getCollectionSchema(db)
// Persist schema
await redis.setnx(keys.schemaKey, JSON.stringify(schema))
previousSchema = schema
}
// Check for a schema change
const checkForSchemaChange = async () => {
const currentSchema = await getCollectionSchema(db)
// Schemas are no longer the same
if (!_.isEqual(currentSchema, previousSchema)) {
debug('Schema change detected %O', currentSchema)
emitter.emit('change', { initialSchema: previousSchema, currentSchema })
// Persist schema
await redis.set(keys.schemaKey, JSON.stringify(currentSchema))
previousSchema = currentSchema
}
}
const start = () => {
debug('Started polling for schema changes')
// Perform an inital check
checkForSchemaChange()
// Check for schema changes every interval
timer = setInterval(checkForSchemaChange, interval)
}
const stop = () => {
debug('Stopped polling for schema changes')
clearInterval(timer)
}
return { start, stop, emitter }
}
return {

@@ -203,3 +264,6 @@ runInitialScan,

clearCompletedOn,
getCollectionSchema,
detectSchemaChange,
keys,
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc