leo-connector-common
Advanced tools
Comparing version 1.6.0 to 2.0.0-189-g593f905
let leo = require("leo-sdk"); | ||
const aws = require("aws-sdk"); | ||
const checksum = require("./lib/checksumNibbler.js"); | ||
let dynamodb = leo.aws.dynamodb; | ||
const leoaws = require('leo-aws'); | ||
let cron = leo.bot; | ||
@@ -23,15 +23,7 @@ | ||
function saveProgress(systemId, botId, data) { | ||
return new Promise((resolve, reject) => { | ||
dynamodb.merge(tableName, botId, { | ||
checksum: data, | ||
system: { | ||
id: systemId | ||
} | ||
}, function(err, result) { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
resolve(data); | ||
} | ||
}); | ||
return leoaws.dynamodb.merge(tableName, botId, { | ||
checksum: data, | ||
system: { | ||
id: systemId | ||
} | ||
}); | ||
@@ -88,20 +80,14 @@ } | ||
} else { | ||
return new Promise((resolve, reject) => { | ||
logger.log("Getting Session", systemId, botId); | ||
dynamodb.get(tableName, botId, function(err, result) { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
try { | ||
let session = emptySession; | ||
if (result && result.checksum && result.checksum.restart !== true && result.checksum.status !== 'complete') { | ||
session = result.checksum; | ||
} | ||
resolve(session); | ||
} catch (err) { | ||
reject(err); | ||
} | ||
logger.log("Getting Session", systemId, botId); | ||
return leoaws.dynamodb.get(tableName, botId) | ||
.then(result => { | ||
let session = emptySession; | ||
if (result && result.checksum && result.checksum.restart !== true && result.checksum.status !== 'complete') { | ||
session = result.checksum; | ||
} | ||
return session; | ||
}); | ||
}); | ||
} | ||
@@ -108,0 +94,0 @@ }, |
@@ -5,4 +5,3 @@ "use strict"; | ||
const moment = require("moment"); | ||
require("moment-timezone"); | ||
const logger = require("leo-sdk/lib/logger")("leo-checksum.basic"); | ||
const logger = require("leo-logger")("leo-checksum.basic"); | ||
const Stream = require('stream').Stream; | ||
@@ -9,0 +8,0 @@ |
@@ -5,3 +5,3 @@ "use strict"; | ||
let logger = require("leo-sdk/lib/logger")("leo-checksum.nibbler"); | ||
let logger = require("leo-logger")("leo-checksum.nibbler"); | ||
@@ -8,0 +8,0 @@ module.exports = function(local, remote, opts) { |
@@ -1,3 +0,3 @@ | ||
var async = require("async"); | ||
let logger = require("leo-sdk/lib/logger")("leo-nibbler"); | ||
const async = require("async"); | ||
const logger = require("leo-logger")("leo-nibbler"); | ||
@@ -4,0 +4,0 @@ /** |
1019
dol.js
@@ -0,10 +1,45 @@ | ||
'use strict'; | ||
const leo = require("leo-sdk"); | ||
const ls = leo.streams; | ||
const async = require("async"); | ||
const logger = require('leo-logger'); | ||
module.exports = function DomainObjectLoader(client) { | ||
let self = this; | ||
return { | ||
translateIds: function(translations, opts) { | ||
opts = Object.assign({ | ||
function respectDomainIdOrder(domainIdColumn, id) { | ||
if (typeof id === 'object') { | ||
return domainIdColumn.reduce((treatedId, curId) => { | ||
treatedId[curId] = id[curId]; | ||
return treatedId; | ||
}, {}); | ||
} | ||
return id; | ||
} | ||
module.exports = class Dol { | ||
constructor(client) { | ||
this.client = client; | ||
} | ||
translateIds(translations, domainIdColumn, opts) { | ||
if (typeof domainIdColumn === 'object' && !Array.isArray(domainIdColumn) && !opts) { | ||
opts = Object.assign({}, domainIdColumn); | ||
domainIdColumn = undefined; | ||
} | ||
opts = Object.assign({ | ||
count: 1000, | ||
time: { | ||
milliseconds: 200 | ||
} | ||
}, opts); | ||
return ls.pipeline( | ||
this.translateIdsStartStream(translations, domainIdColumn), | ||
ls.batch({ | ||
count: opts.count, | ||
time: opts.time | ||
}), | ||
this.translateIdsLookupStream(translations), | ||
ls.batch({ | ||
count: 1000, | ||
@@ -14,303 +49,313 @@ time: { | ||
} | ||
}, opts); | ||
return ls.pipeline( | ||
translateIdsStartStream(translations), | ||
ls.batch({ | ||
count: opts.count, | ||
time: opts.time | ||
}), | ||
translateIdsLookupStream(client, translations), | ||
ls.batch({ | ||
count: 1000, | ||
time: { | ||
milliseconds: 200 | ||
} | ||
}), | ||
translateIdsCombineStream() | ||
); | ||
}, | ||
domainObjectTransform: function(domainObject) { | ||
if (typeof domainObject.get === "function") { | ||
domainObject = domainObject.get(); | ||
} | ||
domainObject = Object.assign({ | ||
domainIdColumn: "_domain_id", | ||
query: "select * from dual limit 1", | ||
joins: {} | ||
}, domainObject); | ||
}), | ||
this.translateIdsCombineStream() | ||
); | ||
} | ||
domainObject.sql = queryToFunction(domainObject.sql || domainObject.query, ["data"]); | ||
Object.values(domainObject.joins).map(v => { | ||
v.sql = queryToFunction(v.sql || v.query, ["data"]); | ||
}); | ||
domainObjectTransform(domainObject) { | ||
if (typeof domainObject.get === "function") { | ||
domainObject = domainObject.get(); | ||
} | ||
domainObject = Object.assign({ | ||
domainIdColumn: "_domain_id", | ||
query: "select * from dual limit 1", | ||
joins: {} | ||
}, domainObject); | ||
//We can jump in at this point if we just have a join table to use without going through the above...otherwise it assumes we have a list of ids | ||
//Lets do actual Domain lookups | ||
return ls.through((obj, done, push) => { | ||
let addCorrelation = (result, final = false, units) => { | ||
if (final) { | ||
result.correlation_id = { | ||
source: obj.correlation_id.source, | ||
start: obj.correlation_id.start || obj.correlation_id.end, | ||
end: obj.correlation_id.end, | ||
units: units | ||
}; | ||
} else { | ||
result.correlation_id = { | ||
source: obj.correlation_id.source, | ||
partial_start: obj.correlation_id.start || obj.correlation_id.end || obj.correlation_id.partial, | ||
partial_end: obj.correlation_id.start ? (obj.correlation_id.partial || obj.correlation_id.end) : obj.correlation_id.end, | ||
units: units | ||
}; | ||
} | ||
push(result); | ||
}; | ||
domainObject.sql = this.queryToFunction(domainObject.sql || domainObject.query, ["data"]); | ||
Object.values(domainObject.joins).map(v => { | ||
v.sql = this.queryToFunction(v.sql || v.query, ["data"]); | ||
}); | ||
if (obj.joinTable) { | ||
done("joinTable is not implemented"); | ||
} else if (obj.ids) { | ||
//Query for these domain Objects | ||
//We can jump in at this point if we just have a join table to use without going through the above...otherwise it assumes we have a list of ids | ||
//Lets do actual Domain lookups | ||
return ls.through((obj, done, push) => { | ||
let addCorrelation = (result, final = false, units) => { | ||
if (final) { | ||
result.correlation_id = { | ||
source: obj.correlation_id.source, | ||
start: obj.correlation_id.start || obj.correlation_id.end, | ||
end: obj.correlation_id.end, | ||
units: units | ||
}; | ||
} else { | ||
result.correlation_id = { | ||
source: obj.correlation_id.source, | ||
partial_start: obj.correlation_id.start || obj.correlation_id.end || obj.correlation_id.partial, | ||
partial_end: obj.correlation_id.start ? (obj.correlation_id.partial || obj.correlation_id.end) : obj.correlation_id.end, | ||
units: units | ||
}; | ||
} | ||
push(result); | ||
}; | ||
buildDomainObject(client, domainObject, obj.ids, addCorrelation, (err, results = []) => { | ||
if (err) { | ||
console.log(JSON.stringify(obj, null, 2)); | ||
return done(err); | ||
} | ||
if (results.length) { | ||
let i = 0; | ||
for (; i < results.length - 1; i++) { | ||
let result = results[i]; | ||
push(result); | ||
} | ||
if (obj.joinTable) { | ||
done("joinTable is not implemented"); | ||
} else if (obj.ids) { | ||
//Query for these domain Objects | ||
let lastResult = results[i]; | ||
addCorrelation(lastResult, true); | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true, -1); | ||
} else { | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true); | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true, -1); | ||
this.buildDomainObject(domainObject, obj.ids, addCorrelation, (err, results = []) => { | ||
if (err) { | ||
logger.error(JSON.stringify(obj, null, 2)); | ||
return done(err); | ||
} | ||
if (results.length) { | ||
let i = 0; | ||
for (; i < results.length - 1; i++) { | ||
let result = results[i]; | ||
push(result); | ||
} | ||
done(); | ||
}); | ||
} else { | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true); | ||
let lastResult = results[i]; | ||
addCorrelation(lastResult, true); | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true, -1); | ||
} else { | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true); | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true, -1); | ||
} | ||
done(); | ||
} | ||
}); | ||
}, | ||
DomainObject: function(query, domainIdColumn = "_domain_id", transform) { | ||
if (typeof domainIdColumn === "function") { | ||
transform = domainIdColumn; | ||
domainIdColumn = "_domain_id"; | ||
}); | ||
} else { | ||
addCorrelation({ | ||
dont_write: true, | ||
payload: {} | ||
}, true); | ||
done(); | ||
} | ||
this.query = query; | ||
this.domainIdColumn = domainIdColumn; | ||
this.transform = transform; | ||
this.joins = {}; | ||
this.hasMany = function(name, query, domainIdColumn = "_domain_id", transform) { | ||
if (typeof query === "object") { | ||
this.joins[name] = query; | ||
return; | ||
} | ||
}); | ||
} | ||
if (typeof domainIdColumn === "function") { | ||
transform = domainIdColumn; | ||
domainIdColumn = "_domain_id"; | ||
} | ||
domainObject(query, domainIdColumn = "_domain_id", transform) { | ||
if (typeof domainIdColumn === "function") { | ||
transform = domainIdColumn; | ||
domainIdColumn = "_domain_id"; | ||
} | ||
this.query = query; | ||
this.domainIdColumn = domainIdColumn; | ||
this.transform = transform; | ||
this.joins = {}; | ||
this.joins[name] = { | ||
query, | ||
domainIdColumn, | ||
transform, | ||
type: "one_to_many" | ||
}; | ||
return this; | ||
}; | ||
return this; | ||
} | ||
hasMany (name, query, domainIdColumn = "_domain_id", transform) { | ||
if (typeof query === "object") { | ||
this.joins[name] = query; | ||
return; | ||
} | ||
}; | ||
}; | ||
if (typeof domainIdColumn === "function") { | ||
transform = domainIdColumn; | ||
domainIdColumn = "_domain_id"; | ||
} | ||
this.joins[name] = { | ||
query, | ||
domainIdColumn, | ||
transform, | ||
type: "one_to_many" | ||
}; | ||
return this; | ||
} | ||
function translateIdsStartStream(idTranslation) { | ||
// let bufferIds = {}; | ||
// let lastFullEid = null; | ||
/** | ||
* Translate id's from a database listener | ||
* formats: | ||
* mysql: payload.update.database.table.ids | ||
* payload.delete.database.table.ids | ||
* other databases: payload.table.ids - converted to payload.update.__database__.table.ids | ||
* @param idTranslation | ||
*/ | ||
translateIdsStartStream(idTranslation, domainIdColumn) { | ||
return ls.through((obj, done, push) => { | ||
if (!obj.payload.update) { | ||
return done(null, { | ||
correlation_id: { | ||
source: obj.event, | ||
start: obj.eid | ||
} | ||
}); | ||
} | ||
let last = null; | ||
let count = 0; | ||
let updates = obj.payload.update; | ||
for (let schema in updates) { | ||
for (var t in idTranslation) { | ||
let ids = updates[schema][t]; | ||
if (!ids) { | ||
continue; | ||
} | ||
ids = Array.from(new Set(ids)); // Dedub the ids | ||
for (var i = 0; i < ids.length; i++) { | ||
if (count) push(last); | ||
last = { | ||
s: schema, | ||
t, | ||
id: ids[i], | ||
return ls.through((obj, done, push) => { | ||
if (!obj.payload.update) { | ||
// handle data from listeners other than mysql | ||
if (obj.payload && !obj.payload.delete && Object.keys(obj.payload).length) { | ||
obj.payload = { | ||
update: { | ||
// add a dummy database, which will be removed on the domain object final output | ||
__database__: obj.payload | ||
} | ||
}; | ||
} else { | ||
return done(null, { | ||
correlation_id: { | ||
source: obj.event, | ||
//start: lastFullEid | ||
partial: obj.eid, | ||
units: 1 | ||
start: obj.eid | ||
} | ||
}; | ||
count++; | ||
}); | ||
} | ||
} | ||
} | ||
// lastFullEid = obj.eid; | ||
if (last) { | ||
count++; | ||
last.correlation_id = { | ||
source: obj.event, | ||
start: obj.eid, | ||
units: 1 | ||
}; | ||
done(null, last); | ||
} else { | ||
done(null, { | ||
correlation_id: { | ||
let last = null; | ||
let count = 0; | ||
let updates = obj.payload.update; | ||
for (let schema in updates) { | ||
for (let t in idTranslation) { | ||
let ids = updates[schema][t]; | ||
if (!ids) { | ||
continue; | ||
} | ||
ids = Array.from(new Set(ids)); // Dedup the ids | ||
for (let i = 0; i < ids.length; i++) { | ||
const respectfulId = respectDomainIdOrder(domainIdColumn, ids[i]); | ||
if (count) push(last); | ||
last = { | ||
s: schema, | ||
t, | ||
id: respectfulId, | ||
correlation_id: { | ||
source: obj.event, | ||
partial: obj.eid, | ||
units: 1 | ||
} | ||
}; | ||
count++; | ||
} | ||
} | ||
} | ||
if (last) { | ||
last.correlation_id = { | ||
source: obj.event, | ||
start: obj.eid, | ||
units: 1 | ||
} | ||
}); | ||
} | ||
}); | ||
} | ||
function translateIdsLookupStream(client, idTranslation) { | ||
// let lastFullEid = null; | ||
let handlers = {}; | ||
Object.keys(idTranslation).map(v => { | ||
let translation = idTranslation[v]; | ||
if (translation === true) { | ||
handlers[v] = (data, done) => { | ||
done(null, data.ids); | ||
}; | ||
} else if (typeof translation === "string" || (typeof translation === "function" && translation.length <= 1)) { | ||
let queryFn = queryToFunction(translation, ["data"]); | ||
handlers[v] = function(data, done) { | ||
let query = queryFn.call(this, data); | ||
this.client.query(query, [data.ids], (err, rows) => { | ||
done(err, rows && rows.map(r => r[0])); | ||
}, { | ||
inRowMode: true | ||
}; | ||
done(null, last); | ||
} else { | ||
done(null, { | ||
correlation_id: { | ||
source: obj.event, | ||
start: obj.eid, | ||
units: 1 | ||
} | ||
}); | ||
}; | ||
} else if (typeof translation === "function") { | ||
handlers[v] = translation; | ||
} | ||
}); | ||
} | ||
}); | ||
} | ||
return ls.through((obj, done, push) => { | ||
let ids = {}; | ||
let p = obj.payload; | ||
let startEid; | ||
let endEid; | ||
let partialEid; | ||
for (let i = 0; i < p.length; i++) { | ||
let record = p[i]; | ||
if (record.id) { | ||
if (!(record.s in ids)) { | ||
ids[record.s] = Object.keys(idTranslation).reduce((acc, v) => (acc[v] = new Set()) && acc, {}); | ||
} | ||
ids[record.s][record.t].add(record.id); | ||
translateIdsLookupStream(idTranslation) { | ||
let handlers = {}; | ||
Object.keys(idTranslation).map(v => { | ||
let translation = idTranslation[v]; | ||
if (translation === true) { | ||
handlers[v] = (data, done) => { | ||
done(null, data.ids); | ||
}; | ||
} else if (typeof translation === "string" || (typeof translation === "function" && translation.length <= 1)) { | ||
let queryFn = this.queryToFunction(translation, ["data"]); | ||
handlers[v] = function (data, done) { | ||
let query = queryFn.call(this, data); | ||
this.client.query(query, [data.ids], (err, rows) => { | ||
done(err, rows && rows.map(r => r[0])); | ||
}, { | ||
inRowMode: true | ||
}); | ||
}; | ||
} else if (typeof translation === "function") { | ||
handlers[v] = translation; | ||
} | ||
}); | ||
return ls.through((obj, done, push) => { | ||
let ids = {}; | ||
let p = obj.payload; | ||
let startEid; | ||
let endEid; | ||
let partialEid; | ||
if (record.correlation_id.start) { | ||
if (!startEid) { | ||
startEid = record.correlation_id.start; | ||
for (let i = 0; i < p.length; i++) { | ||
let record = p[i]; | ||
if (record.id) { | ||
if (!(record.s in ids)) { | ||
ids[record.s] = Object.keys(idTranslation).reduce((acc, v) => (acc[v] = new Set()) && acc, {}); | ||
} | ||
ids[record.s][record.t].add(record.id); | ||
} | ||
partialEid = record.correlation_id.partial; | ||
endEid = record.correlation_id.start || endEid; | ||
} else if (record.correlation_id.partial) { | ||
partialEid = record.correlation_id.partial; | ||
if (record.correlation_id.start) { | ||
if (!startEid) { | ||
startEid = record.correlation_id.start; | ||
} | ||
partialEid = record.correlation_id.partial; | ||
endEid = record.correlation_id.start || endEid; | ||
} else if (record.correlation_id.partial) { | ||
partialEid = record.correlation_id.partial; | ||
} | ||
} | ||
//console.log((record.correlation_id.start ? "start" : ""), record.correlation_id.partial ? "partial" : "", ":", record.correlation_id.start, record.correlation_id.partial); | ||
} | ||
let tasks = []; | ||
let domainIds = []; | ||
Object.keys(ids).forEach(schema => { | ||
tasks.push(done => { | ||
let subTasks = []; | ||
Object.keys(ids[schema]).forEach(t => { | ||
let lookupIds = Array.from(ids[schema][t]); | ||
if (lookupIds && lookupIds.length) { | ||
let handler = handlers[t] || ((ids, d) => d()); | ||
let tasks = []; | ||
let domainIds = []; | ||
Object.keys(ids).forEach(schema => { | ||
tasks.push(done => { | ||
let subTasks = []; | ||
Object.keys(ids[schema]).forEach(t => { | ||
let lookupIds = Array.from(ids[schema][t]); | ||
if (lookupIds && lookupIds.length) { | ||
let handler = handlers[t] || ((ids, d) => d()); | ||
subTasks.push(done => { | ||
let context = { | ||
database: schema, | ||
schema: schema, | ||
table: t, | ||
client: this.client, | ||
ids: lookupIds, | ||
done: done | ||
}; | ||
subTasks.push(done => { | ||
let context = { | ||
database: schema, | ||
schema: schema, | ||
table: t, | ||
client: client, | ||
ids: lookupIds, | ||
done: done | ||
}; | ||
handler.call(context, context, (err, ids = []) => { | ||
if (err) { | ||
done(err); | ||
} else { | ||
ids.map(id => { | ||
domainIds.push({ | ||
s: schema, | ||
id: id | ||
handler.call(context, context, (err, ids = []) => { | ||
if (err) { | ||
done(err); | ||
} else { | ||
ids.map(id => { | ||
domainIds.push({ | ||
s: schema, | ||
id: id | ||
}); | ||
}); | ||
}); | ||
done(); | ||
} | ||
done(); | ||
} | ||
}); | ||
}); | ||
}); | ||
} | ||
} | ||
}); | ||
async.parallelLimit(subTasks, 10, (err) => { | ||
done(err); | ||
}); | ||
}); | ||
async.parallelLimit(subTasks, 10, (err) => { | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
async.parallelLimit(tasks, 4, (err) => { | ||
if (err) { | ||
return done(err); | ||
} else if (!domainIds.length) { | ||
async.parallelLimit(tasks, 4, (err) => { | ||
if (err) { | ||
return done(err); | ||
} else if (!domainIds.length) { | ||
// TODO: should we set lastFullEid? | ||
return done(null, { | ||
correlation_id: { | ||
// TODO: should we set lastFullEid? | ||
return done(null, { | ||
correlation_id: { | ||
source: obj.correlation_id.source, | ||
start: startEid, | ||
end: endEid, | ||
partial: partialEid, | ||
units: p.length | ||
} | ||
}); | ||
} | ||
let i = 0; | ||
for (; i < domainIds.length - 1; i++) { | ||
domainIds[i].correlation_id = { | ||
source: obj.correlation_id.source, | ||
//start: obj.correlation_id.end || obj.correlation_id.start, | ||
start: startEid, | ||
@@ -320,12 +365,8 @@ end: endEid, | ||
units: p.length | ||
//units: obj.correlation_id.units | ||
} | ||
}); | ||
} | ||
let i = 0; | ||
for (; i < domainIds.length - 1; i++) { | ||
}; | ||
push(domainIds[i]); | ||
} | ||
domainIds[i].correlation_id = { | ||
source: obj.correlation_id.source, | ||
// start: lastFullEid, | ||
// units: obj.correlation_id.units | ||
start: startEid, | ||
@@ -336,239 +377,228 @@ end: endEid, | ||
}; | ||
push(domainIds[i]); | ||
} | ||
// lastFullEid = obj.correlation_id.end || obj.correlation_id.start; | ||
domainIds[i].correlation_id = { | ||
source: obj.correlation_id.source, | ||
//start: lastFullEid, | ||
//units: obj.correlation_id.units || 1 | ||
start: startEid, | ||
end: endEid, | ||
partial: partialEid, | ||
units: p.length | ||
}; | ||
done(null, domainIds[i]); | ||
done(null, domainIds[i]); | ||
}); | ||
}); | ||
}); | ||
} | ||
} | ||
function translateIdsCombineStream() { | ||
return ls.through((obj, done) => { | ||
let startEid; | ||
let endEid; | ||
let partialEid; | ||
//let units = 0; | ||
translateIdsCombineStream() { | ||
return ls.through((obj, done) => { | ||
let startEid; | ||
let endEid; | ||
let partialEid; | ||
let ids = {}; | ||
for (var i = 0; i < obj.payload.length; i++) { | ||
let p = obj.payload[i]; | ||
if (p.s !== undefined && p.id !== undefined) { | ||
if (!(p.s in ids)) { | ||
ids[p.s] = new Set(); | ||
let ids = {}; | ||
for (let i = 0; i < obj.payload.length; i++) { | ||
let p = obj.payload[i]; | ||
if (p.s !== undefined && p.id !== undefined) { | ||
if (!(p.s in ids)) { | ||
ids[p.s] = new Set(); | ||
} | ||
ids[p.s].add(p.id); | ||
} | ||
ids[p.s].add(p.id); | ||
} | ||
let record = p; | ||
if (record.correlation_id.start) { | ||
if (!startEid) { | ||
startEid = record.correlation_id.start; | ||
let record = p; | ||
if (record.correlation_id.start) { | ||
if (!startEid) { | ||
startEid = record.correlation_id.start; | ||
} | ||
partialEid = record.correlation_id.partial; | ||
endEid = record.correlation_id.start || endEid; | ||
} else if (record.correlation_id.partial) { | ||
partialEid = record.correlation_id.partial; | ||
} | ||
partialEid = record.correlation_id.partial; | ||
endEid = record.correlation_id.start || endEid; | ||
//units += (record.correlation_id.units || record.correlation_id.records || 1); | ||
} else if (record.correlation_id.partial) { | ||
partialEid = record.correlation_id.partial; | ||
} | ||
Object.keys(ids).map(k => { | ||
ids[k] = Array.from(ids[k]); | ||
}); | ||
//console.log("combine", (record.correlation_id.start ? "start" : ""), record.correlation_id.partial ? "partial" : "", ":", record.correlation_id.start, record.correlation_id.partial); | ||
} | ||
Object.keys(ids).map(k => { | ||
ids[k] = Array.from(ids[k]); | ||
done(null, { | ||
ids: ids, | ||
correlation_id: { | ||
source: obj.correlation_id.source, | ||
start: startEid, | ||
end: endEid, | ||
partial: partialEid, | ||
units: obj.payload.length | ||
} | ||
}); | ||
}); | ||
} | ||
done(null, { | ||
ids: ids, | ||
correlation_id: { | ||
source: obj.correlation_id.source, | ||
//start: obj.correlation_id.start || obj.correlation_id.end, | ||
//units: obj.correlation_id.units | ||
start: startEid, | ||
end: endEid, | ||
partial: partialEid, | ||
units: obj.payload.length | ||
} | ||
}); | ||
}); | ||
} | ||
buildDomainObject(domainObject, ids, push, callback) { | ||
let opts = {}; | ||
let sqlClient = this.client; | ||
async.eachLimit(Object.entries(ids), 5, ([schema, ids], callback) => { | ||
let tasks = []; | ||
let domains = {}; | ||
let queryIds = []; | ||
logger.log(`Processing ${ids.length} from ${schema}`); | ||
ids.forEach(id => { | ||
// if the id is an object, it's a composite key | ||
if (typeof id === 'object') { | ||
// change the id to be a string with keys separated by a dash | ||
queryIds.push(Object.values(id)); | ||
id = Object.values(id).join('-'); | ||
} else { | ||
queryIds.push(id); | ||
} | ||
function buildDomainObject(client, domainObject, ids, push, callback) { | ||
let opts = {}; | ||
let sqlClient = client; | ||
let joinsCount = Object.keys(domainObject.joins).length; | ||
async.eachLimit(Object.entries(ids), 5, ([schema, ids], callback) => { | ||
let tasks = []; | ||
let domains = {}; | ||
console.log(`Processing ${ids.length} from ${schema}`); | ||
ids.forEach(id => { | ||
domains[id] = {}; | ||
Object.keys(domainObject.joins).forEach(name => { | ||
let t = domainObject.joins[name]; | ||
if (t.type === "one_to_many") { | ||
domains[id] = {}; | ||
Object.keys(domainObject.joins).forEach(name => { | ||
domains[id][name] = []; | ||
} else { | ||
domains[id][name] = {}; | ||
} | ||
}); | ||
}); | ||
}); | ||
function mapResults(results, fields, each) { | ||
let mappings = []; | ||
tasks.push(done => { | ||
if (!domainObject.domainIdColumn) { | ||
logger.error('[FATAL ERROR]: No ID specified'); | ||
} | ||
let last = null; | ||
fields.forEach((f, i) => { | ||
if (last == null) { | ||
last = { | ||
path: null, | ||
start: i | ||
}; | ||
mappings.push(last); | ||
} else if (f.name.match(/^prefix_/)) { | ||
last.end = i; | ||
last = { | ||
path: f.name.replace(/^prefix_/, ''), | ||
start: i + 1 | ||
}; | ||
mappings.push(last); | ||
} | ||
}); | ||
last.end = fields.length; | ||
results.forEach(r => { | ||
//Convert back to object now | ||
let row = {}; | ||
mappings.forEach(m => { | ||
if (m.path === null) { | ||
r.slice(m.start, m.end).forEach((value, i) => { | ||
row[fields[m.start + i].name] = value; | ||
}); | ||
} else if (m.path) { | ||
row[m.path] = r.slice(m.start, m.end).reduce((acc, value, i) => { | ||
acc[fields[m.start + i].name] = value; | ||
return acc; | ||
}, {}); | ||
} | ||
let query = domainObject.sql.call({ | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
}, { | ||
ids: queryIds, | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
}); | ||
each(row); | ||
}); | ||
} | ||
tasks.push(done => { | ||
if (!domainObject.domainIdColumn) { | ||
console.log('[FATAL ERROR]: No ID specified'); | ||
} | ||
let query = domainObject.sql.call({ | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
}, { | ||
ids: ids, | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
this.buildDomainQuery(domainObject, domains, query, [queryIds], done); | ||
}); | ||
sqlClient.query(query, [ids], (err, results, fields) => { | ||
if (err) return done(err); | ||
mapResults(results, fields, row => { | ||
let domainId = row[domainObject.domainIdColumn]; | ||
if (domainObject.transform) { | ||
row = domainObject.transform(row); | ||
} | ||
delete row._domain_id; | ||
//row._schema = schema; | ||
if (!domainId) { | ||
console.error('ID: "' + domainObject.domainIdColumn + '" not found in object:'); | ||
} else if (!domains[domainId]) { | ||
console.error('ID: "' + domainObject.domainIdColumn + '" with a value of: "' + domainId + '" does not match any ID in the domain object. This could be caused by using a WHERE clause on an ID that differs from the SELECT ID'); | ||
} else { | ||
//We need to keep the domain relationships in tact | ||
domains[domainId] = Object.assign(domains[domainId], row); | ||
} | ||
Object.keys(domainObject.joins).forEach(name => { | ||
let t = domainObject.joins[name]; | ||
t.domainIdColumn = t.domainIdColumn || "_domain_id"; | ||
let query = t.sql.call({ | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
}, { | ||
ids: queryIds, | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
}); | ||
done(); | ||
}, { | ||
inRowMode: true | ||
tasks.push(done => { | ||
this.buildJoinQuery(t, name, domains, query, [queryIds], done); | ||
}); | ||
}); | ||
this.processTasks(ids, domains, tasks, domainObject, schema, opts, push, callback); | ||
}, (err) => { | ||
callback(err); | ||
}); | ||
} | ||
mapResults(results, fields, each) { | ||
let mappings = []; | ||
Object.keys(domainObject.joins).forEach(name => { | ||
let t = domainObject.joins[name]; | ||
t.domainIdColumn = t.domainIdColumn || "_domain_id"; | ||
let query = t.sql.call({ | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
}, { | ||
ids: ids, | ||
database: schema, | ||
schema: schema, | ||
client: sqlClient | ||
let last = null; | ||
fields.forEach((f, i) => { | ||
if (last == null) { | ||
last = { | ||
path: null, | ||
start: i | ||
}; | ||
mappings.push(last); | ||
} else if (f.name.match(/^prefix_/)) { | ||
last.end = i; | ||
last = { | ||
path: f.name.replace(/^prefix_/, ''), | ||
start: i + 1 | ||
}; | ||
mappings.push(last); | ||
} | ||
}); | ||
last.end = fields.length; | ||
results.forEach(r => { | ||
//Convert back to object now | ||
let row = {}; | ||
mappings.forEach(m => { | ||
if (m.path === null) { | ||
r.slice(m.start, m.end).forEach((value, i) => { | ||
row[fields[m.start + i].name] = value; | ||
}); | ||
} else if (m.path) { | ||
row[m.path] = r.slice(m.start, m.end).reduce((acc, value, i) => { | ||
acc[fields[m.start + i].name] = value; | ||
return acc; | ||
}, {}); | ||
} | ||
}); | ||
if (t.type === "one_to_many") { | ||
tasks.push(done => { | ||
sqlClient.query(query, [ids], (err, results, fields) => { | ||
if (err) { | ||
return done(err); | ||
} else if (!results.length) { | ||
return done(); | ||
} | ||
each(row); | ||
}); | ||
} | ||
mapResults(results, fields, row => { | ||
let domainId = row[t.domainIdColumn]; | ||
delete row._domain_id; | ||
processDomainQuery({ transform = a => a, domainIdColumn }, domains, done, err, results, fields) { | ||
if (err) return done(err); | ||
if (t.transform) { | ||
row = t.transform(row); | ||
} | ||
this.mapResults(results, fields, row => { | ||
let domainId = Array.isArray(domainIdColumn)? domainIdColumn.map(i => row[i]).join('-') : row[domainIdColumn]; | ||
row = transform(row); | ||
delete row._domain_id; | ||
domains[domainId][name].push(row); | ||
}); | ||
done(); | ||
}, { | ||
inRowMode: true | ||
}); | ||
}); | ||
if (!domainId) { | ||
logger.error('ID: "' + domainIdColumn + '" not found in object:'); | ||
} else if (!domains[domainId]) { | ||
logger.error('ID: "' + domainIdColumn + '" with a value of: "' + domainId + '" does not match any ID in the domain object. This could be caused by using a WHERE clause on an ID that differs from the SELECT ID'); | ||
} else { | ||
tasks.push(done => { | ||
sqlClient.query(query, [ids], (err, results, fields) => { | ||
if (err) { | ||
return done(err); | ||
} else if (!results.length) { | ||
return done(); | ||
} | ||
mapResults(results, fields, row => { | ||
if (row.length) { | ||
let domainId = row[t.domainIdColumn]; | ||
delete row._domain_id; | ||
if (t.transform) { | ||
row = t.transform(row); | ||
} | ||
domains[domainId][name] = row; | ||
} | ||
}); | ||
done(); | ||
}, { | ||
inRowMode: true | ||
}); | ||
}); | ||
//We need to keep the domain relationships in tact | ||
domains[domainId] = Object.assign(domains[domainId], row); | ||
} | ||
}); | ||
done(); | ||
} | ||
buildDomainQuery(domainObject, domains, query, queryIds, done) { | ||
this.client.query(query, queryIds, (err, results, fields) => { | ||
this.processDomainQuery(domainObject, domains, done, err, results, fields); | ||
}, { | ||
inRowMode: true | ||
}); | ||
} | ||
processJoinQuery({ transform = a => a, domainIdColumn }, name, domains, done, err, results, fields) { | ||
if (err) { | ||
return done(err); | ||
} else if (!results.length) { | ||
return done(); | ||
} | ||
this.mapResults(results, fields, row => { | ||
let domainId = Array.isArray(domainIdColumn)? domainIdColumn.map(i => row[i]).join('-') : row[domainIdColumn]; | ||
delete row._domain_id; | ||
row = transform(row); | ||
domains[domainId][name].push(row); | ||
}); | ||
done(); | ||
} | ||
buildJoinQuery(joinObject, name, domains, query, queryIds, done) { | ||
this.client.query(query, queryIds, (err, results, fields) => { | ||
this.processJoinQuery(joinObject, name, domains, done, err, results, fields); | ||
}, { | ||
inRowMode: true | ||
}); | ||
} | ||
/** | ||
* Process the tasks | ||
* @param ids | ||
* @param domains Array | ||
* @param tasks | ||
* @param domainObject | ||
* @param schema | ||
* @param opts | ||
* @param push | ||
* @param callback | ||
*/ | ||
processTasks(ids, domains, tasks, domainObject, schema, opts, push, callback) { | ||
let limit = 5; | ||
let joinsCount = Object.keys(domainObject.joins).length; | ||
async.parallelLimit(tasks, limit, (err) => { | ||
@@ -578,8 +608,13 @@ if (err) { | ||
} else { | ||
// let getEid = opts.getEid || ((id, obj, stats) => stats.end); | ||
ids.forEach(id => { | ||
// if the id is an object, it's a composite key | ||
if (typeof id === 'object') { | ||
// change the id to be a string with keys separated by a dash | ||
id = Object.values(id).join('-'); | ||
} | ||
// skip the domain if there is no data with it | ||
let keyCount = Object.keys(domains[id]).length; | ||
if (keyCount === 0) { | ||
console.log('[INFO] Skipping domain id due to empty object. #: ' + id); | ||
logger.log('[INFO] Skipping domain id due to empty object. #: ' + id); | ||
return; | ||
@@ -591,3 +626,3 @@ } else if (keyCount <= joinsCount) { | ||
if (!valid) { | ||
console.log('[INFO] Skipping domain id due to empty object. #: ' + id); | ||
logger.log('[INFO] Skipping domain id due to empty object. #: ' + id); | ||
return; | ||
@@ -605,19 +640,25 @@ } | ||
}; | ||
// remove the schema if it's a dummy we added in translate ids | ||
if (event.schema === '__database__') { | ||
delete event.schema; | ||
} | ||
push(event); | ||
}); | ||
callback(); | ||
} | ||
}); | ||
}, (err) => { | ||
callback(err); | ||
}); | ||
} | ||
} | ||
function queryToFunction(query, params) { | ||
if (typeof query === "function") { | ||
return query; | ||
} else if (typeof query === "string") { | ||
params.push(`return \`${query}\`;`); | ||
return Function.apply(this, params); | ||
queryToFunction(query, params) { | ||
if (typeof query === "function") { | ||
return query; | ||
} else if (typeof query === "string") { | ||
params.push(`return \`${query}\`;`); | ||
return Function.apply(this, params); | ||
} | ||
} | ||
} | ||
}; |
{ | ||
"name": "leo-connector-common", | ||
"version": "1.6.0", | ||
"version": "2.0.0-189-g593f905", | ||
"description": "Common package for all Leo Platform database connectors", | ||
@@ -27,7 +27,8 @@ "main": "index.js", | ||
"async": "^2.6.1", | ||
"leo-sdk": "^2.0.1", | ||
"leo-aws": "^1.4.0", | ||
"leo-logger": ">=1.0.0", | ||
"leo-sdk": "^2.2.0", | ||
"moment": "^2.22.2", | ||
"sqlstring": "^2.3.1", | ||
"leo-logger": ">=1.0.0" | ||
"sqlstring": "^2.3.1" | ||
} | ||
} |
@@ -0,55 +1,108 @@ | ||
'use strict'; | ||
const leo = require("leo-sdk"); | ||
const ls = leo.streams; | ||
const leoaws = require('leo-aws'); | ||
const ls = require('leo-streams'); | ||
const leostream = leo.streams; | ||
const moment = require("moment"); | ||
const merge = require('lodash.merge'); | ||
const logger = require('leo-logger'); | ||
const nibbler = require("./nibbler.js"); | ||
const loader = require("./loader.js"); | ||
const loaderJoin = require("./loaderJoinTable.js"); | ||
const moment = require("moment"); | ||
let dynamodb = leo.aws.dynamodb; | ||
const tableName = leo.configuration.resources.LeoCron; | ||
module.exports = function(botId, client, table, id, domain, opts, callback) { | ||
opts = Object.assign({ | ||
event: table, | ||
}, opts || {}); | ||
module.exports = class Snapshotter { | ||
constructor(connector) { | ||
this.params = { | ||
connector: connector, | ||
nibble: null, | ||
logTimeout: null, | ||
runUntilComplete: true, | ||
timestamp: moment(), | ||
botId: process.__config.registry.context.botId | ||
}; | ||
} | ||
let nibble = null; | ||
var logTimeout = null; | ||
/** | ||
* Start snapshot reading | ||
* @param settings | ||
* @returns {*} | ||
*/ | ||
read(settings) { | ||
// add the settings parameters | ||
this.params = merge(this.params, settings); | ||
if (!this.params.table) { | ||
throw new Error('No `table` specified.'); | ||
} else if (!this.params.pk) { | ||
throw new Error('No `pk` specified for the specified table.'); | ||
} else if (!this.params.botId) { | ||
throw new Error('NO botId specified.'); | ||
} | ||
let stream = ls.passthrough({ | ||
objectMode: true, | ||
}); | ||
leoaws.dynamodb.get(tableName, this.params.botId) | ||
.then(result => { | ||
ls.pipe(this.nibble(result), this.format(), stream); | ||
}) | ||
.catch(err => { | ||
throw new Error(err); | ||
}); | ||
return stream; | ||
} | ||
/***************************************** | ||
* Start fancy console log status thingy * | ||
*****************************************/ | ||
//@todo: Update all this to use the log-update node module | ||
function clearLog() { | ||
clearLog() { | ||
process.stdout.write("\r\x1b[K"); | ||
if (logTimeout) clearInterval(logTimeout); | ||
if (this.params.logTimeout) clearInterval(this.params.logTimeout); | ||
} | ||
var log = function() { | ||
clearLog(); | ||
var percent = (nibble.progress / nibble.total) * 100; | ||
var fixed = percent.toFixed(2); | ||
if (fixed == "100.00" && percent < 100) { | ||
log() { | ||
this.clearLog(); | ||
let percent = (this.params.nibble.progress / this.params.nibble.total) * 100; | ||
let fixed = percent.toFixed(2); | ||
if (fixed === "100.00" && percent < 100) { | ||
fixed = "99.99"; | ||
} | ||
console.log(fixed + "% :", Object.keys(arguments).map(k => arguments[k]).join(", ")); | ||
logger.log(fixed + "% :", Object.keys(arguments).map(k => arguments[k]).join(", ")); | ||
}; | ||
function timeLog(message) { | ||
clearLog(); | ||
var time = new Date(); | ||
timeLog(message) { | ||
this.clearLog(); | ||
let time = new Date(); | ||
function writeMessage() { | ||
process.stdout.write("\r\x1b[K"); | ||
process.stdout.write(((new Date() - time) / 1000).toFixed(1) + "s : " + message); | ||
} | ||
writeMessage(); | ||
logTimeout = setInterval(writeMessage, 200); | ||
this.writeMessage(time); | ||
this.params.logTimeout = setInterval(() => { | ||
this.writeMessage(time); | ||
}, 200); | ||
} | ||
function normalLog(message) { | ||
clearLog(); | ||
console.log(message); | ||
writeMessage(time) { | ||
process.stdout.write("\r\x1b[K"); | ||
process.stdout.write(((new Date() - time) / 1000).toFixed(1) + "s : " + message); | ||
} | ||
normalLog(message) { | ||
this.clearLog(); | ||
logger.log(message); | ||
} | ||
/*************************************** | ||
* End fancy console log status thingy * | ||
***************************************/ | ||
function saveProgress(data, timestamp) { | ||
dynamodb.merge(tableName, botId, { | ||
/** | ||
* Save the current progress data for the snapshot | ||
* @param data | ||
* @param timestamp | ||
*/ | ||
saveProgress(data, timestamp) { | ||
leoaws.dynamodb.merge(tableName, this.params.botId, { | ||
snapshot: Object.assign({ | ||
@@ -59,114 +112,157 @@ last_run: moment.now(), | ||
}, data) | ||
}, function(err, result) { | ||
if (err) { | ||
console.log(err); | ||
callback(err); | ||
process.exit(); | ||
} | ||
}).catch(err => { | ||
throw new Error(err); | ||
}); | ||
} | ||
dynamodb.get(tableName, botId, function(err, result) { | ||
if (err) { | ||
callback(err); | ||
} else { | ||
// reuse an existing bucket key if we’re resuming, otherwise create a new one. | ||
let timestamp = moment(result && result.snapshot && !result.snapshot.complete && result.snapshot.bucket_timestamp || undefined), | ||
bucketKey = timestamp.format('YYYY/MM_DD_') + timestamp.valueOf(); | ||
let stream = nibbler(client, table, id, { | ||
limit: 5000, | ||
resume: result && result.snapshot && !result.snapshot.complete && result.snapshot | ||
}); | ||
stream.destroy = stream.destroy || stream.close || (() => {}); | ||
/** | ||
* nibble through the database | ||
* @param result | ||
* @returns {pass} | ||
*/ | ||
nibble(result) { | ||
let self = this; | ||
// reuse an existing bucket key if we’re resuming, otherwise create a new one. | ||
this.params.timestamp = moment(result && result.snapshot && !result.snapshot.complete && result.snapshot.bucket_timestamp || undefined); | ||
stream.on("ranged", function(n) { | ||
nibble = n; | ||
saveProgress(nibble, timestamp); | ||
}); | ||
let transform; | ||
if (Array.isArray(id)) { | ||
transform = loaderJoin(client, id, null, domain, { | ||
this.params.resume = result && result.snapshot && !result.snapshot.complete && result.snapshot; | ||
let stream = nibbler(this.params.connector, this.params.table, this.params.pk, this.params); | ||
stream.destroy = stream.destroy || stream.close || (() => {}); | ||
stream.on("ranged", function (n) { | ||
self.params.nibble = n; | ||
self.saveProgress(self.params.nibble, self.params.timestamp); | ||
}); | ||
stream.on('end', () => { | ||
clearTimeout(self.params.streamTimeout); | ||
}); | ||
this.params.streamTimeout = setTimeout(() => { | ||
stream.stop(); | ||
}, Math.min(2147483647, moment.duration(self.params.duration || { | ||
seconds: process.__config.registry.context.getRemainingTimeInMillis() * 0.8 | ||
}).asMilliseconds())); | ||
return stream; | ||
} | ||
/** | ||
* Format the payload, include a dummy database. | ||
* @todo support multiple databases. | ||
* @returns {*} | ||
*/ | ||
format() { | ||
return ls.through((obj, done) => { | ||
let payload = obj.payload[this.params.table]; | ||
// turn this into a format the domainObjectTransform is expecting, complete with the dummy database name | ||
obj = { | ||
ids: { | ||
__database__: payload | ||
}, | ||
correlation_id: { | ||
source: 'snapshot', | ||
isSnapshot: true, | ||
id: botId | ||
units: payload.length, | ||
start: payload[0], | ||
end: payload[payload.length - 1] | ||
} | ||
}; | ||
done(null, obj); | ||
}); | ||
} | ||
// write out the results | ||
/** | ||
* | ||
* @param {string} botId | ||
* @param {string} destination | ||
* @returns {*} | ||
*/ | ||
write(botId, destination) { | ||
this.params.destination = destination; | ||
return ls.pipeline( | ||
ls.through((event, done)=>{ | ||
event.id = botId; | ||
event.timestamp = Date.now(); | ||
event.event_source_timestamp = Date.now(); | ||
done(null, event); | ||
}), | ||
this.writeToS3(), | ||
this.writeToLeo(), | ||
this.writeCheckpoint() | ||
); | ||
} | ||
/** | ||
* Write the events to a gzip file and upload to S3 | ||
* @returns {*} | ||
*/ | ||
writeToS3() { | ||
let bucketKey = this.params.timestamp.format('YYYY/MM_DD_') + this.params.timestamp.valueOf(); | ||
return leostream.toS3GzipChunks(this.params.destination, { | ||
useS3Mode: true, | ||
time: { | ||
minutes: 1 | ||
}, | ||
prefix: "_snapshot/" + bucketKey, | ||
sectionCount: 30 | ||
}); | ||
} | ||
/** | ||
* Write the snapshot event to leo | ||
* @returns {*} | ||
*/ | ||
writeToLeo() { | ||
return leostream.toLeo('snapshotter', { | ||
snapshot: this.params.timestamp.valueOf() | ||
}); | ||
} | ||
/** | ||
* Send a checkpoint command to the kinesis reader to checkpoint at the end. | ||
*/ | ||
writeCheckpoint() { | ||
return leostream.cmd("checkpoint", (obj, done) => { | ||
if (obj.correlations) { | ||
let records = 0; | ||
obj.correlations.forEach(c => { | ||
this.params.nibble.start = c.snapshot.end; | ||
this.params.nibble.progress += c.snapshot.records; | ||
records += c.snapshot.records; | ||
}); | ||
} else { | ||
transform = loader(client, (obj) => { | ||
return [obj[table]]; | ||
}, domain, { | ||
source: 'snapshot', | ||
isSnapshot: true, | ||
id: botId | ||
}); | ||
} | ||
transform.destroy = transform.destroy || transform.close || (() => {}); | ||
this.log(`Processed ${records} ${this.params.nibble.progress}/${this.params.nibble.total}. Remaining ${this.params.nibble.total - this.params.nibble.progress}`); | ||
if (this.params.nibble.end == this.params.nibble.start) { | ||
this.params.nibble.complete = true; | ||
this.saveProgress(this.params.nibble); | ||
let timeout = setTimeout(() => { | ||
stream.stop(); | ||
}, moment.duration({ | ||
seconds: 180 | ||
}).asMilliseconds()); | ||
let closeStream = ls.pipeline(leostream.toLeo("snapshotter", { | ||
snapshot: this.params.timestamp.valueOf() | ||
}), ls.devnull()); | ||
ls.pipe(stream, | ||
transform, | ||
ls.toS3GzipChunks(opts.event, { | ||
useS3Mode: true, | ||
time: { | ||
minutes: 1 | ||
}, | ||
prefix: "_snapshot/" + bucketKey, | ||
sectionCount: 30 | ||
}), | ||
ls.toLeo("snapshotter", { | ||
snapshot: timestamp.valueOf() | ||
}), | ||
ls.cmd("checkpoint", (obj, done) => { | ||
if (obj.correlations) { | ||
let records = 0; | ||
obj.correlations.forEach(c => { | ||
nibble.start = c.snapshot.end; | ||
nibble.progress += c.snapshot.records; | ||
records += c.snapshot.records; | ||
}); | ||
log(`Processed ${records} ${nibble.progress}/${nibble.total}. Remaining ${nibble.total-nibble.progress}`); | ||
if (nibble.end == nibble.start) { | ||
nibble.complete = true; | ||
saveProgress(nibble); | ||
let closeStream = ls.pipeline(ls.toLeo("snapshotter", { | ||
snapshot: timestamp.valueOf() | ||
}), ls.devnull()); | ||
closeStream.write({ | ||
_cmd: 'registerSnapshot', | ||
event: opts.event, | ||
start: timestamp.valueOf(), | ||
next: timestamp.clone().startOf('day').valueOf(), | ||
id: botId | ||
}); | ||
closeStream.on("finish", done); | ||
closeStream.end(); | ||
} else { | ||
saveProgress(nibble, timestamp); | ||
done(); | ||
} | ||
} else { | ||
done(); | ||
closeStream.write({ | ||
_cmd: 'registerSnapshot', | ||
event: this.params.destination, | ||
start: this.params.timestamp.valueOf(), | ||
next: this.params.timestamp.clone().startOf('day').valueOf(), | ||
id: this.params.botId | ||
}); | ||
closeStream.on("finish", done); | ||
closeStream.end(); | ||
} else { | ||
if (this.params.runUntilComplete) { | ||
leo.bot.runAgain(); | ||
} | ||
}), | ||
ls.devnull(), | ||
(err) => { | ||
clearTimeout(timeout); | ||
if (err) { | ||
console.log(err); | ||
stream.destroy(); | ||
transform.destroy(); | ||
callback(err); | ||
} else { | ||
if (!nibble.complete) { | ||
leo.bot.runAgain(); | ||
} | ||
callback(null); | ||
} | ||
}); | ||
} | ||
}); | ||
this.saveProgress(this.params.nibble, this.params.timestamp); | ||
done(); | ||
} | ||
} else { | ||
done(); | ||
} | ||
}); | ||
} | ||
}; |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
Quality: Package is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain risk: Package accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
2
90777
6
17
3254
1
+ Addedleo-aws@^1.4.0
+ Addedleo-aws@1.6.0(transitive)
+ Addedleo-streams@1.2.1(transitive)
+ Addedpump@3.0.2(transitive)
Updatedleo-sdk@^2.2.0