mongo2elastic
Advanced tools
Comparing version 0.21.0 to 0.22.0
@@ -0,1 +1,6 @@ | ||
# 0.22.0 | ||
- Latest `mongochangestream` - generic emitter. | ||
- Use emitter from `mongochangestream` which now emits two events on its own. | ||
# 0.21.0 | ||
@@ -2,0 +7,0 @@ |
@@ -6,4 +6,3 @@ import type { Collection } from 'mongodb'; | ||
import { QueueOptions } from 'prom-utils'; | ||
import { SyncOptions, Events, ConvertOptions } from './types.js'; | ||
import EventEmitter from 'eventemitter3'; | ||
import { SyncOptions, ConvertOptions } from './types.js'; | ||
export declare const initSync: (redis: Redis, collection: Collection, elastic: elasticsearch.Client, options?: SyncOptions & mongoChangeStream.SyncOptions) => { | ||
@@ -48,5 +47,4 @@ /** | ||
stop: () => void; | ||
emitter: EventEmitter<"change", any>; | ||
}>; | ||
emitter: EventEmitter<Events, any>; | ||
emitter: import("eventemitter3")<string | symbol, any>; | ||
}; |
@@ -5,7 +5,12 @@ import _ from 'lodash/fp.js'; | ||
import { convertSchema } from './convertSchema.js'; | ||
import EventEmitter from 'eventemitter3'; | ||
export const initSync = (redis, collection, elastic, options = {}) => { | ||
const mapper = options.mapper || _.omit(['_id']); | ||
const index = options.index || indexFromCollection(collection); | ||
const emitter = new EventEmitter(); | ||
// Initialize sync | ||
const sync = mongoChangeStream.initSync(redis, collection, options); | ||
// Use emitter from mongochangestream | ||
const emitter = sync.emitter; | ||
const emit = (event, data) => { | ||
emitter.emit(event, { type: event, ...data }); | ||
}; | ||
const ignoreMalformed = async (settings = {}) => { | ||
@@ -57,6 +62,6 @@ const obj = { | ||
} | ||
emitter.emit('process', { type: 'process', success: 1 }); | ||
emit('process', { success: 1 }); | ||
} | ||
catch (e) { | ||
emitter.emit('error', { type: 'error', error: e }); | ||
emit('error', { error: e }); | ||
} | ||
@@ -78,4 +83,3 @@ }; | ||
const numErrors = errors.length; | ||
emitter.emit('process', { | ||
type: 'process', | ||
emit('process', { | ||
success: docs.length - numErrors, | ||
@@ -86,10 +90,9 @@ fail: numErrors, | ||
else { | ||
emitter.emit('process', { type: 'process', success: docs.length }); | ||
emit('process', { success: docs.length }); | ||
} | ||
} | ||
catch (e) { | ||
emitter.emit('error', { type: 'error', error: e }); | ||
emit('error', { error: e }); | ||
} | ||
}; | ||
const sync = mongoChangeStream.initSync(redis, collection, options); | ||
const processChangeStream = (options) => sync.processChangeStream(processRecord, options); | ||
@@ -96,0 +99,0 @@ const runInitialScan = (options) => sync.runInitialScan(processRecords, options); |
{ | ||
"name": "mongo2elastic", | ||
"version": "0.21.0", | ||
"version": "0.22.0", | ||
"description": "Sync MongoDB collections to Elasticsearch", | ||
@@ -56,3 +56,3 @@ "main": "dist/index.js", | ||
"lodash": "^4.17.21", | ||
"mongochangestream": "^0.26.1", | ||
"mongochangestream": "^0.29.0", | ||
"obj-walker": "^1.1.1", | ||
@@ -59,0 +59,0 @@ "p-retry": "^5.1.1", |
@@ -9,3 +9,6 @@ import _ from 'lodash/fp.js' | ||
import elasticsearch from '@elastic/elasticsearch' | ||
import mongoChangeStream, { ScanOptions, ChangeStreamOptions } from 'mongochangestream' | ||
import mongoChangeStream, { | ||
ScanOptions, | ||
ChangeStreamOptions, | ||
} from 'mongochangestream' | ||
import { QueueOptions } from 'prom-utils' | ||
@@ -15,3 +18,2 @@ import { SyncOptions, Events, ConvertOptions } from './types.js' | ||
import { convertSchema } from './convertSchema.js' | ||
import EventEmitter from 'eventemitter3' | ||
@@ -26,3 +28,9 @@ export const initSync = ( | ||
const index = options.index || indexFromCollection(collection) | ||
const emitter = new EventEmitter<Events>() | ||
// Initialize sync | ||
const sync = mongoChangeStream.initSync(redis, collection, options) | ||
// Use emitter from mongochangestream | ||
const emitter = sync.emitter | ||
const emit = (event: Events, data: object) => { | ||
emitter.emit(event, { type: event, ...data }) | ||
} | ||
@@ -82,5 +90,5 @@ const ignoreMalformed = async (settings: object = {}) => { | ||
} | ||
emitter.emit('process', { type: 'process', success: 1 }) | ||
emit('process', { success: 1 }) | ||
} catch (e) { | ||
emitter.emit('error', { type: 'error', error: e }) | ||
emit('error', { error: e }) | ||
} | ||
@@ -102,4 +110,3 @@ } | ||
const numErrors = errors.length | ||
emitter.emit('process', { | ||
type: 'process', | ||
emit('process', { | ||
success: docs.length - numErrors, | ||
@@ -109,10 +116,9 @@ fail: numErrors, | ||
} else { | ||
emitter.emit('process', { type: 'process', success: docs.length }) | ||
emit('process', { success: docs.length }) | ||
} | ||
} catch (e) { | ||
emitter.emit('error', { type: 'error', error: e }) | ||
emit('error', { error: e }) | ||
} | ||
} | ||
const sync = mongoChangeStream.initSync(redis, collection, options) | ||
const processChangeStream = (options?: ChangeStreamOptions) => | ||
@@ -119,0 +125,0 @@ sync.processChangeStream(processRecord, options) |
32619
879
+ Addedmongochangestream@0.29.0(transitive)
- Removedmongochangestream@0.26.1(transitive)
Updatedmongochangestream@^0.29.0