mongochangestream - npm Package Compare versions

Comparing version 0.22.0 to 0.23.0


CHANGELOG.md

@@ -0,1 +1,5 @@

+# 0.23.0
+- `JSONSchema` type.
# 0.22.0
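
The headline change is the new `JSONSchema` type: a union of the three JSON Schema draft typings from `json-schema`, now used as the return type of `getCollectionSchema`. A minimal sketch of what the union admits (the schema value below is hypothetical):

import type { JSONSchema4, JSONSchema6, JSONSchema7 } from 'json-schema'

// The new union type, as declared in src/types.ts further down.
type JSONSchema = JSONSchema4 | JSONSchema6 | JSONSchema7

// A hypothetical schema, valid against any of the three drafts.
const userSchema: JSONSchema = {
  title: 'User',
  type: 'object',
  properties: {
    name: { type: 'string', description: 'Full name' },
    tags: { type: 'array', items: { type: 'string' } },
  },
}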

dist/mongoChangeStream.d.ts

@@ -2,0 +6,0 @@

/// <reference types="node" />
import _ from 'lodash/fp.js';
import { Collection, Document, ObjectId, Db } from 'mongodb';
-import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions, ChangeOptions } from './types.js';
+import { SyncOptions, ProcessRecord, ProcessRecords, ScanOptions, ChangeOptions, JSONSchema } from './types.js';
import type { Redis } from 'ioredis';

@@ -14,2 +14,9 @@ import { QueueOptions } from 'prom-utils';

export declare const initSync: (redis: Redis, collection: Collection, options?: SyncOptions) => {
+/**
+ * Run initial collection scan. `options.batchSize` defaults to 500.
+ * Sorting defaults to `_id`.
+ *
+ * Call `start` to start processing documents and `stop` to close
+ * the cursor.
+ */
runInitialScan: (processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<{

@@ -19,2 +26,10 @@ start: () => Promise<void>;

}>;
+/**
+ * Process MongoDB change stream for the collection.
+ * If omit is passed to `initSync` a pipeline stage that removes
+ * those fields will be prepended to the `pipeline` argument.
+ *
+ * Call `start` to start processing events and `stop` to close
+ * the change stream.
+ */
processChangeStream: (processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{

@@ -24,5 +39,18 @@ start: () => Promise<void>;

}>;
+/**
+ * Delete all Redis keys for the collection.
+ */
reset: () => Promise<void>;
clearCompletedOn: () => Promise<void>;
-getCollectionSchema: (db: Db) => Promise<object>;
+/**
+ * Get the existing JSON schema for the collection.
+ */
+getCollectionSchema: (db: Db) => Promise<JSONSchema>;
+/**
+ * Check for schema changes every interval and emit 'change' event if found.
+ * Optionally, set interval and strip metadata (i.e., title and description)
+ * from the JSON schema.
+ *
+ * Call `start` to start polling for schema changes and `stop` to clear
+ * the timer.
+ */
detectSchemaChange: (db: Db, options?: ChangeOptions) => Promise<{

@@ -33,2 +61,5 @@ start: () => Promise<void>;

}>;
+/**
+ * Redis keys used for the collection.
+ */
keys: {

@@ -35,0 +66,0 @@ scanCompletedKey: string;
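
Taken together, these declarations describe the API surface returned by `initSync`. A hedged usage sketch, assuming `initSync` is the package's main export and a local Redis and MongoDB are available (connection strings, database/collection names, and handler bodies are illustrative only):

import Redis from 'ioredis'
import { MongoClient } from 'mongodb'
import { initSync } from 'mongochangestream'

const main = async () => {
  const redis = new Redis()
  const client = await MongoClient.connect('mongodb://localhost:27017')
  const db = client.db('mydb')
  const sync = initSync(redis, db.collection('users'))

  // Initial scan: the handler receives batches of insert events
  // (500 per batch by default, sorted by _id).
  const scan = await sync.runInitialScan(async (docs) => {
    console.log('scanned batch of', docs.length)
  })
  await scan.start()

  // Change stream: the handler receives one change event at a time.
  const stream = await sync.processChangeStream(async (doc) => {
    console.log('change:', doc.operationType)
  })
  await stream.start()

  // New in 0.23.0: getCollectionSchema resolves to JSONSchema, not object.
  console.log(await sync.getCollectionSchema(db))

  // Poll for schema changes until stopped.
  const change = await sync.detectSchemaChange(db)
  await change.start()

  // Later: close the cursor and the change stream, clear the
  // polling timer, and delete the collection's Redis keys.
  await scan.stop()
  await stream.stop()
  await change.stop()
  await sync.reset()
}

main()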


dist/mongoChangeStream.js

@@ -18,3 +18,3 @@ "use strict";

/**
- * Get Redis keys used for the given collection.
+ * Get Redis keys used for the collection.
*/

@@ -40,10 +40,6 @@ const getKeys = (collection) => {

};
-const initSync = (redis, collection, options) => {
+const initSync = (redis, collection, options = {}) => {
const keys = getKeys(collection);
-const omit = options?.omit;
+const omit = options.omit;
const omitPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : [];
-/**
- * Run initial collection scan. `options.batchSize` defaults to 500.
- * Sorting defaults to `_id`.
- */
const runInitialScan = async (processRecords, options = {}) => {

@@ -121,10 +117,2 @@ let deferred;

const defaultOptions = { fullDocument: 'updateLookup' };
-/**
- * Process MongoDB change stream for the given collection.
- * If omit is passed to `initSync` a pipeline stage that removes
- * those fields will be prepended to the `pipeline` argument.
- *
- * Call `start` to start processing events and `stop` to close
- * the change stream.
- */
const processChangeStream = async (processRecord, pipeline = []) => {

@@ -172,17 +160,5 @@ let deferred;

};
-/**
- * Delete all Redis keys for the given collection.
- */
const reset = async () => {
await redis.del(...Object.values(keys));
};
-/**
- * Delete completed on key in Redis for the given collection.
- */
const clearCompletedOn = async () => {
await redis.del(keys.scanCompletedKey);
};
-/**
- * Get the existing JSON schema for the collection.
- */
const getCollectionSchema = async (db) => {

@@ -198,7 +174,2 @@ const colls = await db

const getCachedCollectionSchema = () => redis.get(keys.schemaKey).then((val) => val && JSON.parse(val));
-/**
- * Check for schema changes every interval and emit 'change' event if found.
- * Optionally, set interval and strip metadata (i.e., title and description)
- * from the JSON schema.
- */
const detectSchemaChange = async (db, options = {}) => {

@@ -247,8 +218,39 @@ const interval = options.interval || (0, ms_1.default)('1m');

return {
+/**
+ * Run initial collection scan. `options.batchSize` defaults to 500.
+ * Sorting defaults to `_id`.
+ *
+ * Call `start` to start processing documents and `stop` to close
+ * the cursor.
+ */
runInitialScan,
+/**
+ * Process MongoDB change stream for the collection.
+ * If omit is passed to `initSync` a pipeline stage that removes
+ * those fields will be prepended to the `pipeline` argument.
+ *
+ * Call `start` to start processing events and `stop` to close
+ * the change stream.
+ */
processChangeStream,
+/**
+ * Delete all Redis keys for the collection.
+ */
reset,
clearCompletedOn,
+/**
+ * Get the existing JSON schema for the collection.
+ */
getCollectionSchema,
+/**
+ * Check for schema changes every interval and emit 'change' event if found.
+ * Optionally, set interval and strip metadata (i.e., title and description)
+ * from the JSON schema.
+ *
+ * Call `start` to start polling for schema changes and `stop` to clear
+ * the timer.
+ */
detectSchemaChange,
+/**
+ * Redis keys used for the collection.
+ */
keys,

@@ -255,0 +257,0 @@ };

dist/types.d.ts

import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb';
+import { JSONSchema4, JSONSchema6, JSONSchema7 } from 'json-schema';
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>;

@@ -18,1 +19,2 @@ export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>;

}
+export declare type JSONSchema = JSONSchema4 | JSONSchema6 | JSONSchema7;
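
Since the handler types come straight from the MongoDB driver, the usual discriminated-union narrowing applies. A small sketch of handlers matching the declared signatures (the bodies are placeholders):

import type { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'

// Matches ProcessRecord: one change event at a time.
const processRecord = async (doc: ChangeStreamDocument) => {
  if (doc.operationType === 'insert') {
    // Narrowed to ChangeStreamInsertDocument, so fullDocument is available.
    console.log(doc.fullDocument._id)
  }
}

// Matches ProcessRecords: initial-scan batches of insert events.
const processRecords = (docs: ChangeStreamInsertDocument[]) => {
  console.log('received', docs.length, 'documents')
}
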
dist/util.d.ts

import { Collection } from 'mongodb';
import _ from 'lodash/fp.js';
+import { JSONSchema } from './types';
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>;

@@ -10,6 +11,15 @@ export declare const generatePipelineFromOmit: (omit: string[]) => {

export declare const getCollectionKey: (collection: Collection) => string;
+export declare const traverseSchema: (x: JSONSchema) => false | {
+[k: string]: import("json-schema").JSONSchema4;
+} | {
+[k: string]: import("json-schema").JSONSchema6Definition;
+} | {
+[key: string]: import("json-schema").JSONSchema7Definition;
+} | {
+_items: true | import("json-schema").JSONSchema4 | import("json-schema").JSONSchema4[] | import("json-schema").JSONSchema6 | import("json-schema").JSONSchema6Definition[] | import("json-schema").JSONSchema7 | import("json-schema").JSONSchema7Definition[];
+} | undefined;
/**
* Remove title and description from a JSON schema.
*/
-export declare const removeMetadata: (schema: object) => object;
-export declare function when<T>(condition: any, fn: (x: T) => any): (x: T) => any;
+export declare const removeMetadata: (schema: JSONSchema) => object;
+export declare function when<T, R>(condition: any, fn: (x: T) => R): (x: T) => T | R;
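
The `when` declaration is the other typing fix here: the callback's return type is now tracked as `R`, and the overall result is `T | R` rather than `any`. Its implementation, shown verbatim in src/util.ts further down, makes the change easy to see:

// Applies fn only when condition is truthy; otherwise passes x through.
function when<T, R>(condition: any, fn: (x: T) => R) {
  return function (x: T) {
    return condition ? fn(x) : x
  }
}

const double = (n: number) => n * 2
when(true, double)(21)  // 42 — now inferred as number, no longer any
when(false, double)(21) // 21 — the input passes through unchanged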

dist/util.js

@@ -6,3 +6,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
-exports.when = exports.removeMetadata = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0;
+exports.when = exports.removeMetadata = exports.traverseSchema = exports.getCollectionKey = exports.omitFieldForUpdate = exports.omitFields = exports.generatePipelineFromOmit = exports.setDefaults = void 0;
const fp_js_1 = __importDefault(require("lodash/fp.js"));

@@ -32,2 +32,4 @@ const obj_walker_1 = require("obj-walker");

exports.getCollectionKey = getCollectionKey;
+const traverseSchema = (x) => x.properties || (x.items && { _items: x.items });
+exports.traverseSchema = traverseSchema;
/**

@@ -37,3 +39,2 @@ * Remove title and description from a JSON schema.

const removeMetadata = (schema) => {
-const traverse = (x) => x.properties || (x.items && { items: x.items });
const walkFn = ({ val }) => {

@@ -47,3 +48,3 @@ if ('title' in val) {

};
-return (0, obj_walker_1.walkie)(schema, walkFn, { traverse });
+return (0, obj_walker_1.walkie)(schema, walkFn, { traverse: exports.traverseSchema });
};

@@ -50,0 +51,0 @@ exports.removeMetadata = removeMetadata;
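
One nuance worth flagging: the extracted `traverseSchema` nests array item schemas under `_items`, whereas the inline `traverse` it replaces used `items`. A sketch of what the traversal function returns at each node (input values are made up):

const traverseSchema = (x: any) => x.properties || (x.items && { _items: x.items })

traverseSchema({ type: 'object', properties: { name: { type: 'string' } } })
// => { name: { type: 'string' } }

traverseSchema({ type: 'array', items: { type: 'number' } })
// => { _items: { type: 'number' } }

traverseSchema({ type: 'string' })
// => undefined, a leaf node; the walker descends no further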

package.json

{
"name": "mongochangestream",
"version": "0.22.0",
"version": "0.23.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -43,2 +43,3 @@ "author": "GovSpend",

"dependencies": {
"@types/json-schema": "^7.0.11",
"debug": "^4.3.4",

@@ -45,0 +46,0 @@ "lodash": "^4.17.21",

src/mongoChangeStream.ts

@@ -17,2 +17,3 @@ import _ from 'lodash/fp.js'

ChangeOptions,
+JSONSchema,
} from './types.js'

@@ -38,3 +39,3 @@ import _debug from 'debug'

/**
- * Get Redis keys used for the given collection.
+ * Get Redis keys used for the collection.
*/

@@ -65,11 +66,8 @@ const getKeys = (collection: Collection) => {

collection: Collection,
-options?: SyncOptions
+options: SyncOptions = {}
) => {
const keys = getKeys(collection)
-const omit = options?.omit
+const omit = options.omit
const omitPipeline = omit ? generatePipelineFromOmit(omit) : []
-/**
- * Run initial collection scan. `options.batchSize` defaults to 500.
- * Sorting defaults to `_id`.
- */
const runInitialScan = async (

@@ -158,10 +156,2 @@ processRecords: ProcessRecords,

-/**
- * Process MongoDB change stream for the given collection.
- * If omit is passed to `initSync` a pipeline stage that removes
- * those fields will be prepended to the `pipeline` argument.
- *
- * Call `start` to start processing events and `stop` to close
- * the change stream.
- */
const processChangeStream = async (

@@ -216,5 +206,2 @@ processRecord: ProcessRecord,

-/**
- * Delete all Redis keys for the given collection.
- */
const reset = async () => {

@@ -224,13 +211,3 @@ await redis.del(...Object.values(keys))

-/**
- * Delete completed on key in Redis for the given collection.
- */
const clearCompletedOn = async () => {
await redis.del(keys.scanCompletedKey)
}
-/**
- * Get the existing JSON schema for the collection.
- */
-const getCollectionSchema = async (db: Db): Promise<object> => {
+const getCollectionSchema = async (db: Db): Promise<JSONSchema> => {
const colls = await db

@@ -247,7 +224,3 @@ .listCollections({ name: collection.collectionName })

redis.get(keys.schemaKey).then((val: any) => val && JSON.parse(val))
-/**
- * Check for schema changes every interval and emit 'change' event if found.
- * Optionally, set interval and strip metadata (i.e., title and description)
- * from the JSON schema.
- */
const detectSchemaChange = async (db: Db, options: ChangeOptions = {}) => {

@@ -302,10 +275,41 @@ const interval = options.interval || ms('1m')

return {
+/**
+ * Run initial collection scan. `options.batchSize` defaults to 500.
+ * Sorting defaults to `_id`.
+ *
+ * Call `start` to start processing documents and `stop` to close
+ * the cursor.
+ */
runInitialScan,
+/**
+ * Process MongoDB change stream for the collection.
+ * If omit is passed to `initSync` a pipeline stage that removes
+ * those fields will be prepended to the `pipeline` argument.
+ *
+ * Call `start` to start processing events and `stop` to close
+ * the change stream.
+ */
processChangeStream,
+/**
+ * Delete all Redis keys for the collection.
+ */
reset,
clearCompletedOn,
+/**
+ * Get the existing JSON schema for the collection.
+ */
getCollectionSchema,
+/**
+ * Check for schema changes every interval and emit 'change' event if found.
+ * Optionally, set interval and strip metadata (i.e., title and description)
+ * from the JSON schema.
+ *
+ * Call `start` to start polling for schema changes and `stop` to clear
+ * the timer.
+ */
detectSchemaChange,
+/**
+ * Redis keys used for the collection.
+ */
keys,
}
}
src/types.ts

import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'
+import { JSONSchema4, JSONSchema6, JSONSchema7 } from 'json-schema'

@@ -25,1 +26,3 @@ export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>

}
+export type JSONSchema = JSONSchema4 | JSONSchema6 | JSONSchema7
src/util.ts

import { Collection } from 'mongodb'
import _ from 'lodash/fp.js'
-import { walkie, Node } from 'obj-walker'
+import { Node, walkie } from 'obj-walker'
+import { JSONSchema } from './types'

@@ -32,7 +33,9 @@ export const setDefaults = (keys: string[], val: any) => {

+export const traverseSchema = (x: JSONSchema) =>
+x.properties || (x.items && { _items: x.items })
/**
* Remove title and description from a JSON schema.
*/
-export const removeMetadata = (schema: object) => {
-const traverse = (x: any) => x.properties || (x.items && { items: x.items })
+export const removeMetadata = (schema: JSONSchema) => {
const walkFn = ({ val }: Node) => {

@@ -46,9 +49,9 @@ if ('title' in val) {

}
-return walkie(schema, walkFn, { traverse })
+return walkie(schema, walkFn, { traverse: traverseSchema })
}
-export function when<T>(condition: any, fn: (x: T) => any) {
-return function(x: T) {
+export function when<T, R>(condition: any, fn: (x: T) => R) {
+return function (x: T) {
return condition ? fn(x) : x
}
}
}
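
Putting the util changes together: `removeMetadata` now takes a typed `JSONSchema` and walks it via the shared `traverseSchema`, stripping `title` and `description` at every level. A before/after sketch using the definitions above (the input schema is hypothetical):

const schema: JSONSchema = {
  title: 'User',
  type: 'object',
  properties: {
    name: { type: 'string', description: 'Full name' },
    tags: { type: 'array', items: { type: 'string', title: 'Tag' } },
  },
}

removeMetadata(schema)
// => { type: 'object',
//      properties: {
//        name: { type: 'string' },
//        tags: { type: 'array', items: { type: 'string' } } } }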