Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream - npm Package Compare versions

Comparing version 0.17.0 to 0.18.0

4

CHANGELOG.md

@@ -0,1 +1,5 @@

# 0.18.0
- Await processing of event when calling `stop`.
# 0.17.0

@@ -2,0 +6,0 @@

9

dist/changeStreamToIterator.d.ts

@@ -1,10 +0,7 @@

import { ChangeStreamDocument, ChangeStreamOptions, Collection, Document } from 'mongodb';
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], signal: AbortSignal, options: ChangeStreamOptions) => {
import { ChangeStreamOptions, Collection, Document } from 'mongodb';
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], options: ChangeStreamOptions) => {
[Symbol.asyncIterator](): {
next(): Promise<{
value: ChangeStreamDocument<Document>;
value: import("mongodb").ChangeStreamDocument<Document>;
done: boolean;
} | {
value: ChangeStreamDocument<Document>;
done: boolean;
}>;

@@ -11,0 +8,0 @@ };

@@ -7,12 +7,5 @@ "use strict";

const debug_1 = __importDefault(require("debug"));
const prom_utils_1 = require("prom-utils");
const debug = (0, debug_1.default)('mongochangestream');
const changeStreamToIterator = (collection, pipeline, signal, options) => {
const changeStreamToIterator = (collection, pipeline, options) => {
const changeStream = collection.watch(pipeline, options);
const deferred = (0, prom_utils_1.defer)();
signal.onabort = async () => {
deferred.done();
await changeStream.close();
debug('Closed change stream');
};
debug('Started change stream - pipeline %O options %O', pipeline, options);

@@ -23,9 +16,5 @@ return {

async next() {
return Promise.race([
deferred.promise.then(() => ({
value: {},
done: true,
})),
changeStream.next().then((data) => ({ value: data, done: false })),
]);
return changeStream
.next()
.then((data) => ({ value: data, done: false }));
},

@@ -32,0 +21,0 @@ };

@@ -17,3 +17,3 @@ /// <reference types="node" />

start: () => Promise<void>;
stop: () => void;
stop: () => Promise<void>;
}>;

@@ -20,0 +20,0 @@ reset: () => Promise<void>;

@@ -109,2 +109,3 @@ "use strict";

const abortController = new AbortController();
let deferred;
// Redis keys

@@ -119,6 +120,11 @@ const { changeStreamTokenKey } = keys;

// Get the change stream as an async iterator
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], abortController.signal, options);
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], options);
const start = async () => {
for await (let event of changeStream) {
debug('Change stream event %O', event);
// Don't process event if stopping
if (abortController.signal.aborted) {
return;
}
deferred = (0, prom_utils_1.defer)();
// Get resume token

@@ -135,2 +141,3 @@ const token = event?._id;

await redis.set(changeStreamTokenKey, JSON.stringify(token));
deferred.done();
}

@@ -140,2 +147,4 @@ };

abortController.abort();
// Wait for event to be processed
return deferred?.promise;
};

@@ -189,5 +198,7 @@ return { start, stop };

debug('Schema change detected %O', currentSchema);
emitter.emit('change', { initialSchema: previousSchema, currentSchema });
// Persist schema
await redis.set(keys.schemaKey, JSON.stringify(currentSchema));
// Emit change
emitter.emit('change', { previousSchema, currentSchema });
// Previous schema is now the current schema
previousSchema = currentSchema;

@@ -194,0 +205,0 @@ }

{
"name": "mongochangestream",
"version": "0.17.0",
"version": "0.18.0",
"description": "Sync MongoDB collections via change streams into any database.",

@@ -5,0 +5,0 @@ "author": "GovSpend",

@@ -1,9 +0,3 @@

import {
ChangeStreamDocument,
ChangeStreamOptions,
Collection,
Document,
} from 'mongodb'
import { ChangeStreamOptions, Collection, Document } from 'mongodb'
import _debug from 'debug'
import { defer } from 'prom-utils'

@@ -15,12 +9,5 @@ const debug = _debug('mongochangestream')

pipeline: Document[],
signal: AbortSignal,
options: ChangeStreamOptions
) => {
const changeStream = collection.watch(pipeline, options)
const deferred = defer()
signal.onabort = async () => {
deferred.done()
await changeStream.close()
debug('Closed change stream')
}
debug('Started change stream - pipeline %O options %O', pipeline, options)

@@ -31,9 +18,5 @@ return {

async next() {
return Promise.race([
deferred.promise.then(() => ({
value: {} as ChangeStreamDocument,
done: true,
})),
changeStream.next().then((data) => ({ value: data, done: false })),
])
return changeStream
.next()
.then((data) => ({ value: data, done: false }))
},

@@ -40,0 +23,0 @@ }

@@ -18,3 +18,3 @@ import _ from 'lodash/fp.js'

import type { default as Redis } from 'ioredis'
import { batchQueue, QueueOptions } from 'prom-utils'
import { batchQueue, defer, Deferred, QueueOptions } from 'prom-utils'
import {

@@ -142,2 +142,3 @@ generatePipelineFromOmit,

const abortController = new AbortController()
let deferred: Deferred
// Redis keys

@@ -155,3 +156,2 @@ const { changeStreamTokenKey } = keys

[...omitPipeline, ...pipeline],
abortController.signal,
options

@@ -162,2 +162,7 @@ )

debug('Change stream event %O', event)
// Don't process event if stopping
if (abortController.signal.aborted) {
return
}
deferred = defer()
// Get resume token

@@ -174,2 +179,3 @@ const token = event?._id

await redis.set(changeStreamTokenKey, JSON.stringify(token))
deferred.done()
}

@@ -179,2 +185,4 @@ }

abortController.abort()
// Wait for event to be processed
return deferred?.promise
}

@@ -233,5 +241,7 @@ return { start, stop }

debug('Schema change detected %O', currentSchema)
emitter.emit('change', { initialSchema: previousSchema, currentSchema })
// Persist schema
await redis.set(keys.schemaKey, JSON.stringify(currentSchema))
// Emit change
emitter.emit('change', { previousSchema, currentSchema })
// Previous schema is now the current schema
previousSchema = currentSchema

@@ -238,0 +248,0 @@ }

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc