Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.15.0 to 0.16.0

5

CHANGELOG.md

@@ -0,1 +1,6 @@

# 0.16.0
- BREAKING CHANGE: Changed API for `processChangeStream`. You must explicitly call `start` now.
The change stream can be stopped by calling `stop`.
# 0.15.0

@@ -2,0 +7,0 @@

5

dist/changeStreamToIterator.d.ts
import { ChangeStreamDocument, ChangeStreamOptions, Collection, Document } from 'mongodb';
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], options: ChangeStreamOptions) => {
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], signal: AbortSignal, options: ChangeStreamOptions) => {
[Symbol.asyncIterator](): {

@@ -7,2 +7,5 @@ next(): Promise<{

done: boolean;
} | {
value: ChangeStreamDocument<Document>;
done: boolean;
}>;

@@ -9,0 +12,0 @@ };

24

dist/changeStreamToIterator.js

@@ -7,5 +7,12 @@ "use strict";

const debug_1 = __importDefault(require("debug"));
const debug = (0, debug_1.default)('mongoChangeStream');
const changeStreamToIterator = (collection, pipeline, options) => {
const prom_utils_1 = require("prom-utils");
const debug = (0, debug_1.default)('mongochangestream');
const changeStreamToIterator = (collection, pipeline, signal, options) => {
const changeStream = collection.watch(pipeline, options);
const deferred = (0, prom_utils_1.defer)();
signal.onabort = async () => {
deferred.done();
await changeStream.close();
debug('Closed change stream');
};
debug('Started change stream - pipeline %O options %O', pipeline, options);

@@ -16,8 +23,9 @@ return {

async next() {
if (changeStream.closed) {
return { value: {}, done: true };
}
return changeStream
.next()
.then((data) => ({ value: data, done: false }));
return Promise.race([
deferred.promise.then(() => ({
value: {},
done: true,
})),
changeStream.next().then((data) => ({ value: data, done: false })),
]);
},

@@ -24,0 +32,0 @@ };

@@ -21,5 +21,8 @@ import _ from 'lodash/fp.js';

runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>;
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>;
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{
start: () => Promise<void>;
stop: () => void;
}>;
reset: (collection: Collection) => Promise<void>;
clearCompletedOn: (collection: Collection) => Promise<void>;
};

@@ -99,4 +99,8 @@ "use strict";

* those fields will be prepended to the `pipeline` argument.
*
* Call `start` to start processing events and `stop` to close
* the change stream.
*/
const processChangeStream = async (collection, processRecord, pipeline = []) => {
const abortController = new AbortController();
// Redis keys

@@ -111,18 +115,23 @@ const { changeStreamTokenKey } = (0, exports.getKeys)(collection);

// Get the change stream as an async iterator
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], options);
// Consume the events
for await (let event of changeStream) {
debug('Change stream event %O', event);
// Get resume token
const token = event?._id;
// Omit nested fields that are not handled by $unset.
// For example, if 'a' was omitted then 'a.b.c' should be omitted.
if (event.operationType === 'update' && omit) {
event = (0, util_js_1.omitFieldForUpdate)(omit)(event);
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], abortController.signal, options);
const start = async () => {
for await (let event of changeStream) {
debug('Change stream event %O', event);
// Get resume token
const token = event?._id;
// Omit nested fields that are not handled by $unset.
// For example, if 'a' was omitted then 'a.b.c' should be omitted.
if (event.operationType === 'update' && omit) {
event = (0, util_js_1.omitFieldForUpdate)(omit)(event);
}
// Process record
await processRecord(event);
// Update change stream token
await redis.set(changeStreamTokenKey, JSON.stringify(token));
}
// Process record
await processRecord(event);
// Update change stream token
await redis.set(changeStreamTokenKey, JSON.stringify(token));
}
};
const stop = () => {
abortController.abort();
};
return { start, stop };
};

@@ -129,0 +138,0 @@ /**

{
"name": "mongochangestream",
"version": "0.15.0",
"version": "0.16.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -39,4 +39,2 @@ "author": "GovSpend",

"eslint": "^8.21.0",
"ioredis": "^5.2.2",
"mongodb": "^4.8.1",
"prettier": "^2.7.1",

@@ -48,3 +46,3 @@ "typescript": "^4.7.4"

"lodash": "^4.17.21",
"prom-utils": "^0.2.0"
"prom-utils": "^0.3.0"
},

@@ -51,0 +49,0 @@ "peerDependencies": {

@@ -39,6 +39,12 @@ # Mongo Change Stream

}
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
console.dir(docs, { depth: 10 })
}
// Sync collection
const sync = initSync(redis)
await sync.syncCollection(coll, processRecord)
sync.syncCollection(coll, processRecord)
const changeStream = await sync.processChangeStream(coll, processRecord)
changeStream.start()
setTimeout(changeStream.stop, 30000)
```

@@ -57,4 +63,6 @@

export type ProcessRecord = (
doc: ChangeStreamDocument | ChangeStreamDocument[]
export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>
export type ProcessRecords = (
doc: ChangeStreamInsertDocument[]
) => void | Promise<void>

@@ -64,5 +72,5 @@

collection: Collection,
processRecord: ProcessRecord,
processRecords: ProcessRecords,
options?: QueueOptions & ScanOptions
): Promise<void> => ...
)

@@ -73,5 +81,3 @@ const processChangeStream = async (

pipeline?: Document[]
): Promise<void> => ...
const reset = async (collection: Collection): Promise<void> => ...
)
```

@@ -78,0 +84,0 @@

@@ -8,4 +8,5 @@ import {

import _debug from 'debug'
import { defer } from 'prom-utils'
const debug = _debug('mongoChangeStream')
const debug = _debug('mongochangestream')

@@ -15,5 +16,12 @@ const changeStreamToIterator = (

pipeline: Document[],
signal: AbortSignal,
options: ChangeStreamOptions
) => {
const changeStream = collection.watch(pipeline, options)
const deferred = defer()
signal.onabort = async () => {
deferred.done()
await changeStream.close()
debug('Closed change stream')
}
debug('Started change stream - pipeline %O options %O', pipeline, options)

@@ -24,8 +32,9 @@ return {

async next() {
if (changeStream.closed) {
return { value: {} as ChangeStreamDocument, done: true }
}
return changeStream
.next()
.then((data) => ({ value: data, done: false }))
return Promise.race([
deferred.promise.then(() => ({
value: {} as ChangeStreamDocument,
done: true,
})),
changeStream.next().then((data) => ({ value: data, done: false })),
])
},

@@ -32,0 +41,0 @@ }

@@ -123,2 +123,5 @@ import _ from 'lodash/fp.js'

* those fields will be prepended to the `pipeline` argument.
*
* Call `start` to start processing events and `stop` to close
* the change stream.
*/

@@ -130,2 +133,3 @@ const processChangeStream = async (

) => {
const abortController = new AbortController()
// Redis keys

@@ -143,19 +147,25 @@ const { changeStreamTokenKey } = getKeys(collection)

[...omitPipeline, ...pipeline],
abortController.signal,
options
)
// Consume the events
for await (let event of changeStream) {
debug('Change stream event %O', event)
// Get resume token
const token = event?._id
// Omit nested fields that are not handled by $unset.
// For example, if 'a' was omitted then 'a.b.c' should be omitted.
if (event.operationType === 'update' && omit) {
event = omitFieldForUpdate(omit)(event)
const start = async () => {
for await (let event of changeStream) {
debug('Change stream event %O', event)
// Get resume token
const token = event?._id
// Omit nested fields that are not handled by $unset.
// For example, if 'a' was omitted then 'a.b.c' should be omitted.
if (event.operationType === 'update' && omit) {
event = omitFieldForUpdate(omit)(event)
}
// Process record
await processRecord(event)
// Update change stream token
await redis.set(changeStreamTokenKey, JSON.stringify(token))
}
// Process record
await processRecord(event)
// Update change stream token
await redis.set(changeStreamTokenKey, JSON.stringify(token))
}
const stop = () => {
abortController.abort()
}
return { start, stop }
}

@@ -162,0 +172,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc