mongo2elastic
Advanced tools
Comparing version 0.38.0 to 0.39.0
@@ -0,1 +1,6 @@ | ||
# 0.39.0 | ||
- Removed `options.mapper` in favor of `options.rename` which takes an object of dotted path to renamed dotted path. | ||
The `rename` option is available for syncing and converting a schema to a table.`_id` mapped to `_mongoId` by default. | ||
# 0.38.0 | ||
@@ -12,3 +17,3 @@ | ||
- `processChangeStream` now batches records. Default timeout before the queue is automatically | ||
flushed is 30 seconds. | ||
flushed is 30 seconds. | ||
@@ -15,0 +20,0 @@ # 0.35.0 |
import { estypes } from '@elastic/elasticsearch'; | ||
import { ConvertOptions } from './types.js'; | ||
import makeError from 'make-error'; | ||
import { JSONSchema } from 'mongochangestream'; | ||
export declare const Mongo2ElasticError: makeError.Constructor<makeError.BaseError>; | ||
/** | ||
@@ -9,2 +11,2 @@ * Convert MongoDB JSON schema to Elasticsearch mapping. | ||
*/ | ||
export declare const convertSchema: (jsonSchema: JSONSchema, options?: ConvertOptions) => Record<string, estypes.MappingProperty>; | ||
export declare const convertSchema: (jsonSchema: JSONSchema, options?: ConvertOptions) => estypes.MappingPropertyBase; |
import _ from 'lodash/fp.js'; | ||
import { map } from 'obj-walker'; | ||
import { map, walker } from 'obj-walker'; | ||
import { minimatch } from 'minimatch'; | ||
import makeError from 'make-error'; | ||
import { traverseSchema } from 'mongochangestream'; | ||
import { arrayStartsWith } from './util.js'; | ||
export const Mongo2ElasticError = makeError('Mongo2ElasticError'); | ||
const bsonTypeToElastic = { | ||
@@ -57,8 +61,6 @@ number: 'long', | ||
const overrides = options.overrides || []; | ||
const mapper = (node) => { | ||
const { key, parents, path } = node; | ||
let { val } = node; | ||
const omitNodes = (node) => { | ||
const { val, path } = node; | ||
if (val?.bsonType) { | ||
const cleanPath = cleanupPath(path); | ||
const stringPath = cleanPath.join('.'); | ||
// Optionally omit field | ||
@@ -68,6 +70,2 @@ if (omit.find(_.isEqual(cleanPath))) { | ||
} | ||
// Ignore top-level _id field | ||
if (key === '_id' && parents.length === 2) { | ||
return; | ||
} | ||
// Use the first type if multi-valued | ||
@@ -77,2 +75,44 @@ if (Array.isArray(val.bsonType)) { | ||
} | ||
} | ||
return val; | ||
}; | ||
const handleRename = (schema, rename) => { | ||
for (const dottedPath in rename) { | ||
const oldPath = dottedPath.split('.'); | ||
const newPath = rename[dottedPath].split('.'); | ||
// Only allow renames such that nodes still keep the same parent | ||
if (!arrayStartsWith(oldPath, newPath.slice(0, -1))) { | ||
throw new Mongo2ElasticError(`Rename path prefix does not match: ${dottedPath}`); | ||
} | ||
} | ||
// Walk every subschema, renaming properties | ||
walker(schema, ({ val: { bsonType, properties }, path }) => { | ||
// Only objects can have their properties renamed | ||
if (bsonType !== 'object' || !properties) { | ||
return; | ||
} | ||
const cleanPath = _.pull('_items', path); | ||
for (const key in properties) { | ||
const childPath = [...cleanPath, key].join('.'); | ||
// Property name to which property `key` should be renamed to | ||
const newProperty = _.last(rename[childPath]?.split('.')); | ||
// Make sure we don't overwrite existing properties | ||
if (newProperty !== undefined && newProperty in properties) { | ||
throw new Mongo2ElasticError(`Renaming ${childPath} to ${rename[childPath]} will overwrite property "${newProperty}"`); | ||
} | ||
// Actually rename property | ||
if (newProperty) { | ||
const child = properties[key]; | ||
delete properties[key]; | ||
properties[newProperty] = child; | ||
} | ||
} | ||
}, { traverse: traverseSchema }); | ||
}; | ||
const overrideNodes = (node) => { | ||
let { val } = node; | ||
const { path } = node; | ||
if (val?.bsonType) { | ||
const cleanPath = cleanupPath(path); | ||
const stringPath = cleanPath.join('.'); | ||
// Optionally override field | ||
@@ -94,3 +134,5 @@ const overrideMatch = overrides.find(({ path }) => minimatch(stringPath, path)); | ||
// Recursively convert the schema | ||
return map(jsonSchema, mapper); | ||
const schema = map(jsonSchema, omitNodes); | ||
handleRename(schema, { _id: '_mongoId', ...options.rename }); | ||
return map(schema, overrideNodes); | ||
}; |
import _ from 'lodash/fp.js'; | ||
import mongoChangeStream from 'mongochangestream'; | ||
import { indexFromCollection } from './util.js'; | ||
import { renameKeys, indexFromCollection } from './util.js'; | ||
import { convertSchema } from './convertSchema.js'; | ||
@@ -21,3 +21,3 @@ /** | ||
export const initSync = (redis, collection, elastic, options = {}) => { | ||
const mapper = options.mapper || _.omit(['_id']); | ||
const mapper = (doc) => renameKeys(doc, { _id: '_mongoId', ...options.rename }); | ||
const index = options.index || indexFromCollection(collection); | ||
@@ -24,0 +24,0 @@ // Initialize sync |
import type { Document } from 'mongodb'; | ||
import { JSONSchema } from 'mongochangestream'; | ||
export interface SyncOptions { | ||
interface RenameOption { | ||
/** Dotted path to renamed dotted path */ | ||
rename?: Record<string, string>; | ||
} | ||
export interface SyncOptions extends RenameOption { | ||
mapper?: (doc: Document) => Document; | ||
@@ -11,3 +15,3 @@ index?: string; | ||
} | ||
export interface ConvertOptions { | ||
export interface ConvertOptions extends RenameOption { | ||
omit?: string[]; | ||
@@ -18,1 +22,2 @@ overrides?: Override[]; | ||
export type Events = 'process' | 'error'; | ||
export {}; |
@@ -1,3 +0,9 @@ | ||
import { Collection } from 'mongodb'; | ||
import { Document, Collection } from 'mongodb'; | ||
export declare const indexFromCollection: (collection: Collection) => string; | ||
export declare const indexFromDbAndCollection: (collection: Collection) => string; | ||
/** | ||
* Does arr start with startsWith array. | ||
*/ | ||
export declare const arrayStartsWith: (arr: any[], startsWith: any[]) => boolean; | ||
export declare const renameKey: (doc: Document, key: string, newKey: string) => any; | ||
export declare const renameKeys: (doc: Document, keys: Record<string, string>) => Document; |
@@ -0,2 +1,25 @@ | ||
import _ from 'lodash/fp.js'; | ||
export const indexFromCollection = (collection) => collection.collectionName.toLowerCase(); | ||
export const indexFromDbAndCollection = (collection) => `${collection.dbName.toLowerCase()}_${collection.collectionName.toLowerCase()}`; | ||
/** | ||
* Does arr start with startsWith array. | ||
*/ | ||
export const arrayStartsWith = (arr, startsWith) => { | ||
for (let i = 0; i < startsWith.length; i++) { | ||
if (arr[i] !== startsWith[i]) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
}; | ||
export const renameKey = (doc, key, newKey) => _.flow(_.set(newKey, _.get(key, doc)), _.omit([key]))(doc); | ||
export const renameKeys = (doc, keys) => { | ||
let newDoc = doc; | ||
for (const key in keys) { | ||
if (_.has(key, doc)) { | ||
const newKey = keys[key]; | ||
newDoc = renameKey(newDoc, key, newKey); | ||
} | ||
} | ||
return newDoc; | ||
}; |
{ | ||
"name": "mongo2elastic", | ||
"version": "0.38.0", | ||
"version": "0.39.0", | ||
"description": "Sync MongoDB collections to Elasticsearch", | ||
@@ -43,4 +43,4 @@ "main": "dist/index.js", | ||
"jest": "^28.1.3", | ||
"prettier": "^2.8.8", | ||
"ts-jest": "^28.0.8", | ||
"prettier": "^2.8.8", | ||
"typescript": "^5.0.4" | ||
@@ -57,2 +57,3 @@ }, | ||
"lodash": "^4.17.21", | ||
"make-error": "^1.3.6", | ||
"minimatch": "^6.2.0", | ||
@@ -59,0 +60,0 @@ "mongochangestream": "^0.43.0", |
@@ -122,2 +122,3 @@ import { convertSchema } from './convertSchema.js' | ||
properties: { | ||
_mongoId: { type: 'keyword' }, | ||
parentId: { type: 'keyword' }, | ||
@@ -208,2 +209,3 @@ name: { | ||
properties: { | ||
_mongoId: { type: 'keyword' }, | ||
parentId: { type: 'keyword' }, | ||
@@ -307,2 +309,3 @@ name: { | ||
properties: { | ||
_mongoId: { type: 'keyword' }, | ||
parentId: { type: 'keyword' }, | ||
@@ -400,2 +403,3 @@ name: { | ||
properties: { | ||
_mongoId: { type: 'keyword', copy_to: 'all' }, | ||
parentId: { type: 'keyword', copy_to: 'all' }, | ||
@@ -484,2 +488,117 @@ name: { | ||
}) | ||
test('Should rename fields in the schema', () => { | ||
const options = { | ||
omit: ['integrations', 'permissions'], | ||
overrides: [{ path: '*', copy_to: 'all' }], | ||
passthrough: ['copy_to'], | ||
rename: { | ||
numberOfEmployees: 'numEmployees', | ||
'addresses.address.address1': 'addresses.address.street', | ||
}, | ||
} | ||
const result = convertSchema(schema, options) | ||
expect(result).toEqual({ | ||
properties: { | ||
_mongoId: { type: 'keyword', copy_to: 'all' }, | ||
parentId: { type: 'keyword', copy_to: 'all' }, | ||
name: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
subType: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
numEmployees: { type: 'keyword', copy_to: 'all' }, | ||
addresses: { | ||
properties: { | ||
address: { | ||
properties: { | ||
street: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
address2: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
city: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
county: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
state: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
zip: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
country: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
latitude: { type: 'long', copy_to: 'all' }, | ||
longitude: { type: 'long', copy_to: 'all' }, | ||
timezone: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
}, | ||
}, | ||
name: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
isPrimary: { type: 'boolean', copy_to: 'all' }, | ||
}, | ||
}, | ||
logo: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
verified: { type: 'boolean', copy_to: 'all' }, | ||
partner: { | ||
type: 'text', | ||
fields: { keyword: { type: 'keyword', ignore_above: 256 } }, | ||
copy_to: 'all', | ||
}, | ||
createdAt: { type: 'date', copy_to: 'all' }, | ||
}, | ||
}) | ||
}) | ||
test('Should throw an exception if a rename field path prefix is different', () => { | ||
expect(() => | ||
convertSchema(schema, { | ||
rename: { | ||
'integrations.stripe': 'foo.bar', | ||
}, | ||
}) | ||
).toThrow('Rename path prefix does not match: integrations.stripe') | ||
}) | ||
test('Should throw an exception if a rename results in duplicate paths', () => { | ||
expect(() => | ||
convertSchema(schema, { | ||
rename: { | ||
parentId: 'name', | ||
}, | ||
}) | ||
).toThrow('Renaming parentId to name will overwrite property "name"') | ||
}) | ||
}) |
import _ from 'lodash/fp.js' | ||
import { map, Node } from 'obj-walker' | ||
import { map, Node, walker } from 'obj-walker' | ||
import { estypes } from '@elastic/elasticsearch' | ||
import { ConvertOptions } from './types.js' | ||
import { minimatch } from 'minimatch' | ||
import { JSONSchema } from 'mongochangestream' | ||
import makeError from 'make-error' | ||
import { JSONSchema, traverseSchema } from 'mongochangestream' | ||
import { arrayStartsWith } from './util.js' | ||
export const Mongo2ElasticError = makeError('Mongo2ElasticError') | ||
const bsonTypeToElastic: Record<string, string> = { | ||
@@ -69,8 +73,6 @@ number: 'long', | ||
const mapper = (node: Node) => { | ||
const { key, parents, path } = node | ||
let { val } = node | ||
const omitNodes = (node: Node) => { | ||
const { val, path } = node | ||
if (val?.bsonType) { | ||
const cleanPath = cleanupPath(path) | ||
const stringPath = cleanPath.join('.') | ||
// Optionally omit field | ||
@@ -80,6 +82,2 @@ if (omit.find(_.isEqual(cleanPath))) { | ||
} | ||
// Ignore top-level _id field | ||
if (key === '_id' && parents.length === 2) { | ||
return | ||
} | ||
// Use the first type if multi-valued | ||
@@ -89,2 +87,61 @@ if (Array.isArray(val.bsonType)) { | ||
} | ||
} | ||
return val | ||
} | ||
const handleRename = (schema: JSONSchema, rename: Record<string, string>) => { | ||
for (const dottedPath in rename) { | ||
const oldPath = dottedPath.split('.') | ||
const newPath = rename[dottedPath].split('.') | ||
// Only allow renames such that nodes still keep the same parent | ||
if (!arrayStartsWith(oldPath, newPath.slice(0, -1))) { | ||
throw new Mongo2ElasticError( | ||
`Rename path prefix does not match: ${dottedPath}` | ||
) | ||
} | ||
} | ||
// Walk every subschema, renaming properties | ||
walker( | ||
schema, | ||
({ val: { bsonType, properties }, path }) => { | ||
// Only objects can have their properties renamed | ||
if (bsonType !== 'object' || !properties) { | ||
return | ||
} | ||
const cleanPath = _.pull('_items', path) | ||
for (const key in properties) { | ||
const childPath = [...cleanPath, key].join('.') | ||
// Property name to which property `key` should be renamed to | ||
const newProperty = _.last(rename[childPath]?.split('.')) as | ||
| string | ||
| undefined | ||
// Make sure we don't overwrite existing properties | ||
if (newProperty !== undefined && newProperty in properties) { | ||
throw new Mongo2ElasticError( | ||
`Renaming ${childPath} to ${rename[childPath]} will overwrite property "${newProperty}"` | ||
) | ||
} | ||
// Actually rename property | ||
if (newProperty) { | ||
const child = properties[key] | ||
delete properties[key] | ||
properties[newProperty] = child | ||
} | ||
} | ||
}, | ||
{ traverse: traverseSchema } | ||
) | ||
} | ||
const overrideNodes = (node: Node) => { | ||
let { val } = node | ||
const { path } = node | ||
if (val?.bsonType) { | ||
const cleanPath = cleanupPath(path) | ||
const stringPath = cleanPath.join('.') | ||
// Optionally override field | ||
@@ -107,4 +164,7 @@ const overrideMatch = overrides.find(({ path }) => | ||
} | ||
// Recursively convert the schema | ||
return map(jsonSchema, mapper) as Record<string, estypes.MappingProperty> | ||
const schema = map(jsonSchema, omitNodes) as JSONSchema | ||
handleRename(schema, { _id: '_mongoId', ...options.rename }) | ||
return map(schema, overrideNodes) as estypes.MappingPropertyBase | ||
} |
@@ -6,2 +6,3 @@ import _ from 'lodash/fp.js' | ||
Collection, | ||
Document, | ||
} from 'mongodb' | ||
@@ -16,3 +17,3 @@ import type { Redis } from 'ioredis' | ||
import { SyncOptions, Events, ConvertOptions } from './types.js' | ||
import { indexFromCollection } from './util.js' | ||
import { renameKeys, indexFromCollection } from './util.js' | ||
import { convertSchema } from './convertSchema.js' | ||
@@ -48,3 +49,4 @@ import { BulkResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey.js' | ||
) => { | ||
const mapper = options.mapper || _.omit(['_id']) | ||
const mapper = (doc: Document) => | ||
renameKeys(doc, { _id: '_mongoId', ...options.rename }) | ||
const index = options.index || indexFromCollection(collection) | ||
@@ -51,0 +53,0 @@ // Initialize sync |
import type { Document } from 'mongodb' | ||
import { JSONSchema } from 'mongochangestream' | ||
export interface SyncOptions { | ||
interface RenameOption { | ||
/** Dotted path to renamed dotted path */ | ||
rename?: Record<string, string> | ||
} | ||
export interface SyncOptions extends RenameOption { | ||
mapper?: (doc: Document) => Document | ||
@@ -14,3 +19,3 @@ index?: string | ||
export interface ConvertOptions { | ||
export interface ConvertOptions extends RenameOption { | ||
omit?: string[] | ||
@@ -17,0 +22,0 @@ overrides?: Override[] |
@@ -1,2 +0,3 @@ | ||
import { Collection } from 'mongodb' | ||
import _ from 'lodash/fp.js' | ||
import { Document, Collection } from 'mongodb' | ||
@@ -8,1 +9,27 @@ export const indexFromCollection = (collection: Collection) => | ||
`${collection.dbName.toLowerCase()}_${collection.collectionName.toLowerCase()}` | ||
/** | ||
* Does arr start with startsWith array. | ||
*/ | ||
export const arrayStartsWith = (arr: any[], startsWith: any[]) => { | ||
for (let i = 0; i < startsWith.length; i++) { | ||
if (arr[i] !== startsWith[i]) { | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
export const renameKey = (doc: Document, key: string, newKey: string) => | ||
_.flow(_.set(newKey, _.get(key, doc)), _.omit([key]))(doc) | ||
export const renameKeys = (doc: Document, keys: Record<string, string>) => { | ||
let newDoc = doc | ||
for (const key in keys) { | ||
if (_.has(key, doc)) { | ||
const newKey = keys[key] | ||
newDoc = renameKey(newDoc, key, newKey) | ||
} | ||
} | ||
return newDoc | ||
} |
56642
1481
12
+ Addedmake-error@^1.3.6
+ Added@types/whatwg-url@11.0.5(transitive)
- Removed@types/whatwg-url@11.0.4(transitive)