mongochangestream
Advanced tools
Comparing version 0.15.0 to 0.16.0
@@ -0,1 +1,6 @@ | ||
# 0.16.0 | ||
- BREAKING CHANGE: Changed API for `processChangeStream`. You must explicitly call `start` now. | ||
The change stream can be stopped by calling `stop`. | ||
# 0.15.0 | ||
@@ -2,0 +7,0 @@ |
import { ChangeStreamDocument, ChangeStreamOptions, Collection, Document } from 'mongodb'; | ||
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], options: ChangeStreamOptions) => { | ||
declare const changeStreamToIterator: (collection: Collection, pipeline: Document[], signal: AbortSignal, options: ChangeStreamOptions) => { | ||
[Symbol.asyncIterator](): { | ||
@@ -7,2 +7,5 @@ next(): Promise<{ | ||
done: boolean; | ||
} | { | ||
value: ChangeStreamDocument<Document>; | ||
done: boolean; | ||
}>; | ||
@@ -9,0 +12,0 @@ }; |
@@ -7,5 +7,12 @@ "use strict"; | ||
const debug_1 = __importDefault(require("debug")); | ||
const debug = (0, debug_1.default)('mongoChangeStream'); | ||
const changeStreamToIterator = (collection, pipeline, options) => { | ||
const prom_utils_1 = require("prom-utils"); | ||
const debug = (0, debug_1.default)('mongochangestream'); | ||
const changeStreamToIterator = (collection, pipeline, signal, options) => { | ||
const changeStream = collection.watch(pipeline, options); | ||
const deferred = (0, prom_utils_1.defer)(); | ||
signal.onabort = async () => { | ||
deferred.done(); | ||
await changeStream.close(); | ||
debug('Closed change stream'); | ||
}; | ||
debug('Started change stream - pipeline %O options %O', pipeline, options); | ||
@@ -16,8 +23,9 @@ return { | ||
async next() { | ||
if (changeStream.closed) { | ||
return { value: {}, done: true }; | ||
} | ||
return changeStream | ||
.next() | ||
.then((data) => ({ value: data, done: false })); | ||
return Promise.race([ | ||
deferred.promise.then(() => ({ | ||
value: {}, | ||
done: true, | ||
})), | ||
changeStream.next().then((data) => ({ value: data, done: false })), | ||
]); | ||
}, | ||
@@ -24,0 +32,0 @@ }; |
@@ -21,5 +21,8 @@ import _ from 'lodash/fp.js'; | ||
runInitialScan: (collection: Collection, processRecords: ProcessRecords, options?: QueueOptions & ScanOptions) => Promise<void>; | ||
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<void>; | ||
processChangeStream: (collection: Collection, processRecord: ProcessRecord, pipeline?: Document[]) => Promise<{ | ||
start: () => Promise<void>; | ||
stop: () => void; | ||
}>; | ||
reset: (collection: Collection) => Promise<void>; | ||
clearCompletedOn: (collection: Collection) => Promise<void>; | ||
}; |
@@ -99,4 +99,8 @@ "use strict"; | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
const processChangeStream = async (collection, processRecord, pipeline = []) => { | ||
const abortController = new AbortController(); | ||
// Redis keys | ||
@@ -111,18 +115,23 @@ const { changeStreamTokenKey } = (0, exports.getKeys)(collection); | ||
// Get the change stream as an async iterator | ||
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], options); | ||
// Consume the events | ||
for await (let event of changeStream) { | ||
debug('Change stream event %O', event); | ||
// Get resume token | ||
const token = event?._id; | ||
// Omit nested fields that are not handled by $unset. | ||
// For example, if 'a' was omitted then 'a.b.c' should be omitted. | ||
if (event.operationType === 'update' && omit) { | ||
event = (0, util_js_1.omitFieldForUpdate)(omit)(event); | ||
const changeStream = (0, changeStreamToIterator_js_1.default)(collection, [...omitPipeline, ...pipeline], abortController.signal, options); | ||
const start = async () => { | ||
for await (let event of changeStream) { | ||
debug('Change stream event %O', event); | ||
// Get resume token | ||
const token = event?._id; | ||
// Omit nested fields that are not handled by $unset. | ||
// For example, if 'a' was omitted then 'a.b.c' should be omitted. | ||
if (event.operationType === 'update' && omit) { | ||
event = (0, util_js_1.omitFieldForUpdate)(omit)(event); | ||
} | ||
// Process record | ||
await processRecord(event); | ||
// Update change stream token | ||
await redis.set(changeStreamTokenKey, JSON.stringify(token)); | ||
} | ||
// Process record | ||
await processRecord(event); | ||
// Update change stream token | ||
await redis.set(changeStreamTokenKey, JSON.stringify(token)); | ||
} | ||
}; | ||
const stop = () => { | ||
abortController.abort(); | ||
}; | ||
return { start, stop }; | ||
}; | ||
@@ -129,0 +138,0 @@ /** |
{ | ||
"name": "mongochangestream", | ||
"version": "0.15.0", | ||
"version": "0.16.0", | ||
"description": "Sync MongoDB collections via change streams into any database.", | ||
@@ -39,4 +39,2 @@ "author": "GovSpend", | ||
"eslint": "^8.21.0", | ||
"ioredis": "^5.2.2", | ||
"mongodb": "^4.8.1", | ||
"prettier": "^2.7.1", | ||
@@ -48,3 +46,3 @@ "typescript": "^4.7.4" | ||
"lodash": "^4.17.21", | ||
"prom-utils": "^0.2.0" | ||
"prom-utils": "^0.3.0" | ||
}, | ||
@@ -51,0 +49,0 @@ "peerDependencies": { |
@@ -39,6 +39,12 @@ # Mongo Change Stream | ||
} | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
console.dir(docs, { depth: 10 }) | ||
} | ||
// Sync collection | ||
const sync = initSync(redis) | ||
await sync.syncCollection(coll, processRecord) | ||
sync.syncCollection(coll, processRecord) | ||
const changeStream = await sync.processChangeStream(coll, processRecord) | ||
changeStream.start() | ||
setTimeout(changeStream.stop, 30000) | ||
``` | ||
@@ -57,4 +63,6 @@ | ||
export type ProcessRecord = ( | ||
doc: ChangeStreamDocument | ChangeStreamDocument[] | ||
export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void> | ||
export type ProcessRecords = ( | ||
doc: ChangeStreamInsertDocument[] | ||
) => void | Promise<void> | ||
@@ -64,5 +72,5 @@ | ||
collection: Collection, | ||
processRecord: ProcessRecord, | ||
processRecords: ProcessRecords, | ||
options?: QueueOptions & ScanOptions | ||
): Promise<void> => ... | ||
) | ||
@@ -73,5 +81,3 @@ const processChangeStream = async ( | ||
pipeline?: Document[] | ||
): Promise<void> => ... | ||
const reset = async (collection: Collection): Promise<void> => ... | ||
) | ||
``` | ||
@@ -78,0 +84,0 @@ |
@@ -8,4 +8,5 @@ import { | ||
import _debug from 'debug' | ||
import { defer } from 'prom-utils' | ||
const debug = _debug('mongoChangeStream') | ||
const debug = _debug('mongochangestream') | ||
@@ -15,5 +16,12 @@ const changeStreamToIterator = ( | ||
pipeline: Document[], | ||
signal: AbortSignal, | ||
options: ChangeStreamOptions | ||
) => { | ||
const changeStream = collection.watch(pipeline, options) | ||
const deferred = defer() | ||
signal.onabort = async () => { | ||
deferred.done() | ||
await changeStream.close() | ||
debug('Closed change stream') | ||
} | ||
debug('Started change stream - pipeline %O options %O', pipeline, options) | ||
@@ -24,8 +32,9 @@ return { | ||
async next() { | ||
if (changeStream.closed) { | ||
return { value: {} as ChangeStreamDocument, done: true } | ||
} | ||
return changeStream | ||
.next() | ||
.then((data) => ({ value: data, done: false })) | ||
return Promise.race([ | ||
deferred.promise.then(() => ({ | ||
value: {} as ChangeStreamDocument, | ||
done: true, | ||
})), | ||
changeStream.next().then((data) => ({ value: data, done: false })), | ||
]) | ||
}, | ||
@@ -32,0 +41,0 @@ } |
@@ -123,2 +123,5 @@ import _ from 'lodash/fp.js' | ||
* those fields will be prepended to the `pipeline` argument. | ||
* | ||
* Call `start` to start processing events and `stop` to close | ||
* the change stream. | ||
*/ | ||
@@ -130,2 +133,3 @@ const processChangeStream = async ( | ||
) => { | ||
const abortController = new AbortController() | ||
// Redis keys | ||
@@ -143,19 +147,25 @@ const { changeStreamTokenKey } = getKeys(collection) | ||
[...omitPipeline, ...pipeline], | ||
abortController.signal, | ||
options | ||
) | ||
// Consume the events | ||
for await (let event of changeStream) { | ||
debug('Change stream event %O', event) | ||
// Get resume token | ||
const token = event?._id | ||
// Omit nested fields that are not handled by $unset. | ||
// For example, if 'a' was omitted then 'a.b.c' should be omitted. | ||
if (event.operationType === 'update' && omit) { | ||
event = omitFieldForUpdate(omit)(event) | ||
const start = async () => { | ||
for await (let event of changeStream) { | ||
debug('Change stream event %O', event) | ||
// Get resume token | ||
const token = event?._id | ||
// Omit nested fields that are not handled by $unset. | ||
// For example, if 'a' was omitted then 'a.b.c' should be omitted. | ||
if (event.operationType === 'update' && omit) { | ||
event = omitFieldForUpdate(omit)(event) | ||
} | ||
// Process record | ||
await processRecord(event) | ||
// Update change stream token | ||
await redis.set(changeStreamTokenKey, JSON.stringify(token)) | ||
} | ||
// Process record | ||
await processRecord(event) | ||
// Update change stream token | ||
await redis.set(changeStreamTokenKey, JSON.stringify(token)) | ||
} | ||
const stop = () => { | ||
abortController.abort() | ||
} | ||
return { start, stop } | ||
} | ||
@@ -162,0 +172,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
28580
6
594
166
+ Addedprom-utils@0.3.0(transitive)
- Removedprom-utils@0.2.0(transitive)
Updatedprom-utils@^0.3.0