mongo2elastic
Advanced tools
Comparing version 0.37.0 to 0.38.0
@@ -0,1 +1,5 @@ | ||
# 0.38.0 | ||
- Added `operationCounts` to the `process` event. | ||
# 0.37.0 | ||
@@ -2,0 +6,0 @@ |
@@ -12,2 +12,10 @@ import _ from 'lodash/fp.js'; | ||
item.update?.error); | ||
const getInitialCounts = () => { | ||
const operationTypes = ['insert', 'update', 'replace', 'delete']; | ||
const counts = {}; | ||
for (const operationType of operationTypes) { | ||
counts[operationType] = 0; | ||
} | ||
return counts; | ||
}; | ||
export const initSync = (redis, collection, elastic, options = {}) => { | ||
@@ -48,4 +56,6 @@ const mapper = options.mapper || _.omit(['_id']); | ||
const operations = []; | ||
const operationCounts = getInitialCounts(); | ||
for (const doc of docs) { | ||
if (doc.operationType === 'insert') { | ||
operationCounts[doc.operationType]++; | ||
operations.push([ | ||
@@ -58,2 +68,3 @@ { create: { _index: index, _id: doc.fullDocument._id.toString() } }, | ||
doc.operationType === 'replace') { | ||
operationCounts[doc.operationType]++; | ||
const document = doc.fullDocument ? mapper(doc.fullDocument) : {}; | ||
@@ -66,2 +77,3 @@ operations.push([ | ||
else if (doc.operationType === 'delete') { | ||
operationCounts[doc.operationType]++; | ||
operations.push([ | ||
@@ -84,6 +96,11 @@ { delete: { _index: index, _id: doc.documentKey._id.toString() } }, | ||
changeStream: true, | ||
operationCounts, | ||
}); | ||
} | ||
else { | ||
emit('process', { success: docs.length, changeStream: true }); | ||
emit('process', { | ||
success: docs.length, | ||
changeStream: true, | ||
operationCounts, | ||
}); | ||
} | ||
@@ -99,2 +116,3 @@ } | ||
const processRecords = async (docs) => { | ||
const operationCounts = { insert: docs.length }; | ||
try { | ||
@@ -116,6 +134,11 @@ const response = await elastic.bulk({ | ||
initialScan: true, | ||
operationCounts, | ||
}); | ||
} | ||
else { | ||
emit('process', { success: docs.length, initialScan: true }); | ||
emit('process', { | ||
success: docs.length, | ||
initialScan: true, | ||
operationCounts, | ||
}); | ||
} | ||
@@ -122,0 +145,0 @@ } |
{ | ||
"name": "mongo2elastic", | ||
"version": "0.37.0", | ||
"version": "0.38.0", | ||
"description": "Sync MongoDB collections to Elasticsearch", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -31,2 +31,11 @@ import _ from 'lodash/fp.js' | ||
const getInitialCounts = () => { | ||
const operationTypes = ['insert', 'update', 'replace', 'delete'] | ||
const counts: Record<string, number> = {} | ||
for (const operationType of operationTypes) { | ||
counts[operationType] = 0 | ||
} | ||
return counts | ||
} | ||
export const initSync = ( | ||
@@ -80,4 +89,6 @@ redis: Redis, | ||
const operations = [] | ||
const operationCounts = getInitialCounts() | ||
for (const doc of docs) { | ||
if (doc.operationType === 'insert') { | ||
operationCounts[doc.operationType]++ | ||
operations.push([ | ||
@@ -91,2 +102,3 @@ { create: { _index: index, _id: doc.fullDocument._id.toString() } }, | ||
) { | ||
operationCounts[doc.operationType]++ | ||
const document = doc.fullDocument ? mapper(doc.fullDocument) : {} | ||
@@ -98,2 +110,3 @@ operations.push([ | ||
} else if (doc.operationType === 'delete') { | ||
operationCounts[doc.operationType]++ | ||
operations.push([ | ||
@@ -116,5 +129,10 @@ { delete: { _index: index, _id: doc.documentKey._id.toString() } }, | ||
changeStream: true, | ||
operationCounts, | ||
}) | ||
} else { | ||
emit('process', { success: docs.length, changeStream: true }) | ||
emit('process', { | ||
success: docs.length, | ||
changeStream: true, | ||
operationCounts, | ||
}) | ||
} | ||
@@ -129,2 +147,3 @@ } catch (e) { | ||
const processRecords = async (docs: ChangeStreamInsertDocument[]) => { | ||
const operationCounts = { insert: docs.length } | ||
try { | ||
@@ -146,5 +165,10 @@ const response = await elastic.bulk({ | ||
initialScan: true, | ||
operationCounts, | ||
}) | ||
} else { | ||
emit('process', { success: docs.length, initialScan: true }) | ||
emit('process', { | ||
success: docs.length, | ||
initialScan: true, | ||
operationCounts, | ||
}) | ||
} | ||
@@ -151,0 +175,0 @@ } catch (e) { |
46079
1203