Socket
Socket
Sign inDemoInstall

leo-connector-common

Package Overview
Dependencies
Maintainers
4
Versions
108
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

leo-connector-common - npm Package Compare versions

Comparing version 1.6.0 to 2.0.0-189-g593f905

base.js

46

checksum/index.js
let leo = require("leo-sdk");
const aws = require("aws-sdk");
const checksum = require("./lib/checksumNibbler.js");
let dynamodb = leo.aws.dynamodb;
const leoaws = require('leo-aws');
let cron = leo.bot;

@@ -23,15 +23,7 @@

function saveProgress(systemId, botId, data) {
return new Promise((resolve, reject) => {
dynamodb.merge(tableName, botId, {
checksum: data,
system: {
id: systemId
}
}, function(err, result) {
if (err) {
reject(err);
} else {
resolve(data);
}
});
return leoaws.dynamodb.merge(tableName, botId, {
checksum: data,
system: {
id: systemId
}
});

@@ -88,20 +80,14 @@ }

} else {
return new Promise((resolve, reject) => {
logger.log("Getting Session", systemId, botId);
dynamodb.get(tableName, botId, function(err, result) {
if (err) {
reject(err);
} else {
try {
let session = emptySession;
if (result && result.checksum && result.checksum.restart !== true && result.checksum.status !== 'complete') {
session = result.checksum;
}
resolve(session);
} catch (err) {
reject(err);
}
logger.log("Getting Session", systemId, botId);
return leoaws.dynamodb.get(tableName, botId)
.then(result => {
let session = emptySession;
if (result && result.checksum && result.checksum.restart !== true && result.checksum.status !== 'complete') {
session = result.checksum;
}
return session;
});
});
}

@@ -108,0 +94,0 @@ },

@@ -5,4 +5,3 @@ "use strict";

const moment = require("moment");
require("moment-timezone");
const logger = require("leo-sdk/lib/logger")("leo-checksum.basic");
const logger = require("leo-logger")("leo-checksum.basic");
const Stream = require('stream').Stream;

@@ -9,0 +8,0 @@

@@ -5,3 +5,3 @@ "use strict";

let logger = require("leo-sdk/lib/logger")("leo-checksum.nibbler");
let logger = require("leo-logger")("leo-checksum.nibbler");

@@ -8,0 +8,0 @@ module.exports = function(local, remote, opts) {

@@ -1,3 +0,3 @@

var async = require("async");
let logger = require("leo-sdk/lib/logger")("leo-nibbler");
const async = require("async");
const logger = require("leo-logger")("leo-nibbler");

@@ -4,0 +4,0 @@ /**

1019

dol.js

@@ -0,10 +1,45 @@

'use strict';
const leo = require("leo-sdk");
const ls = leo.streams;
const async = require("async");
const logger = require('leo-logger');
module.exports = function DomainObjectLoader(client) {
let self = this;
return {
translateIds: function(translations, opts) {
opts = Object.assign({
function respectDomainIdOrder(domainIdColumn, id) {
if (typeof id === 'object') {
return domainIdColumn.reduce((treatedId, curId) => {
treatedId[curId] = id[curId];
return treatedId;
}, {});
}
return id;
}
module.exports = class Dol {
constructor(client) {
this.client = client;
}
translateIds(translations, domainIdColumn, opts) {
if (typeof domainIdColumn === 'object' && !Array.isArray(domainIdColumn) && !opts) {
opts = Object.assign({}, domainIdColumn);
domainIdColumn = undefined;
}
opts = Object.assign({
count: 1000,
time: {
milliseconds: 200
}
}, opts);
return ls.pipeline(
this.translateIdsStartStream(translations, domainIdColumn),
ls.batch({
count: opts.count,
time: opts.time
}),
this.translateIdsLookupStream(translations),
ls.batch({
count: 1000,

@@ -14,303 +49,313 @@ time: {

}
}, opts);
return ls.pipeline(
translateIdsStartStream(translations),
ls.batch({
count: opts.count,
time: opts.time
}),
translateIdsLookupStream(client, translations),
ls.batch({
count: 1000,
time: {
milliseconds: 200
}
}),
translateIdsCombineStream()
);
},
domainObjectTransform: function(domainObject) {
if (typeof domainObject.get === "function") {
domainObject = domainObject.get();
}
domainObject = Object.assign({
domainIdColumn: "_domain_id",
query: "select * from dual limit 1",
joins: {}
}, domainObject);
}),
this.translateIdsCombineStream()
);
}
domainObject.sql = queryToFunction(domainObject.sql || domainObject.query, ["data"]);
Object.values(domainObject.joins).map(v => {
v.sql = queryToFunction(v.sql || v.query, ["data"]);
});
domainObjectTransform(domainObject) {
if (typeof domainObject.get === "function") {
domainObject = domainObject.get();
}
domainObject = Object.assign({
domainIdColumn: "_domain_id",
query: "select * from dual limit 1",
joins: {}
}, domainObject);
//We can jump in at this point if we just have a join table to use without going through the above...otherwise it assumes we have a list of ids
//Lets do actual Domain lookups
return ls.through((obj, done, push) => {
let addCorrelation = (result, final = false, units) => {
if (final) {
result.correlation_id = {
source: obj.correlation_id.source,
start: obj.correlation_id.start || obj.correlation_id.end,
end: obj.correlation_id.end,
units: units
};
} else {
result.correlation_id = {
source: obj.correlation_id.source,
partial_start: obj.correlation_id.start || obj.correlation_id.end || obj.correlation_id.partial,
partial_end: obj.correlation_id.start ? (obj.correlation_id.partial || obj.correlation_id.end) : obj.correlation_id.end,
units: units
};
}
push(result);
};
domainObject.sql = this.queryToFunction(domainObject.sql || domainObject.query, ["data"]);
Object.values(domainObject.joins).map(v => {
v.sql = this.queryToFunction(v.sql || v.query, ["data"]);
});
if (obj.joinTable) {
done("joinTable is not implemented");
} else if (obj.ids) {
//Query for these domain Objects
//We can jump in at this point if we just have a join table to use without going through the above...otherwise it assumes we have a list of ids
//Lets do actual Domain lookups
return ls.through((obj, done, push) => {
let addCorrelation = (result, final = false, units) => {
if (final) {
result.correlation_id = {
source: obj.correlation_id.source,
start: obj.correlation_id.start || obj.correlation_id.end,
end: obj.correlation_id.end,
units: units
};
} else {
result.correlation_id = {
source: obj.correlation_id.source,
partial_start: obj.correlation_id.start || obj.correlation_id.end || obj.correlation_id.partial,
partial_end: obj.correlation_id.start ? (obj.correlation_id.partial || obj.correlation_id.end) : obj.correlation_id.end,
units: units
};
}
push(result);
};
buildDomainObject(client, domainObject, obj.ids, addCorrelation, (err, results = []) => {
if (err) {
console.log(JSON.stringify(obj, null, 2));
return done(err);
}
if (results.length) {
let i = 0;
for (; i < results.length - 1; i++) {
let result = results[i];
push(result);
}
if (obj.joinTable) {
done("joinTable is not implemented");
} else if (obj.ids) {
//Query for these domain Objects
let lastResult = results[i];
addCorrelation(lastResult, true);
addCorrelation({
dont_write: true,
payload: {}
}, true, -1);
} else {
addCorrelation({
dont_write: true,
payload: {}
}, true);
addCorrelation({
dont_write: true,
payload: {}
}, true, -1);
this.buildDomainObject(domainObject, obj.ids, addCorrelation, (err, results = []) => {
if (err) {
logger.error(JSON.stringify(obj, null, 2));
return done(err);
}
if (results.length) {
let i = 0;
for (; i < results.length - 1; i++) {
let result = results[i];
push(result);
}
done();
});
} else {
addCorrelation({
dont_write: true,
payload: {}
}, true);
let lastResult = results[i];
addCorrelation(lastResult, true);
addCorrelation({
dont_write: true,
payload: {}
}, true, -1);
} else {
addCorrelation({
dont_write: true,
payload: {}
}, true);
addCorrelation({
dont_write: true,
payload: {}
}, true, -1);
}
done();
}
});
},
DomainObject: function(query, domainIdColumn = "_domain_id", transform) {
if (typeof domainIdColumn === "function") {
transform = domainIdColumn;
domainIdColumn = "_domain_id";
});
} else {
addCorrelation({
dont_write: true,
payload: {}
}, true);
done();
}
this.query = query;
this.domainIdColumn = domainIdColumn;
this.transform = transform;
this.joins = {};
this.hasMany = function(name, query, domainIdColumn = "_domain_id", transform) {
if (typeof query === "object") {
this.joins[name] = query;
return;
}
});
}
if (typeof domainIdColumn === "function") {
transform = domainIdColumn;
domainIdColumn = "_domain_id";
}
domainObject(query, domainIdColumn = "_domain_id", transform) {
if (typeof domainIdColumn === "function") {
transform = domainIdColumn;
domainIdColumn = "_domain_id";
}
this.query = query;
this.domainIdColumn = domainIdColumn;
this.transform = transform;
this.joins = {};
this.joins[name] = {
query,
domainIdColumn,
transform,
type: "one_to_many"
};
return this;
};
return this;
}
hasMany (name, query, domainIdColumn = "_domain_id", transform) {
if (typeof query === "object") {
this.joins[name] = query;
return;
}
};
};
if (typeof domainIdColumn === "function") {
transform = domainIdColumn;
domainIdColumn = "_domain_id";
}
this.joins[name] = {
query,
domainIdColumn,
transform,
type: "one_to_many"
};
return this;
}
function translateIdsStartStream(idTranslation) {
// let bufferIds = {};
// let lastFullEid = null;
/**
* Translate id's from a database listener
* formats:
* mysql: payload.update.database.table.ids
* payload.delete.database.table.ids
* other databases: payload.table.ids - converted to payload.update.__database__.table.ids
* @param idTranslation
*/
translateIdsStartStream(idTranslation, domainIdColumn) {
return ls.through((obj, done, push) => {
if (!obj.payload.update) {
return done(null, {
correlation_id: {
source: obj.event,
start: obj.eid
}
});
}
let last = null;
let count = 0;
let updates = obj.payload.update;
for (let schema in updates) {
for (var t in idTranslation) {
let ids = updates[schema][t];
if (!ids) {
continue;
}
ids = Array.from(new Set(ids)); // Dedub the ids
for (var i = 0; i < ids.length; i++) {
if (count) push(last);
last = {
s: schema,
t,
id: ids[i],
return ls.through((obj, done, push) => {
if (!obj.payload.update) {
// handle data from listeners other than mysql
if (obj.payload && !obj.payload.delete && Object.keys(obj.payload).length) {
obj.payload = {
update: {
// add a dummy database, which will be removed on the domain object final output
__database__: obj.payload
}
};
} else {
return done(null, {
correlation_id: {
source: obj.event,
//start: lastFullEid
partial: obj.eid,
units: 1
start: obj.eid
}
};
count++;
});
}
}
}
// lastFullEid = obj.eid;
if (last) {
count++;
last.correlation_id = {
source: obj.event,
start: obj.eid,
units: 1
};
done(null, last);
} else {
done(null, {
correlation_id: {
let last = null;
let count = 0;
let updates = obj.payload.update;
for (let schema in updates) {
for (let t in idTranslation) {
let ids = updates[schema][t];
if (!ids) {
continue;
}
ids = Array.from(new Set(ids)); // Dedup the ids
for (let i = 0; i < ids.length; i++) {
const respectfulId = respectDomainIdOrder(domainIdColumn, ids[i]);
if (count) push(last);
last = {
s: schema,
t,
id: respectfulId,
correlation_id: {
source: obj.event,
partial: obj.eid,
units: 1
}
};
count++;
}
}
}
if (last) {
last.correlation_id = {
source: obj.event,
start: obj.eid,
units: 1
}
});
}
});
}
function translateIdsLookupStream(client, idTranslation) {
// let lastFullEid = null;
let handlers = {};
Object.keys(idTranslation).map(v => {
let translation = idTranslation[v];
if (translation === true) {
handlers[v] = (data, done) => {
done(null, data.ids);
};
} else if (typeof translation === "string" || (typeof translation === "function" && translation.length <= 1)) {
let queryFn = queryToFunction(translation, ["data"]);
handlers[v] = function(data, done) {
let query = queryFn.call(this, data);
this.client.query(query, [data.ids], (err, rows) => {
done(err, rows && rows.map(r => r[0]));
}, {
inRowMode: true
};
done(null, last);
} else {
done(null, {
correlation_id: {
source: obj.event,
start: obj.eid,
units: 1
}
});
};
} else if (typeof translation === "function") {
handlers[v] = translation;
}
});
}
});
}
return ls.through((obj, done, push) => {
let ids = {};
let p = obj.payload;
let startEid;
let endEid;
let partialEid;
for (let i = 0; i < p.length; i++) {
let record = p[i];
if (record.id) {
if (!(record.s in ids)) {
ids[record.s] = Object.keys(idTranslation).reduce((acc, v) => (acc[v] = new Set()) && acc, {});
}
ids[record.s][record.t].add(record.id);
translateIdsLookupStream(idTranslation) {
let handlers = {};
Object.keys(idTranslation).map(v => {
let translation = idTranslation[v];
if (translation === true) {
handlers[v] = (data, done) => {
done(null, data.ids);
};
} else if (typeof translation === "string" || (typeof translation === "function" && translation.length <= 1)) {
let queryFn = this.queryToFunction(translation, ["data"]);
handlers[v] = function (data, done) {
let query = queryFn.call(this, data);
this.client.query(query, [data.ids], (err, rows) => {
done(err, rows && rows.map(r => r[0]));
}, {
inRowMode: true
});
};
} else if (typeof translation === "function") {
handlers[v] = translation;
}
});
return ls.through((obj, done, push) => {
let ids = {};
let p = obj.payload;
let startEid;
let endEid;
let partialEid;
if (record.correlation_id.start) {
if (!startEid) {
startEid = record.correlation_id.start;
for (let i = 0; i < p.length; i++) {
let record = p[i];
if (record.id) {
if (!(record.s in ids)) {
ids[record.s] = Object.keys(idTranslation).reduce((acc, v) => (acc[v] = new Set()) && acc, {});
}
ids[record.s][record.t].add(record.id);
}
partialEid = record.correlation_id.partial;
endEid = record.correlation_id.start || endEid;
} else if (record.correlation_id.partial) {
partialEid = record.correlation_id.partial;
if (record.correlation_id.start) {
if (!startEid) {
startEid = record.correlation_id.start;
}
partialEid = record.correlation_id.partial;
endEid = record.correlation_id.start || endEid;
} else if (record.correlation_id.partial) {
partialEid = record.correlation_id.partial;
}
}
//console.log((record.correlation_id.start ? "start" : ""), record.correlation_id.partial ? "partial" : "", ":", record.correlation_id.start, record.correlation_id.partial);
}
let tasks = [];
let domainIds = [];
Object.keys(ids).forEach(schema => {
tasks.push(done => {
let subTasks = [];
Object.keys(ids[schema]).forEach(t => {
let lookupIds = Array.from(ids[schema][t]);
if (lookupIds && lookupIds.length) {
let handler = handlers[t] || ((ids, d) => d());
let tasks = [];
let domainIds = [];
Object.keys(ids).forEach(schema => {
tasks.push(done => {
let subTasks = [];
Object.keys(ids[schema]).forEach(t => {
let lookupIds = Array.from(ids[schema][t]);
if (lookupIds && lookupIds.length) {
let handler = handlers[t] || ((ids, d) => d());
subTasks.push(done => {
let context = {
database: schema,
schema: schema,
table: t,
client: this.client,
ids: lookupIds,
done: done
};
subTasks.push(done => {
let context = {
database: schema,
schema: schema,
table: t,
client: client,
ids: lookupIds,
done: done
};
handler.call(context, context, (err, ids = []) => {
if (err) {
done(err);
} else {
ids.map(id => {
domainIds.push({
s: schema,
id: id
handler.call(context, context, (err, ids = []) => {
if (err) {
done(err);
} else {
ids.map(id => {
domainIds.push({
s: schema,
id: id
});
});
});
done();
}
done();
}
});
});
});
}
}
});
async.parallelLimit(subTasks, 10, (err) => {
done(err);
});
});
async.parallelLimit(subTasks, 10, (err) => {
done(err);
});
});
});
async.parallelLimit(tasks, 4, (err) => {
if (err) {
return done(err);
} else if (!domainIds.length) {
async.parallelLimit(tasks, 4, (err) => {
if (err) {
return done(err);
} else if (!domainIds.length) {
// TODO: should we set lastFullEid?
return done(null, {
correlation_id: {
// TODO: should we set lastFullEid?
return done(null, {
correlation_id: {
source: obj.correlation_id.source,
start: startEid,
end: endEid,
partial: partialEid,
units: p.length
}
});
}
let i = 0;
for (; i < domainIds.length - 1; i++) {
domainIds[i].correlation_id = {
source: obj.correlation_id.source,
//start: obj.correlation_id.end || obj.correlation_id.start,
start: startEid,

@@ -320,12 +365,8 @@ end: endEid,

units: p.length
//units: obj.correlation_id.units
}
});
}
let i = 0;
for (; i < domainIds.length - 1; i++) {
};
push(domainIds[i]);
}
domainIds[i].correlation_id = {
source: obj.correlation_id.source,
// start: lastFullEid,
// units: obj.correlation_id.units
start: startEid,

@@ -336,239 +377,228 @@ end: endEid,

};
push(domainIds[i]);
}
// lastFullEid = obj.correlation_id.end || obj.correlation_id.start;
domainIds[i].correlation_id = {
source: obj.correlation_id.source,
//start: lastFullEid,
//units: obj.correlation_id.units || 1
start: startEid,
end: endEid,
partial: partialEid,
units: p.length
};
done(null, domainIds[i]);
done(null, domainIds[i]);
});
});
});
}
}
function translateIdsCombineStream() {
return ls.through((obj, done) => {
let startEid;
let endEid;
let partialEid;
//let units = 0;
translateIdsCombineStream() {
return ls.through((obj, done) => {
let startEid;
let endEid;
let partialEid;
let ids = {};
for (var i = 0; i < obj.payload.length; i++) {
let p = obj.payload[i];
if (p.s !== undefined && p.id !== undefined) {
if (!(p.s in ids)) {
ids[p.s] = new Set();
let ids = {};
for (let i = 0; i < obj.payload.length; i++) {
let p = obj.payload[i];
if (p.s !== undefined && p.id !== undefined) {
if (!(p.s in ids)) {
ids[p.s] = new Set();
}
ids[p.s].add(p.id);
}
ids[p.s].add(p.id);
}
let record = p;
if (record.correlation_id.start) {
if (!startEid) {
startEid = record.correlation_id.start;
let record = p;
if (record.correlation_id.start) {
if (!startEid) {
startEid = record.correlation_id.start;
}
partialEid = record.correlation_id.partial;
endEid = record.correlation_id.start || endEid;
} else if (record.correlation_id.partial) {
partialEid = record.correlation_id.partial;
}
partialEid = record.correlation_id.partial;
endEid = record.correlation_id.start || endEid;
//units += (record.correlation_id.units || record.correlation_id.records || 1);
} else if (record.correlation_id.partial) {
partialEid = record.correlation_id.partial;
}
Object.keys(ids).map(k => {
ids[k] = Array.from(ids[k]);
});
//console.log("combine", (record.correlation_id.start ? "start" : ""), record.correlation_id.partial ? "partial" : "", ":", record.correlation_id.start, record.correlation_id.partial);
}
Object.keys(ids).map(k => {
ids[k] = Array.from(ids[k]);
done(null, {
ids: ids,
correlation_id: {
source: obj.correlation_id.source,
start: startEid,
end: endEid,
partial: partialEid,
units: obj.payload.length
}
});
});
}
done(null, {
ids: ids,
correlation_id: {
source: obj.correlation_id.source,
//start: obj.correlation_id.start || obj.correlation_id.end,
//units: obj.correlation_id.units
start: startEid,
end: endEid,
partial: partialEid,
units: obj.payload.length
}
});
});
}
buildDomainObject(domainObject, ids, push, callback) {
let opts = {};
let sqlClient = this.client;
async.eachLimit(Object.entries(ids), 5, ([schema, ids], callback) => {
let tasks = [];
let domains = {};
let queryIds = [];
logger.log(`Processing ${ids.length} from ${schema}`);
ids.forEach(id => {
// if the id is an object, it's a composite key
if (typeof id === 'object') {
// change the id to be a string with keys separated by a dash
queryIds.push(Object.values(id));
id = Object.values(id).join('-');
} else {
queryIds.push(id);
}
function buildDomainObject(client, domainObject, ids, push, callback) {
let opts = {};
let sqlClient = client;
let joinsCount = Object.keys(domainObject.joins).length;
async.eachLimit(Object.entries(ids), 5, ([schema, ids], callback) => {
let tasks = [];
let domains = {};
console.log(`Processing ${ids.length} from ${schema}`);
ids.forEach(id => {
domains[id] = {};
Object.keys(domainObject.joins).forEach(name => {
let t = domainObject.joins[name];
if (t.type === "one_to_many") {
domains[id] = {};
Object.keys(domainObject.joins).forEach(name => {
domains[id][name] = [];
} else {
domains[id][name] = {};
}
});
});
});
function mapResults(results, fields, each) {
let mappings = [];
tasks.push(done => {
if (!domainObject.domainIdColumn) {
logger.error('[FATAL ERROR]: No ID specified');
}
let last = null;
fields.forEach((f, i) => {
if (last == null) {
last = {
path: null,
start: i
};
mappings.push(last);
} else if (f.name.match(/^prefix_/)) {
last.end = i;
last = {
path: f.name.replace(/^prefix_/, ''),
start: i + 1
};
mappings.push(last);
}
});
last.end = fields.length;
results.forEach(r => {
//Convert back to object now
let row = {};
mappings.forEach(m => {
if (m.path === null) {
r.slice(m.start, m.end).forEach((value, i) => {
row[fields[m.start + i].name] = value;
});
} else if (m.path) {
row[m.path] = r.slice(m.start, m.end).reduce((acc, value, i) => {
acc[fields[m.start + i].name] = value;
return acc;
}, {});
}
let query = domainObject.sql.call({
database: schema,
schema: schema,
client: sqlClient
}, {
ids: queryIds,
database: schema,
schema: schema,
client: sqlClient
});
each(row);
});
}
tasks.push(done => {
if (!domainObject.domainIdColumn) {
console.log('[FATAL ERROR]: No ID specified');
}
let query = domainObject.sql.call({
database: schema,
schema: schema,
client: sqlClient
}, {
ids: ids,
database: schema,
schema: schema,
client: sqlClient
this.buildDomainQuery(domainObject, domains, query, [queryIds], done);
});
sqlClient.query(query, [ids], (err, results, fields) => {
if (err) return done(err);
mapResults(results, fields, row => {
let domainId = row[domainObject.domainIdColumn];
if (domainObject.transform) {
row = domainObject.transform(row);
}
delete row._domain_id;
//row._schema = schema;
if (!domainId) {
console.error('ID: "' + domainObject.domainIdColumn + '" not found in object:');
} else if (!domains[domainId]) {
console.error('ID: "' + domainObject.domainIdColumn + '" with a value of: "' + domainId + '" does not match any ID in the domain object. This could be caused by using a WHERE clause on an ID that differs from the SELECT ID');
} else {
//We need to keep the domain relationships in tact
domains[domainId] = Object.assign(domains[domainId], row);
}
Object.keys(domainObject.joins).forEach(name => {
let t = domainObject.joins[name];
t.domainIdColumn = t.domainIdColumn || "_domain_id";
let query = t.sql.call({
database: schema,
schema: schema,
client: sqlClient
}, {
ids: queryIds,
database: schema,
schema: schema,
client: sqlClient
});
done();
}, {
inRowMode: true
tasks.push(done => {
this.buildJoinQuery(t, name, domains, query, [queryIds], done);
});
});
this.processTasks(ids, domains, tasks, domainObject, schema, opts, push, callback);
}, (err) => {
callback(err);
});
}
mapResults(results, fields, each) {
let mappings = [];
Object.keys(domainObject.joins).forEach(name => {
let t = domainObject.joins[name];
t.domainIdColumn = t.domainIdColumn || "_domain_id";
let query = t.sql.call({
database: schema,
schema: schema,
client: sqlClient
}, {
ids: ids,
database: schema,
schema: schema,
client: sqlClient
let last = null;
fields.forEach((f, i) => {
if (last == null) {
last = {
path: null,
start: i
};
mappings.push(last);
} else if (f.name.match(/^prefix_/)) {
last.end = i;
last = {
path: f.name.replace(/^prefix_/, ''),
start: i + 1
};
mappings.push(last);
}
});
last.end = fields.length;
results.forEach(r => {
//Convert back to object now
let row = {};
mappings.forEach(m => {
if (m.path === null) {
r.slice(m.start, m.end).forEach((value, i) => {
row[fields[m.start + i].name] = value;
});
} else if (m.path) {
row[m.path] = r.slice(m.start, m.end).reduce((acc, value, i) => {
acc[fields[m.start + i].name] = value;
return acc;
}, {});
}
});
if (t.type === "one_to_many") {
tasks.push(done => {
sqlClient.query(query, [ids], (err, results, fields) => {
if (err) {
return done(err);
} else if (!results.length) {
return done();
}
each(row);
});
}
mapResults(results, fields, row => {
let domainId = row[t.domainIdColumn];
delete row._domain_id;
processDomainQuery({ transform = a => a, domainIdColumn }, domains, done, err, results, fields) {
if (err) return done(err);
if (t.transform) {
row = t.transform(row);
}
this.mapResults(results, fields, row => {
let domainId = Array.isArray(domainIdColumn)? domainIdColumn.map(i => row[i]).join('-') : row[domainIdColumn];
row = transform(row);
delete row._domain_id;
domains[domainId][name].push(row);
});
done();
}, {
inRowMode: true
});
});
if (!domainId) {
logger.error('ID: "' + domainIdColumn + '" not found in object:');
} else if (!domains[domainId]) {
logger.error('ID: "' + domainIdColumn + '" with a value of: "' + domainId + '" does not match any ID in the domain object. This could be caused by using a WHERE clause on an ID that differs from the SELECT ID');
} else {
tasks.push(done => {
sqlClient.query(query, [ids], (err, results, fields) => {
if (err) {
return done(err);
} else if (!results.length) {
return done();
}
mapResults(results, fields, row => {
if (row.length) {
let domainId = row[t.domainIdColumn];
delete row._domain_id;
if (t.transform) {
row = t.transform(row);
}
domains[domainId][name] = row;
}
});
done();
}, {
inRowMode: true
});
});
//We need to keep the domain relationships in tact
domains[domainId] = Object.assign(domains[domainId], row);
}
});
done();
}
buildDomainQuery(domainObject, domains, query, queryIds, done) {
this.client.query(query, queryIds, (err, results, fields) => {
this.processDomainQuery(domainObject, domains, done, err, results, fields);
}, {
inRowMode: true
});
}
processJoinQuery({ transform = a => a, domainIdColumn }, name, domains, done, err, results, fields) {
if (err) {
return done(err);
} else if (!results.length) {
return done();
}
this.mapResults(results, fields, row => {
let domainId = Array.isArray(domainIdColumn)? domainIdColumn.map(i => row[i]).join('-') : row[domainIdColumn];
delete row._domain_id;
row = transform(row);
domains[domainId][name].push(row);
});
done();
}
buildJoinQuery(joinObject, name, domains, query, queryIds, done) {
this.client.query(query, queryIds, (err, results, fields) => {
this.processJoinQuery(joinObject, name, domains, done, err, results, fields);
}, {
inRowMode: true
});
}
/**
* Process the tasks
* @param ids
* @param domains Array
* @param tasks
* @param domainObject
* @param schema
* @param opts
* @param push
* @param callback
*/
processTasks(ids, domains, tasks, domainObject, schema, opts, push, callback) {
let limit = 5;
let joinsCount = Object.keys(domainObject.joins).length;
async.parallelLimit(tasks, limit, (err) => {

@@ -578,8 +608,13 @@ if (err) {

} else {
// let getEid = opts.getEid || ((id, obj, stats) => stats.end);
ids.forEach(id => {
// if the id is an object, it's a composite key
if (typeof id === 'object') {
// change the id to be a string with keys separated by a dash
id = Object.values(id).join('-');
}
// skip the domain if there is no data with it
let keyCount = Object.keys(domains[id]).length;
if (keyCount === 0) {
console.log('[INFO] Skipping domain id due to empty object. #: ' + id);
logger.log('[INFO] Skipping domain id due to empty object. #: ' + id);
return;

@@ -591,3 +626,3 @@ } else if (keyCount <= joinsCount) {

if (!valid) {
console.log('[INFO] Skipping domain id due to empty object. #: ' + id);
logger.log('[INFO] Skipping domain id due to empty object. #: ' + id);
return;

@@ -605,19 +640,25 @@ }

};
// remove the schema if it's a dummy we added in translate ids
if (event.schema === '__database__') {
delete event.schema;
}
push(event);
});
callback();
}
});
}, (err) => {
callback(err);
});
}
}
function queryToFunction(query, params) {
if (typeof query === "function") {
return query;
} else if (typeof query === "string") {
params.push(`return \`${query}\`;`);
return Function.apply(this, params);
queryToFunction(query, params) {
if (typeof query === "function") {
return query;
} else if (typeof query === "string") {
params.push(`return \`${query}\`;`);
return Function.apply(this, params);
}
}
}
};
{
"name": "leo-connector-common",
"version": "1.6.0",
"version": "2.0.0-189-g593f905",
"description": "Common package for all Leo Platform database connectors",

@@ -27,7 +27,8 @@ "main": "index.js",

"async": "^2.6.1",
"leo-sdk": "^2.0.1",
"leo-aws": "^1.4.0",
"leo-logger": ">=1.0.0",
"leo-sdk": "^2.2.0",
"moment": "^2.22.2",
"sqlstring": "^2.3.1",
"leo-logger": ">=1.0.0"
"sqlstring": "^2.3.1"
}
}

@@ -0,55 +1,108 @@

'use strict';
const leo = require("leo-sdk");
const ls = leo.streams;
const leoaws = require('leo-aws');
const ls = require('leo-streams');
const leostream = leo.streams;
const moment = require("moment");
const merge = require('lodash.merge');
const logger = require('leo-logger');
const nibbler = require("./nibbler.js");
const loader = require("./loader.js");
const loaderJoin = require("./loaderJoinTable.js");
const moment = require("moment");
let dynamodb = leo.aws.dynamodb;
const tableName = leo.configuration.resources.LeoCron;
module.exports = function(botId, client, table, id, domain, opts, callback) {
opts = Object.assign({
event: table,
}, opts || {});
module.exports = class Snapshotter {
constructor(connector) {
this.params = {
connector: connector,
nibble: null,
logTimeout: null,
runUntilComplete: true,
timestamp: moment(),
botId: process.__config.registry.context.botId
};
}
let nibble = null;
var logTimeout = null;
/**
* Start snapshot reading
* @param settings
* @returns {*}
*/
read(settings) {
// add the settings parameters
this.params = merge(this.params, settings);
if (!this.params.table) {
throw new Error('No `table` specified.');
} else if (!this.params.pk) {
throw new Error('No `pk` specified for the specified table.');
} else if (!this.params.botId) {
throw new Error('NO botId specified.');
}
let stream = ls.passthrough({
objectMode: true,
});
leoaws.dynamodb.get(tableName, this.params.botId)
.then(result => {
ls.pipe(this.nibble(result), this.format(), stream);
})
.catch(err => {
throw new Error(err);
});
return stream;
}
/*****************************************
* Start fancy console log status thingy *
*****************************************/
//@todo: Update all this to use the log-update node module
function clearLog() {
clearLog() {
process.stdout.write("\r\x1b[K");
if (logTimeout) clearInterval(logTimeout);
if (this.params.logTimeout) clearInterval(this.params.logTimeout);
}
var log = function() {
clearLog();
var percent = (nibble.progress / nibble.total) * 100;
var fixed = percent.toFixed(2);
if (fixed == "100.00" && percent < 100) {
log() {
this.clearLog();
let percent = (this.params.nibble.progress / this.params.nibble.total) * 100;
let fixed = percent.toFixed(2);
if (fixed === "100.00" && percent < 100) {
fixed = "99.99";
}
console.log(fixed + "% :", Object.keys(arguments).map(k => arguments[k]).join(", "));
logger.log(fixed + "% :", Object.keys(arguments).map(k => arguments[k]).join(", "));
};
function timeLog(message) {
clearLog();
var time = new Date();
timeLog(message) {
this.clearLog();
let time = new Date();
function writeMessage() {
process.stdout.write("\r\x1b[K");
process.stdout.write(((new Date() - time) / 1000).toFixed(1) + "s : " + message);
}
writeMessage();
logTimeout = setInterval(writeMessage, 200);
this.writeMessage(time);
this.params.logTimeout = setInterval(() => {
this.writeMessage(time);
}, 200);
}
function normalLog(message) {
clearLog();
console.log(message);
writeMessage(time) {
process.stdout.write("\r\x1b[K");
process.stdout.write(((new Date() - time) / 1000).toFixed(1) + "s : " + message);
}
normalLog(message) {
this.clearLog();
logger.log(message);
}
/***************************************
* End fancy console log status thingy *
***************************************/
function saveProgress(data, timestamp) {
dynamodb.merge(tableName, botId, {
/**
* Save the current progress data for the snapshot
* @param data
* @param timestamp
*/
saveProgress(data, timestamp) {
leoaws.dynamodb.merge(tableName, this.params.botId, {
snapshot: Object.assign({

@@ -59,114 +112,157 @@ last_run: moment.now(),

}, data)
}, function(err, result) {
if (err) {
console.log(err);
callback(err);
process.exit();
}
}).catch(err => {
throw new Error(err);
});
}
dynamodb.get(tableName, botId, function(err, result) {
if (err) {
callback(err);
} else {
// reuse an existing bucket key if we’re resuming, otherwise create a new one.
let timestamp = moment(result && result.snapshot && !result.snapshot.complete && result.snapshot.bucket_timestamp || undefined),
bucketKey = timestamp.format('YYYY/MM_DD_') + timestamp.valueOf();
let stream = nibbler(client, table, id, {
limit: 5000,
resume: result && result.snapshot && !result.snapshot.complete && result.snapshot
});
stream.destroy = stream.destroy || stream.close || (() => {});
/**
* nibble through the database
* @param result
* @returns {pass}
*/
nibble(result) {
let self = this;
// reuse an existing bucket key if we’re resuming, otherwise create a new one.
this.params.timestamp = moment(result && result.snapshot && !result.snapshot.complete && result.snapshot.bucket_timestamp || undefined);
stream.on("ranged", function(n) {
nibble = n;
saveProgress(nibble, timestamp);
});
let transform;
if (Array.isArray(id)) {
transform = loaderJoin(client, id, null, domain, {
this.params.resume = result && result.snapshot && !result.snapshot.complete && result.snapshot;
let stream = nibbler(this.params.connector, this.params.table, this.params.pk, this.params);
stream.destroy = stream.destroy || stream.close || (() => {});
stream.on("ranged", function (n) {
self.params.nibble = n;
self.saveProgress(self.params.nibble, self.params.timestamp);
});
stream.on('end', () => {
clearTimeout(self.params.streamTimeout);
});
this.params.streamTimeout = setTimeout(() => {
stream.stop();
}, Math.min(2147483647, moment.duration(self.params.duration || {
seconds: process.__config.registry.context.getRemainingTimeInMillis() * 0.8
}).asMilliseconds()));
return stream;
}
/**
* Format the payload, include a dummy database.
* @todo support multiple databases.
* @returns {*}
*/
format() {
return ls.through((obj, done) => {
let payload = obj.payload[this.params.table];
// turn this into a format the domainObjectTransform is expecting, complete with the dummy database name
obj = {
ids: {
__database__: payload
},
correlation_id: {
source: 'snapshot',
isSnapshot: true,
id: botId
units: payload.length,
start: payload[0],
end: payload[payload.length - 1]
}
};
done(null, obj);
});
}
// write out the results
/**
*
* @param {string} botId
* @param {string} destination
* @returns {*}
*/
write(botId, destination) {
this.params.destination = destination;
return ls.pipeline(
ls.through((event, done)=>{
event.id = botId;
event.timestamp = Date.now();
event.event_source_timestamp = Date.now();
done(null, event);
}),
this.writeToS3(),
this.writeToLeo(),
this.writeCheckpoint()
);
}
/**
* Write the events to a gzip file and upload to S3
* @returns {*}
*/
writeToS3() {
let bucketKey = this.params.timestamp.format('YYYY/MM_DD_') + this.params.timestamp.valueOf();
return leostream.toS3GzipChunks(this.params.destination, {
useS3Mode: true,
time: {
minutes: 1
},
prefix: "_snapshot/" + bucketKey,
sectionCount: 30
});
}
/**
* Write the snapshot event to leo
* @returns {*}
*/
writeToLeo() {
return leostream.toLeo('snapshotter', {
snapshot: this.params.timestamp.valueOf()
});
}
/**
* Send a checkpoint command to the kinesis reader to checkpoint at the end.
*/
writeCheckpoint() {
return leostream.cmd("checkpoint", (obj, done) => {
if (obj.correlations) {
let records = 0;
obj.correlations.forEach(c => {
this.params.nibble.start = c.snapshot.end;
this.params.nibble.progress += c.snapshot.records;
records += c.snapshot.records;
});
} else {
transform = loader(client, (obj) => {
return [obj[table]];
}, domain, {
source: 'snapshot',
isSnapshot: true,
id: botId
});
}
transform.destroy = transform.destroy || transform.close || (() => {});
this.log(`Processed ${records} ${this.params.nibble.progress}/${this.params.nibble.total}. Remaining ${this.params.nibble.total - this.params.nibble.progress}`);
if (this.params.nibble.end == this.params.nibble.start) {
this.params.nibble.complete = true;
this.saveProgress(this.params.nibble);
let timeout = setTimeout(() => {
stream.stop();
}, moment.duration({
seconds: 180
}).asMilliseconds());
let closeStream = ls.pipeline(leostream.toLeo("snapshotter", {
snapshot: this.params.timestamp.valueOf()
}), ls.devnull());
ls.pipe(stream,
transform,
ls.toS3GzipChunks(opts.event, {
useS3Mode: true,
time: {
minutes: 1
},
prefix: "_snapshot/" + bucketKey,
sectionCount: 30
}),
ls.toLeo("snapshotter", {
snapshot: timestamp.valueOf()
}),
ls.cmd("checkpoint", (obj, done) => {
if (obj.correlations) {
let records = 0;
obj.correlations.forEach(c => {
nibble.start = c.snapshot.end;
nibble.progress += c.snapshot.records;
records += c.snapshot.records;
});
log(`Processed ${records} ${nibble.progress}/${nibble.total}. Remaining ${nibble.total-nibble.progress}`);
if (nibble.end == nibble.start) {
nibble.complete = true;
saveProgress(nibble);
let closeStream = ls.pipeline(ls.toLeo("snapshotter", {
snapshot: timestamp.valueOf()
}), ls.devnull());
closeStream.write({
_cmd: 'registerSnapshot',
event: opts.event,
start: timestamp.valueOf(),
next: timestamp.clone().startOf('day').valueOf(),
id: botId
});
closeStream.on("finish", done);
closeStream.end();
} else {
saveProgress(nibble, timestamp);
done();
}
} else {
done();
closeStream.write({
_cmd: 'registerSnapshot',
event: this.params.destination,
start: this.params.timestamp.valueOf(),
next: this.params.timestamp.clone().startOf('day').valueOf(),
id: this.params.botId
});
closeStream.on("finish", done);
closeStream.end();
} else {
if (this.params.runUntilComplete) {
leo.bot.runAgain();
}
}),
ls.devnull(),
(err) => {
clearTimeout(timeout);
if (err) {
console.log(err);
stream.destroy();
transform.destroy();
callback(err);
} else {
if (!nibble.complete) {
leo.bot.runAgain();
}
callback(null);
}
});
}
});
this.saveProgress(this.params.nibble, this.params.timestamp);
done();
}
} else {
done();
}
});
}
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc