mongochangestream
Advanced tools
Comparing version 0.17.0 to 0.18.0
@@ -0,1 +1,5 @@ | ||
# 0.18.0 | ||
- Await processing of event when calling `stop`. | ||
# 0.17.0 | ||
@@ -2,0 +6,0 @@ |
@@ -1,10 +0,7 @@ | ||
import { ChangeStreamDocument, ChangeStreamOptions, Collection, Document } from 'mongodb'; | ||
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], signal: AbortSignal, options: ChangeStreamOptions) => { | ||
import { ChangeStreamOptions, Collection, Document } from 'mongodb'; | ||
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], options: ChangeStreamOptions) => { | ||
[Symbol.asyncIterator](): { | ||
next(): Promise<{ | ||
value: ChangeStreamDocument<Document>; | ||
value: import("mongodb").ChangeStreamDocument<Document>; | ||
done: boolean; | ||
} | { | ||
value: ChangeStreamDocument<Document>; | ||
done: boolean; | ||
}>; | ||
@@ -11,0 +8,0 @@ }; |
@@ -7,12 +7,5 @@ "use strict"; | ||
const debug_1 = __importDefault(require("debug")); | ||
const prom_utils_1 = require("prom-utils"); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
const changeStreamToIterator = (collection, pipeline, signal, options) => { | ||
const changeStreamToIterator = (collection, pipeline, options) => { | ||
const changeStream = collection.watch(pipeline, options); | ||
const deferred = (0, prom_utils_1.defer)(); | ||
signal.onabort = async () => { | ||
deferred.done(); | ||
await changeStream.close(); | ||
debug('Closed change stream'); | ||
}; | ||
debug('Started change stream - pipeline %O options %O', pipeline, options); | ||
@@ -23,9 +16,5 @@ return { | ||
async next() { | ||
return Promise.race([ | ||
deferred.promise.then(() => ({ | ||
value: {}, | ||
done: true, | ||
})), | ||
changeStream.next().then((data) => ({ value: data, done: false })), | ||
]); | ||
return changeStream | ||
.next() | ||
.then((data) => ({ value: data, done: false })); | ||
}, | ||
@@ -32,0 +21,0 @@ }; |
@@ -17,3 +17,3 @@ /// <reference types="node" /> | ||
start: () => Promise<void>; | ||
stop: () => void; | ||
stop: () => Promise<void>; | ||
}>; | ||
@@ -20,0 +20,0 @@ reset: () => Promise<void>; |
@@ -109,2 +109,3 @@ "use strict"; | ||
const abortController = new AbortController(); | ||
let deferred; | ||
// Redis keys | ||
@@ -119,6 +120,11 @@ const { changeStreamTokenKey } = keys; | ||
// Get the change stream as an async iterator | ||
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], abortController.signal, options); | ||
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], options); | ||
const start = async () => { | ||
for await (let event of changeStream) { | ||
debug('Change stream event %O', event); | ||
// Don't process event if stopping | ||
if (abortController.signal.aborted) { | ||
return; | ||
} | ||
deferred = (0, prom_utils_1.defer)(); | ||
// Get resume token | ||
@@ -135,2 +141,3 @@ const token = event?._id; | ||
await redis.set(changeStreamTokenKey, JSON.stringify(token)); | ||
deferred.done(); | ||
} | ||
@@ -140,2 +147,4 @@ }; | ||
abortController.abort(); | ||
// Wait for event to be processed | ||
return deferred?.promise; | ||
}; | ||
@@ -189,5 +198,7 @@ return { start, stop }; | ||
debug('Schema change detected %O', currentSchema); | ||
emitter.emit('change', { initialSchema: previousSchema, currentSchema }); | ||
// Persist schema | ||
await redis.set(keys.schemaKey, JSON.stringify(currentSchema)); | ||
// Emit change | ||
emitter.emit('change', { previousSchema, currentSchema }); | ||
// Previous schema is now the current schema | ||
previousSchema = currentSchema; | ||
@@ -194,0 +205,0 @@ } |
{ | ||
"name": "mongochangestream", | ||
"version": "0.17.0", | ||
"version": "0.18.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -5,0 +5,0 @@ "author": "GovSpend", |
@@ -1,9 +0,3 @@ | ||
import { | ||
ChangeStreamDocument, | ||
ChangeStreamOptions, | ||
Collection, | ||
Document, | ||
} from 'mongodb' | ||
import { ChangeStreamOptions, Collection, Document } from 'mongodb' | ||
import _debug from 'debug' | ||
import { defer } from 'prom-utils' | ||
@@ -15,12 +9,5 @@ const debug = _debug('mongochangestream') | ||
pipeline: Document[], | ||
signal: AbortSignal, | ||
options: ChangeStreamOptions | ||
) => { | ||
const changeStream = collection.watch(pipeline, options) | ||
const deferred = defer() | ||
signal.onabort = async () => { | ||
deferred.done() | ||
await changeStream.close() | ||
debug('Closed change stream') | ||
} | ||
debug('Started change stream - pipeline %O options %O', pipeline, options) | ||
@@ -31,9 +18,5 @@ return { | ||
async next() { | ||
return Promise.race([ | ||
deferred.promise.then(() => ({ | ||
value: {} as ChangeStreamDocument, | ||
done: true, | ||
})), | ||
changeStream.next().then((data) => ({ value: data, done: false })), | ||
]) | ||
return changeStream | ||
.next() | ||
.then((data) => ({ value: data, done: false })) | ||
}, | ||
@@ -40,0 +23,0 @@ } |
@@ -18,3 +18,3 @@ import _ from 'lodash/fp.js' | ||
import type { default as Redis } from 'ioredis' | ||
import { batchQueue, QueueOptions } from 'prom-utils' | ||
import { batchQueue, defer, Deferred, QueueOptions } from 'prom-utils' | ||
import { | ||
@@ -142,2 +142,3 @@ generatePipelineFromOmit, | ||
const abortController = new AbortController() | ||
let deferred: Deferred | ||
// Redis keys | ||
@@ -155,3 +156,2 @@ const { changeStreamTokenKey } = keys | ||
[...omitPipeline, ...pipeline], | ||
abortController.signal, | ||
options | ||
@@ -162,2 +162,7 @@ ) | ||
debug('Change stream event %O', event) | ||
// Don't process event if stopping | ||
if (abortController.signal.aborted) { | ||
return | ||
} | ||
deferred = defer() | ||
// Get resume token | ||
@@ -174,2 +179,3 @@ const token = event?._id | ||
await redis.set(changeStreamTokenKey, JSON.stringify(token)) | ||
deferred.done() | ||
} | ||
@@ -179,2 +185,4 @@ } | ||
abortController.abort() | ||
// Wait for event to be processed | ||
return deferred?.promise | ||
} | ||
@@ -233,5 +241,7 @@ return { start, stop } | ||
debug('Schema change detected %O', currentSchema) | ||
emitter.emit('change', { initialSchema: previousSchema, currentSchema }) | ||
// Persist schema | ||
await redis.set(keys.schemaKey, JSON.stringify(currentSchema)) | ||
// Emit change | ||
emitter.emit('change', { previousSchema, currentSchema }) | ||
// Previous schema is now the current schema | ||
previousSchema = currentSchema | ||
@@ -238,0 +248,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
32858
709