Socket
Socket
Sign inDemoInstall

mongo2elastic

Package Overview
Dependencies
248
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.38.0 to 0.39.0

7

CHANGELOG.md

@@ -0,1 +1,6 @@

# 0.39.0
- Removed `options.mapper` in favor of `options.rename` which takes an object of dotted path to renamed dotted path.
The `rename` option is available for syncing and converting a schema to a table.`_id` mapped to `_mongoId` by default.
# 0.38.0

@@ -12,3 +17,3 @@

- `processChangeStream` now batches records. Default timeout before the queue is automatically
flushed is 30 seconds.
flushed is 30 seconds.

@@ -15,0 +20,0 @@ # 0.35.0

4

dist/convertSchema.d.ts
import { estypes } from '@elastic/elasticsearch';
import { ConvertOptions } from './types.js';
import makeError from 'make-error';
import { JSONSchema } from 'mongochangestream';
export declare const Mongo2ElasticError: makeError.Constructor<makeError.BaseError>;
/**

@@ -9,2 +11,2 @@ * Convert MongoDB JSON schema to Elasticsearch mapping.

*/
export declare const convertSchema: (jsonSchema: JSONSchema, options?: ConvertOptions) => Record<string, estypes.MappingProperty>;
export declare const convertSchema: (jsonSchema: JSONSchema, options?: ConvertOptions) => estypes.MappingPropertyBase;
import _ from 'lodash/fp.js';
import { map } from 'obj-walker';
import { map, walker } from 'obj-walker';
import { minimatch } from 'minimatch';
import makeError from 'make-error';
import { traverseSchema } from 'mongochangestream';
import { arrayStartsWith } from './util.js';
export const Mongo2ElasticError = makeError('Mongo2ElasticError');
const bsonTypeToElastic = {

@@ -57,8 +61,6 @@ number: 'long',

const overrides = options.overrides || [];
const mapper = (node) => {
const { key, parents, path } = node;
let { val } = node;
const omitNodes = (node) => {
const { val, path } = node;
if (val?.bsonType) {
const cleanPath = cleanupPath(path);
const stringPath = cleanPath.join('.');
// Optionally omit field

@@ -68,6 +70,2 @@ if (omit.find(_.isEqual(cleanPath))) {

}
// Ignore top-level _id field
if (key === '_id' && parents.length === 2) {
return;
}
// Use the first type if multi-valued

@@ -77,2 +75,44 @@ if (Array.isArray(val.bsonType)) {

}
}
return val;
};
const handleRename = (schema, rename) => {
for (const dottedPath in rename) {
const oldPath = dottedPath.split('.');
const newPath = rename[dottedPath].split('.');
// Only allow renames such that nodes still keep the same parent
if (!arrayStartsWith(oldPath, newPath.slice(0, -1))) {
throw new Mongo2ElasticError(`Rename path prefix does not match: ${dottedPath}`);
}
}
// Walk every subschema, renaming properties
walker(schema, ({ val: { bsonType, properties }, path }) => {
// Only objects can have their properties renamed
if (bsonType !== 'object' || !properties) {
return;
}
const cleanPath = _.pull('_items', path);
for (const key in properties) {
const childPath = [...cleanPath, key].join('.');
// Property name to which property `key` should be renamed to
const newProperty = _.last(rename[childPath]?.split('.'));
// Make sure we don't overwrite existing properties
if (newProperty !== undefined && newProperty in properties) {
throw new Mongo2ElasticError(`Renaming ${childPath} to ${rename[childPath]} will overwrite property "${newProperty}"`);
}
// Actually rename property
if (newProperty) {
const child = properties[key];
delete properties[key];
properties[newProperty] = child;
}
}
}, { traverse: traverseSchema });
};
const overrideNodes = (node) => {
let { val } = node;
const { path } = node;
if (val?.bsonType) {
const cleanPath = cleanupPath(path);
const stringPath = cleanPath.join('.');
// Optionally override field

@@ -94,3 +134,5 @@ const overrideMatch = overrides.find(({ path }) => minimatch(stringPath, path));

// Recursively convert the schema
return map(jsonSchema, mapper);
const schema = map(jsonSchema, omitNodes);
handleRename(schema, { _id: '_mongoId', ...options.rename });
return map(schema, overrideNodes);
};
import _ from 'lodash/fp.js';
import mongoChangeStream from 'mongochangestream';
import { indexFromCollection } from './util.js';
import { renameKeys, indexFromCollection } from './util.js';
import { convertSchema } from './convertSchema.js';

@@ -21,3 +21,3 @@ /**

export const initSync = (redis, collection, elastic, options = {}) => {
const mapper = options.mapper || _.omit(['_id']);
const mapper = (doc) => renameKeys(doc, { _id: '_mongoId', ...options.rename });
const index = options.index || indexFromCollection(collection);

@@ -24,0 +24,0 @@ // Initialize sync

import type { Document } from 'mongodb';
import { JSONSchema } from 'mongochangestream';
export interface SyncOptions {
interface RenameOption {
/** Dotted path to renamed dotted path */
rename?: Record<string, string>;
}
export interface SyncOptions extends RenameOption {
mapper?: (doc: Document) => Document;

@@ -11,3 +15,3 @@ index?: string;

}
export interface ConvertOptions {
export interface ConvertOptions extends RenameOption {
omit?: string[];

@@ -18,1 +22,2 @@ overrides?: Override[];

export type Events = 'process' | 'error';
export {};

@@ -1,3 +0,9 @@

import { Collection } from 'mongodb';
import { Document, Collection } from 'mongodb';
export declare const indexFromCollection: (collection: Collection) => string;
export declare const indexFromDbAndCollection: (collection: Collection) => string;
/**
* Does arr start with startsWith array.
*/
export declare const arrayStartsWith: (arr: any[], startsWith: any[]) => boolean;
export declare const renameKey: (doc: Document, key: string, newKey: string) => any;
export declare const renameKeys: (doc: Document, keys: Record<string, string>) => Document;

@@ -0,2 +1,25 @@

import _ from 'lodash/fp.js';
export const indexFromCollection = (collection) => collection.collectionName.toLowerCase();
export const indexFromDbAndCollection = (collection) => `${collection.dbName.toLowerCase()}_${collection.collectionName.toLowerCase()}`;
/**
* Does arr start with startsWith array.
*/
export const arrayStartsWith = (arr, startsWith) => {
for (let i = 0; i < startsWith.length; i++) {
if (arr[i] !== startsWith[i]) {
return false;
}
}
return true;
};
export const renameKey = (doc, key, newKey) => _.flow(_.set(newKey, _.get(key, doc)), _.omit([key]))(doc);
export const renameKeys = (doc, keys) => {
let newDoc = doc;
for (const key in keys) {
if (_.has(key, doc)) {
const newKey = keys[key];
newDoc = renameKey(newDoc, key, newKey);
}
}
return newDoc;
};
{
"name": "mongo2elastic",
"version": "0.38.0",
"version": "0.39.0",
"description": "Sync MongoDB collections to Elasticsearch",

@@ -43,4 +43,4 @@ "main": "dist/index.js",

"jest": "^28.1.3",
"prettier": "^2.8.8",
"ts-jest": "^28.0.8",
"prettier": "^2.8.8",
"typescript": "^5.0.4"

@@ -57,2 +57,3 @@ },

"lodash": "^4.17.21",
"make-error": "^1.3.6",
"minimatch": "^6.2.0",

@@ -59,0 +60,0 @@ "mongochangestream": "^0.43.0",

@@ -122,2 +122,3 @@ import { convertSchema } from './convertSchema.js'

properties: {
_mongoId: { type: 'keyword' },
parentId: { type: 'keyword' },

@@ -208,2 +209,3 @@ name: {

properties: {
_mongoId: { type: 'keyword' },
parentId: { type: 'keyword' },

@@ -307,2 +309,3 @@ name: {

properties: {
_mongoId: { type: 'keyword' },
parentId: { type: 'keyword' },

@@ -400,2 +403,3 @@ name: {

properties: {
_mongoId: { type: 'keyword', copy_to: 'all' },
parentId: { type: 'keyword', copy_to: 'all' },

@@ -484,2 +488,117 @@ name: {

})
test('Should rename fields in the schema', () => {
const options = {
omit: ['integrations', 'permissions'],
overrides: [{ path: '*', copy_to: 'all' }],
passthrough: ['copy_to'],
rename: {
numberOfEmployees: 'numEmployees',
'addresses.address.address1': 'addresses.address.street',
},
}
const result = convertSchema(schema, options)
expect(result).toEqual({
properties: {
_mongoId: { type: 'keyword', copy_to: 'all' },
parentId: { type: 'keyword', copy_to: 'all' },
name: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
subType: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
numEmployees: { type: 'keyword', copy_to: 'all' },
addresses: {
properties: {
address: {
properties: {
street: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
address2: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
city: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
county: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
state: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
zip: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
country: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
latitude: { type: 'long', copy_to: 'all' },
longitude: { type: 'long', copy_to: 'all' },
timezone: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
},
},
name: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
isPrimary: { type: 'boolean', copy_to: 'all' },
},
},
logo: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
verified: { type: 'boolean', copy_to: 'all' },
partner: {
type: 'text',
fields: { keyword: { type: 'keyword', ignore_above: 256 } },
copy_to: 'all',
},
createdAt: { type: 'date', copy_to: 'all' },
},
})
})
test('Should throw an exception if a rename field path prefix is different', () => {
expect(() =>
convertSchema(schema, {
rename: {
'integrations.stripe': 'foo.bar',
},
})
).toThrow('Rename path prefix does not match: integrations.stripe')
})
test('Should throw an exception if a rename results in duplicate paths', () => {
expect(() =>
convertSchema(schema, {
rename: {
parentId: 'name',
},
})
).toThrow('Renaming parentId to name will overwrite property "name"')
})
})
import _ from 'lodash/fp.js'
import { map, Node } from 'obj-walker'
import { map, Node, walker } from 'obj-walker'
import { estypes } from '@elastic/elasticsearch'
import { ConvertOptions } from './types.js'
import { minimatch } from 'minimatch'
import { JSONSchema } from 'mongochangestream'
import makeError from 'make-error'
import { JSONSchema, traverseSchema } from 'mongochangestream'
import { arrayStartsWith } from './util.js'
export const Mongo2ElasticError = makeError('Mongo2ElasticError')
const bsonTypeToElastic: Record<string, string> = {

@@ -69,8 +73,6 @@ number: 'long',

const mapper = (node: Node) => {
const { key, parents, path } = node
let { val } = node
const omitNodes = (node: Node) => {
const { val, path } = node
if (val?.bsonType) {
const cleanPath = cleanupPath(path)
const stringPath = cleanPath.join('.')
// Optionally omit field

@@ -80,6 +82,2 @@ if (omit.find(_.isEqual(cleanPath))) {

}
// Ignore top-level _id field
if (key === '_id' && parents.length === 2) {
return
}
// Use the first type if multi-valued

@@ -89,2 +87,61 @@ if (Array.isArray(val.bsonType)) {

}
}
return val
}
const handleRename = (schema: JSONSchema, rename: Record<string, string>) => {
for (const dottedPath in rename) {
const oldPath = dottedPath.split('.')
const newPath = rename[dottedPath].split('.')
// Only allow renames such that nodes still keep the same parent
if (!arrayStartsWith(oldPath, newPath.slice(0, -1))) {
throw new Mongo2ElasticError(
`Rename path prefix does not match: ${dottedPath}`
)
}
}
// Walk every subschema, renaming properties
walker(
schema,
({ val: { bsonType, properties }, path }) => {
// Only objects can have their properties renamed
if (bsonType !== 'object' || !properties) {
return
}
const cleanPath = _.pull('_items', path)
for (const key in properties) {
const childPath = [...cleanPath, key].join('.')
// Property name to which property `key` should be renamed to
const newProperty = _.last(rename[childPath]?.split('.')) as
| string
| undefined
// Make sure we don't overwrite existing properties
if (newProperty !== undefined && newProperty in properties) {
throw new Mongo2ElasticError(
`Renaming ${childPath} to ${rename[childPath]} will overwrite property "${newProperty}"`
)
}
// Actually rename property
if (newProperty) {
const child = properties[key]
delete properties[key]
properties[newProperty] = child
}
}
},
{ traverse: traverseSchema }
)
}
const overrideNodes = (node: Node) => {
let { val } = node
const { path } = node
if (val?.bsonType) {
const cleanPath = cleanupPath(path)
const stringPath = cleanPath.join('.')
// Optionally override field

@@ -107,4 +164,7 @@ const overrideMatch = overrides.find(({ path }) =>

}
// Recursively convert the schema
return map(jsonSchema, mapper) as Record<string, estypes.MappingProperty>
const schema = map(jsonSchema, omitNodes) as JSONSchema
handleRename(schema, { _id: '_mongoId', ...options.rename })
return map(schema, overrideNodes) as estypes.MappingPropertyBase
}

@@ -6,2 +6,3 @@ import _ from 'lodash/fp.js'

Collection,
Document,
} from 'mongodb'

@@ -16,3 +17,3 @@ import type { Redis } from 'ioredis'

import { SyncOptions, Events, ConvertOptions } from './types.js'
import { indexFromCollection } from './util.js'
import { renameKeys, indexFromCollection } from './util.js'
import { convertSchema } from './convertSchema.js'

@@ -48,3 +49,4 @@ import { BulkResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey.js'

) => {
const mapper = options.mapper || _.omit(['_id'])
const mapper = (doc: Document) =>
renameKeys(doc, { _id: '_mongoId', ...options.rename })
const index = options.index || indexFromCollection(collection)

@@ -51,0 +53,0 @@ // Initialize sync

import type { Document } from 'mongodb'
import { JSONSchema } from 'mongochangestream'
export interface SyncOptions {
interface RenameOption {
/** Dotted path to renamed dotted path */
rename?: Record<string, string>
}
export interface SyncOptions extends RenameOption {
mapper?: (doc: Document) => Document

@@ -14,3 +19,3 @@ index?: string

export interface ConvertOptions {
export interface ConvertOptions extends RenameOption {
omit?: string[]

@@ -17,0 +22,0 @@ overrides?: Override[]

@@ -1,2 +0,3 @@

import { Collection } from 'mongodb'
import _ from 'lodash/fp.js'
import { Document, Collection } from 'mongodb'

@@ -8,1 +9,27 @@ export const indexFromCollection = (collection: Collection) =>

`${collection.dbName.toLowerCase()}_${collection.collectionName.toLowerCase()}`
/**
* Does arr start with startsWith array.
*/
export const arrayStartsWith = (arr: any[], startsWith: any[]) => {
for (let i = 0; i < startsWith.length; i++) {
if (arr[i] !== startsWith[i]) {
return false
}
}
return true
}
export const renameKey = (doc: Document, key: string, newKey: string) =>
_.flow(_.set(newKey, _.get(key, doc)), _.omit([key]))(doc)
export const renameKeys = (doc: Document, keys: Record<string, string>) => {
let newDoc = doc
for (const key in keys) {
if (_.has(key, doc)) {
const newKey = keys[key]
newDoc = renameKey(newDoc, key, newKey)
}
}
return newDoc
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc