Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.8.0 to 0.9.0

4

CHANGELOG.md

@@ -0,1 +1,5 @@

# 0.9.0
* Moved `omit` to `initSync` so that it can be used in the `processChangeStream` pipeline.
# 0.8.0

@@ -2,0 +6,0 @@

export * from './mongoChangeStream.js';
export * from './types.js';
export * from './util.js';

@@ -19,1 +19,2 @@ "use strict";

__exportStar(require("./types.js"), exports);
__exportStar(require("./util.js"), exports);

4

dist/mongoChangeStream.d.ts
import _ from 'lodash/fp.js';
import { Collection, ObjectId } from 'mongodb';
import { ProcessRecord, ProcessRecords, ScanOptions } from './types.js';
import { Options, ProcessRecord, ProcessRecords, ScanOptions } from './types.js';
import type { default as Redis } from 'ioredis';

@@ -19,3 +19,3 @@ import { QueueOptions } from 'prom-utils';

};
export declare const initSync: (redis: Redis) => {
export declare const initSync: (redis: Redis, options?: Options) => {
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;

@@ -22,0 +22,0 @@ processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>;

@@ -37,3 +37,5 @@ "use strict";

};
const initSync = (redis) => {
const initSync = (redis, options) => {
const omit = options?.omit;
const defaultPipeline = omit ? (0, util_js_1.generatePipelineFromOmit)(omit) : [];
/**

@@ -46,3 +48,2 @@ * Run initial collection scan. `options.batchSize` defaults to 500.

const sortField = options?.sortField || exports.defaultSortField;
const omit = options?.omit;
// Redis keys

@@ -100,3 +101,3 @@ const { scanCompletedKey, lastScanIdKey } = (0, exports.getKeys)(collection);

*/
const processChangeStream = async (collection, processRecord, pipeline = []) => {
const processChangeStream = async (collection, processRecord, pipeline) => {
// Redis keys

@@ -111,3 +112,3 @@ const { changeStreamTokenKey } = (0, exports.getKeys)(collection);

// Get the change stream as an async iterator
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, pipeline, options);
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, pipeline || defaultPipeline, options);
// Consume the events

@@ -114,0 +115,0 @@ for await (const event of changeStream) {

import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb';
export declare type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>;
export declare type ProcessRecords = (doc: ChangeStreamInsertDocument[]) => void | Promise<void>;
export interface Options {
omit?: string[];
}
export interface ScanOptions<T = any> {

@@ -10,3 +13,2 @@ sortField?: {

};
omit?: string[];
}
export declare const setDefaults: (keys: string[], val: any) => Record<string, any>;
export declare const generatePipelineFromOmit: (omit: string[]) => {
$unset: string[];
}[];
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.setDefaults = void 0;
exports.generatePipelineFromOmit = exports.setDefaults = void 0;
const setDefaults = (keys, val) => {

@@ -12,1 +12,9 @@ const obj = {};

exports.setDefaults = setDefaults;
const generatePipelineFromOmit = (omit) => {
const fields = omit.flatMap((field) => [
`fullDocument.${field}`,
`updateDescription.updatedFields.${field}`,
]);
return [{ $unset: fields }];
};
exports.generatePipelineFromOmit = generatePipelineFromOmit;
{
"name": "mongochangestream",
"version": "0.8.0",
"version": "0.9.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -5,0 +5,0 @@ "author": "GovSpend",

@@ -39,3 +39,3 @@ # Mongo Change Stream

Below are the available methods.
Below are the available methods.

@@ -58,3 +58,3 @@ The `processChangeStream` method will never complete, but `runInitialScan` will complete

processRecord: ProcessRecord,
options?: QueueOptions
options?: QueueOptions & ScanOptions
): Promise<void> => ...

@@ -65,3 +65,3 @@

processRecord: ProcessRecord,
pipeline: Document[] = []
pipeline?: Document[]
): Promise<void> => ...

@@ -68,0 +68,0 @@

export * from './mongoChangeStream.js'
export * from './types.js'
export * from './util.js'
import _ from 'lodash/fp.js'
import { ChangeStreamInsertDocument, Collection, ObjectId } from 'mongodb'
import changeStreamToIterator from './changeStreamToIterator.js'
import { ProcessRecord, ProcessRecords, ScanOptions } from './types.js'
import { Options, ProcessRecord, ProcessRecords, ScanOptions } from './types.js'
import _debug from 'debug'
import type { default as Redis } from 'ioredis'
import { batchQueue, QueueOptions } from 'prom-utils'
import { setDefaults } from './util.js'
import { generatePipelineFromOmit, setDefaults } from './util.js'

@@ -39,3 +39,5 @@ const debug = _debug('mongoChangeStream')

export const initSync = (redis: Redis) => {
export const initSync = (redis: Redis, options?: Options) => {
const omit = options?.omit
const defaultPipeline = omit ? generatePipelineFromOmit(omit) : []
/**

@@ -52,3 +54,2 @@ * Run initial collection scan. `options.batchSize` defaults to 500.

const sortField = options?.sortField || defaultSortField
const omit = options?.omit
// Redis keys

@@ -114,3 +115,3 @@ const { scanCompletedKey, lastScanIdKey } = getKeys(collection)

processRecord: ProcessRecord,
pipeline: Document[] = []
pipeline?: Document[]
) => {

@@ -126,3 +127,7 @@ // Redis keys

// Get the change stream as an async iterator
const changeStream = changeStreamToIterator(collection, pipeline, options)
const changeStream = changeStreamToIterator(
collection,
pipeline || defaultPipeline,
options
)
// Consume the events

@@ -149,3 +154,3 @@ for await (const event of changeStream) {

/**
* Delete completed on key in Redis for the given collection.
* Delete completed on key in Redis for the given collection.
*/

@@ -152,0 +157,0 @@ const clearCompletedOn = async (collection: Collection) => {

@@ -9,2 +9,6 @@ import { ChangeStreamDocument, ChangeStreamInsertDocument } from 'mongodb'

export interface Options {
omit?: string[]
}
export interface ScanOptions<T = any> {

@@ -16,3 +20,2 @@ sortField?: {

}
omit?: string[]
}

@@ -8,1 +8,9 @@ export const setDefaults = (keys: string[], val: any) => {

}
export const generatePipelineFromOmit = (omit: string[]) => {
const fields = omit.flatMap((field) => [
`fullDocument.${field}`,
`updateDescription.updatedFields.${field}`,
])
return [{ $unset: fields }]
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc