Socket
Socket
Sign inDemoInstall

mongo2elastic

Package Overview
Dependencies
248
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.35.0 to 0.36.0

5

CHANGELOG.md

@@ -0,1 +1,6 @@

# 0.36.0
- `processChangeStream` now batches records. Default timeout before the queue is automatically
flushed is 30 seconds.
# 0.35.0

@@ -2,0 +7,0 @@

4

dist/syncData.d.ts

@@ -10,4 +10,6 @@ import type { Collection } from 'mongodb';

* Process MongoDB change stream for the given collection.
* `options.batchSize` defaults to 500.
* `options.timeout` defaults to 30 seconds.
*/
processChangeStream: (options?: ChangeStreamOptions) => Promise<{
processChangeStream: (options?: QueueOptions & ChangeStreamOptions) => Promise<{
start: () => Promise<void>;

@@ -14,0 +16,0 @@ stop: () => Promise<void>;

@@ -5,2 +5,9 @@ import _ from 'lodash/fp.js';

import { convertSchema } from './convertSchema.js';
/**
* Filter errors from a bulk response
*/
const getBulkErrors = (response) => response.items.filter((item) => item.create?.error ||
item.delete?.error ||
item.index?.error ||
item.update?.error);
export const initSync = (redis, collection, elastic, options = {}) => {

@@ -36,29 +43,45 @@ const mapper = options.mapper || _.omit(['_id']);

/**
* Process a change stream event.
* Process change stream events.
*/
const processRecord = async (doc) => {
const processChangeStreamRecords = async (docs) => {
try {
if (doc.operationType === 'insert') {
await elastic.create({
index,
id: doc.fullDocument._id.toString(),
document: mapper(doc.fullDocument),
});
const operations = [];
for (const doc of docs) {
if (doc.operationType === 'insert') {
operations.push([
{ create: { _index: index, _id: doc.fullDocument._id.toString() } },
mapper(doc.fullDocument),
]);
}
else if (doc.operationType === 'update' ||
doc.operationType === 'replace') {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {};
operations.push([
{ index: { _index: index, _id: doc.documentKey._id.toString() } },
document,
]);
}
else if (doc.operationType === 'delete') {
operations.push([
{ delete: { _index: index, _id: doc.documentKey._id.toString() } },
]);
}
}
else if (doc.operationType === 'update' ||
doc.operationType === 'replace') {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {};
await elastic.index({
index,
id: doc.documentKey._id.toString(),
document,
const response = await elastic.bulk({
operations: operations.flat(),
});
// There were errors
if (response.errors) {
const errors = getBulkErrors(response);
const numErrors = errors.length;
emit('process', {
success: docs.length - numErrors,
fail: numErrors,
errors,
changeStream: true,
});
}
else if (doc.operationType === 'delete') {
await elastic.delete({
index,
id: doc.documentKey._id.toString(),
});
else {
emit('process', { success: docs.length, changeStream: true });
}
emit('process', { success: 1, changeStream: true });
}

@@ -80,4 +103,5 @@ catch (e) {

});
// There were errors
if (response.errors) {
const errors = response.items.filter((doc) => doc.create?.error);
const errors = getBulkErrors(response);
const numErrors = errors.length;

@@ -99,3 +123,3 @@ emit('process', {

};
const processChangeStream = (options) => sync.processChangeStream(processRecord, options);
const processChangeStream = (options) => sync.processChangeStream(processChangeStreamRecords, options);
const runInitialScan = (options) => sync.runInitialScan(processRecords, options);

@@ -106,2 +130,4 @@ return {

* Process MongoDB change stream for the given collection.
* `options.batchSize` defaults to 500.
* `options.timeout` defaults to 30 seconds.
*/

@@ -108,0 +134,0 @@ processChangeStream,

{
"name": "mongo2elastic",
"version": "0.35.0",
"version": "0.36.0",
"description": "Sync MongoDB collections to Elasticsearch",

@@ -57,6 +57,6 @@ "main": "dist/index.js",

"minimatch": "^6.2.0",
"mongochangestream": "^0.41.0",
"mongochangestream": "^0.42.0",
"obj-walker": "^1.7.0",
"p-retry": "^5.1.1",
"prom-utils": "^0.4.0"
"prom-utils": "^0.5.0"
},

@@ -63,0 +63,0 @@ "prettier": {

@@ -17,3 +17,16 @@ import _ from 'lodash/fp.js'

import { convertSchema } from './convertSchema.js'
import { BulkResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey.js'
/**
* Filter errors from a bulk response
*/
const getBulkErrors = (response: BulkResponse) =>
response.items.filter(
(item) =>
item.create?.error ||
item.delete?.error ||
item.index?.error ||
item.update?.error
)
export const initSync = (

@@ -62,29 +75,44 @@ redis: Redis,

/**
* Process a change stream event.
* Process change stream events.
*/
const processRecord = async (doc: ChangeStreamDocument) => {
const processChangeStreamRecords = async (docs: ChangeStreamDocument[]) => {
try {
if (doc.operationType === 'insert') {
await elastic.create({
index,
id: doc.fullDocument._id.toString(),
document: mapper(doc.fullDocument),
const operations = []
for (const doc of docs) {
if (doc.operationType === 'insert') {
operations.push([
{ create: { _index: index, _id: doc.fullDocument._id.toString() } },
mapper(doc.fullDocument),
])
} else if (
doc.operationType === 'update' ||
doc.operationType === 'replace'
) {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {}
operations.push([
{ index: { _index: index, _id: doc.documentKey._id.toString() } },
document,
])
} else if (doc.operationType === 'delete') {
operations.push([
{ delete: { _index: index, _id: doc.documentKey._id.toString() } },
])
}
}
const response = await elastic.bulk({
operations: operations.flat(),
})
// There were errors
if (response.errors) {
const errors = getBulkErrors(response)
const numErrors = errors.length
emit('process', {
success: docs.length - numErrors,
fail: numErrors,
errors,
changeStream: true,
})
} else if (
doc.operationType === 'update' ||
doc.operationType === 'replace'
) {
const document = doc.fullDocument ? mapper(doc.fullDocument) : {}
await elastic.index({
index,
id: doc.documentKey._id.toString(),
document,
})
} else if (doc.operationType === 'delete') {
await elastic.delete({
index,
id: doc.documentKey._id.toString(),
})
} else {
emit('process', { success: docs.length, changeStream: true })
}
emit('process', { success: 1, changeStream: true })
} catch (e) {

@@ -105,4 +133,5 @@ emit('error', { error: e, changeStream: true })

})
// There were errors
if (response.errors) {
const errors = response.items.filter((doc) => doc.create?.error)
const errors = getBulkErrors(response)
const numErrors = errors.length

@@ -123,4 +152,4 @@ emit('process', {

const processChangeStream = (options?: ChangeStreamOptions) =>
sync.processChangeStream(processRecord, options)
const processChangeStream = (options?: QueueOptions & ChangeStreamOptions) =>
sync.processChangeStream(processChangeStreamRecords, options)
const runInitialScan = (options?: QueueOptions & ScanOptions) =>

@@ -133,2 +162,4 @@ sync.runInitialScan(processRecords, options)

* Process MongoDB change stream for the given collection.
* `options.batchSize` defaults to 500.
* `options.timeout` defaults to 30 seconds.
*/

@@ -135,0 +166,0 @@ processChangeStream,

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc