mongo2elastic
Comparing version 0.35.0 to 0.36.0
@@ -0,1 +1,6 @@
# 0.36.0
- `processChangeStream` now batches records. Default timeout before the queue is automatically
  flushed is 30 seconds.
# 0.35.0
@@ -2,0 +7,0 @@
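The new batching behavior is configured when starting the change stream processor. Below is a minimal usage sketch, assuming `initSync` is imported from the package entry point (as the compiled `dist/index.js` further down suggests), an ioredis client (which mongochangestream-based packages typically expect), and placeholder MongoDB/Elasticsearch connections; the `batchSize` and `timeout` option names come from the doc comments added in this release, and the timeout is assumed to be in milliseconds.

```ts
import { MongoClient } from 'mongodb'
import { Client } from '@elastic/elasticsearch'
import Redis from 'ioredis'
import { initSync } from 'mongo2elastic'

// Placeholder connections -- swap in real hosts and credentials.
const redis = new Redis()
const mongo = await MongoClient.connect('mongodb://localhost:27017')
const collection = mongo.db('mydb').collection('users')
const elastic = new Client({ node: 'http://localhost:9200' })

const sync = initSync(redis, collection, elastic)

// Change events accumulate in a queue and are written with a single bulk
// request once `batchSize` records are collected or `timeout` elapses,
// whichever comes first.
const changeStream = await sync.processChangeStream({
  batchSize: 250, // defaults to 500
  timeout: 30000, // assumed milliseconds; defaults to 30 seconds
})
await changeStream.start()
```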
@@ -10,4 +10,6 @@ import type { Collection } from 'mongodb';
* Process MongoDB change stream for the given collection.
* `options.batchSize` defaults to 500.
* `options.timeout` defaults to 30 seconds.
*/
processChangeStream: (options?: ChangeStreamOptions) => Promise<{
processChangeStream: (options?: QueueOptions & ChangeStreamOptions) => Promise<{
start: () => Promise<void>;
@@ -14,0 +16,0 @@ stop: () => Promise<void>;
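The updated typings keep the same `start`/`stop` lifecycle on the object that `processChangeStream` resolves to. A hedged sketch of how a long-running worker might wire that up for shutdown, where `sync` is assumed to come from `initSync` as in the previous sketch:

```ts
// `sync` is assumed to be the result of initSync(redis, collection, elastic).
const changeStream = await sync.processChangeStream({ batchSize: 500 })

// Begin tailing the MongoDB change stream and writing batches to Elasticsearch.
await changeStream.start()

// On shutdown, stop consuming the change stream before the process exits.
process.on('SIGTERM', async () => {
  await changeStream.stop()
  process.exit(0)
})
```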
@@ -5,2 +5,9 @@ import _ from 'lodash/fp.js';
import { convertSchema } from './convertSchema.js';
/**
* Filter errors from a bulk response
*/
const getBulkErrors = (response) => response.items.filter((item) => item.create?.error ||
item.delete?.error ||
item.index?.error ||
item.update?.error);
export const initSync = (redis, collection, elastic, options = {}) => {
@@ -36,29 +43,45 @@ const mapper = options.mapper || _.omit(['_id']);
/**
* Process a change stream event.
* Process change stream events.
*/
const processRecord = async (doc) => {
const processChangeStreamRecords = async (docs) => {
try {
if (doc.operationType === 'insert') {
await elastic.create({
index,
id: doc.fullDocument._id.toString(),
document: mapper(doc.fullDocument),
});
const operations = [];
for (const doc of docs) {
if (doc.operationType === 'insert') {
operations.push([
{ create: { _index: index, _id: doc.fullDocument._id.toString() } },
mapper(doc.fullDocument),
]);
}
else if (doc.operationType === 'update' ||
doc.operationType === 'replace') {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {};
operations.push([
{ index: { _index: index, _id: doc.documentKey._id.toString() } },
document,
]);
}
else if (doc.operationType === 'delete') {
operations.push([
{ delete: { _index: index, _id: doc.documentKey._id.toString() } },
]);
}
}
else if (doc.operationType === 'update' ||
doc.operationType === 'replace') {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {};
await elastic.index({
index,
id: doc.documentKey._id.toString(),
document,
const response = await elastic.bulk({
operations: operations.flat(),
});
// There were errors
if (response.errors) {
const errors = getBulkErrors(response);
const numErrors = errors.length;
emit('process', {
success: docs.length - numErrors,
fail: numErrors,
errors,
changeStream: true,
});
}
else if (doc.operationType === 'delete') {
await elastic.delete({
index,
id: doc.documentKey._id.toString(),
});
else {
emit('process', { success: docs.length, changeStream: true });
}
emit('process', { success: 1, changeStream: true });
}
@@ -80,4 +103,5 @@ catch (e) {
});
// There were errors
if (response.errors) {
const errors = response.items.filter((doc) => doc.create?.error);
const errors = getBulkErrors(response);
const numErrors = errors.length;
@@ -99,3 +123,3 @@ emit('process', {
};
const processChangeStream = (options) => sync.processChangeStream(processRecord, options);
const processChangeStream = (options) => sync.processChangeStream(processChangeStreamRecords, options);
const runInitialScan = (options) => sync.runInitialScan(processRecords, options);
@@ -106,2 +130,4 @@ return {
* Process MongoDB change stream for the given collection.
* `options.batchSize` defaults to 500.
* `options.timeout` defaults to 30 seconds.
*/
@@ -108,0 +134,0 @@ processChangeStream,
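For reference, the `operations` array built above uses the interleaved action/document format that the Elasticsearch JavaScript client's `bulk` API accepts, which is why each change event is pushed as a small array and the whole thing is flattened before the request. A hand-written illustration of what a three-event batch might flatten to (index name, ids, and documents are hypothetical):

```ts
// One insert, one update/replace, and one delete, after operations.flat():
const operations = [
  // insert -> create action followed by the mapped document
  { create: { _index: 'users', _id: '64b0f0c2e4b0a1a2b3c4d5e6' } },
  { name: 'Alice', age: 30 },
  // update/replace -> index action followed by the full document
  { index: { _index: 'users', _id: '64b0f0c2e4b0a1a2b3c4d5e7' } },
  { name: 'Bob', age: 41 },
  // delete -> delete action with no document body
  { delete: { _index: 'users', _id: '64b0f0c2e4b0a1a2b3c4d5e8' } },
]
```

Using `create` for inserts means a duplicate change event for an already-indexed `_id` fails that single item rather than overwriting it, and that failure is surfaced through `getBulkErrors`.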
{
"name": "mongo2elastic",
"version": "0.35.0",
"version": "0.36.0",
"description": "Sync MongoDB collections to Elasticsearch",
@@ -57,6 +57,6 @@ "main": "dist/index.js",
"minimatch": "^6.2.0",
"mongochangestream": "^0.41.0",
"mongochangestream": "^0.42.0",
"obj-walker": "^1.7.0",
"p-retry": "^5.1.1",
"prom-utils": "^0.4.0"
"prom-utils": "^0.5.0"
},
@@ -63,0 +63,0 @@ "prettier": {
@@ -17,3 +17,16 @@ import _ from 'lodash/fp.js'
import { convertSchema } from './convertSchema.js'
import { BulkResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey.js'
/**
* Filter errors from a bulk response
*/
const getBulkErrors = (response: BulkResponse) =>
response.items.filter(
(item) =>
item.create?.error ||
item.delete?.error ||
item.index?.error ||
item.update?.error
)
export const initSync = (
@@ -62,29 +75,44 @@ redis: Redis,
/**
* Process a change stream event.
* Process change stream events.
*/
const processRecord = async (doc: ChangeStreamDocument) => {
const processChangeStreamRecords = async (docs: ChangeStreamDocument[]) => {
try {
if (doc.operationType === 'insert') {
await elastic.create({
index,
id: doc.fullDocument._id.toString(),
document: mapper(doc.fullDocument),
const operations = []
for (const doc of docs) {
if (doc.operationType === 'insert') {
operations.push([
{ create: { _index: index, _id: doc.fullDocument._id.toString() } },
mapper(doc.fullDocument),
])
} else if (
doc.operationType === 'update' ||
doc.operationType === 'replace'
) {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {}
operations.push([
{ index: { _index: index, _id: doc.documentKey._id.toString() } },
document,
])
} else if (doc.operationType === 'delete') {
operations.push([
{ delete: { _index: index, _id: doc.documentKey._id.toString() } },
])
}
}
const response = await elastic.bulk({
operations: operations.flat(),
})
// There were errors
if (response.errors) {
const errors = getBulkErrors(response)
const numErrors = errors.length
emit('process', {
success: docs.length - numErrors,
fail: numErrors,
errors,
changeStream: true,
})
} else if (
doc.operationType === 'update' ||
doc.operationType === 'replace'
) {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {}
await elastic.index({
index,
id: doc.documentKey._id.toString(),
document,
})
} else if (doc.operationType === 'delete') {
await elastic.delete({
index,
id: doc.documentKey._id.toString(),
})
} else {
emit('process', { success: docs.length, changeStream: true })
}
emit('process', { success: 1, changeStream: true })
} catch (e) {
@@ -105,4 +133,5 @@ emit('error', { error: e, changeStream: true })
})
// There were errors
if (response.errors) {
const errors = response.items.filter((doc) => doc.create?.error)
const errors = getBulkErrors(response)
const numErrors = errors.length
@@ -123,4 +152,4 @@ emit('process', {
const processChangeStream = (options?: ChangeStreamOptions) =>
sync.processChangeStream(processRecord, options)
const processChangeStream = (options?: QueueOptions & ChangeStreamOptions) =>
sync.processChangeStream(processChangeStreamRecords, options)
const runInitialScan = (options?: QueueOptions & ScanOptions) =>
@@ -133,2 +162,4 @@ sync.runInitialScan(processRecords, options)
* Process MongoDB change stream for the given collection.
* `options.batchSize` defaults to 500.
* `options.timeout` defaults to 30 seconds.
*/
@@ -135,0 +166,0 @@ processChangeStream,
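`getBulkErrors` now checks every action type rather than only `create`, which matters because a change stream batch mixes `create`, `index`, and `delete` actions. A hand-written response fragment (status codes and error details are hypothetical) showing what it would pick out:

```ts
import type { BulkResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey.js'

// Hypothetical response for a three-item batch: only the second item failed.
const response = {
  took: 5,
  errors: true,
  items: [
    { create: { _index: 'users', _id: '1', status: 201 } },
    {
      index: {
        _index: 'users',
        _id: '2',
        status: 400,
        error: { type: 'mapper_parsing_exception', reason: 'failed to parse field [age]' },
      },
    },
    { delete: { _index: 'users', _id: '3', status: 200 } },
  ],
} as unknown as BulkResponse

// getBulkErrors(response) returns only the failed `index` item, so the
// emitted 'process' event would report success: 2, fail: 1 for this batch.
```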
+ Added mongochangestream@0.42.0 (transitive)
+ Added prom-utils@0.5.0 (transitive)
- Removed mongochangestream@0.41.0 (transitive)
- Removed prom-utils@0.4.0 (transitive)
Updated mongochangestream@^0.42.0
Updated prom-utils@^0.5.0