@live-change/framework
Advanced tools
Comparing version 0.4.42 to 0.4.43
@@ -31,2 +31,6 @@ const crypto = require("crypto") | ||
const commandExecutor = require("./processes/commandExecutor.js") | ||
const triggerExecutor = require("./processes/triggerExecutor.js") | ||
const eventListener = require('./processes/eventListener.js') | ||
const utils = require('./utils.js') | ||
@@ -44,19 +48,24 @@ | ||
this.defaultProcessors = [ | ||
crudGenerator, | ||
draftGenerator, | ||
reverseRelationProcessor, | ||
indexListProcessor, | ||
daoPathView, | ||
fetchView, | ||
accessControl, | ||
autoValidation, | ||
indexCode | ||
crudGenerator, | ||
draftGenerator, | ||
reverseRelationProcessor, | ||
indexListProcessor, | ||
daoPathView, | ||
fetchView, | ||
accessControl, | ||
autoValidation, | ||
indexCode | ||
] | ||
this.defaultUpdaters = [ | ||
databaseUpdater | ||
databaseUpdater | ||
] | ||
this.defaultClientSideFilters = [ | ||
accessControlFilter, | ||
clientSideFilter | ||
accessControlFilter, | ||
clientSideFilter | ||
] | ||
this.defaultProcesses = [ | ||
commandExecutor, | ||
triggerExecutor, | ||
eventListener | ||
] | ||
const dbDao = new ReactiveDao(process.cwd()+' '+process.argv.join(' '), { | ||
@@ -142,3 +151,4 @@ remoteUrl: env.DB_URL || "http://localhost:9417/api/ws", | ||
async startService( serviceDefinition, config ) { | ||
async startService( serviceDefinition, config = {}) { | ||
if(!config.processes) config.processes = this.defaultProcesses | ||
console.log("Starting service", serviceDefinition.name, "!") | ||
@@ -151,3 +161,3 @@ const profileOp = await this.profileLog.begin({ | ||
let service = new Service(serviceDefinition, this) | ||
await service.start(config || {}) | ||
await service.start(config) | ||
console.log("service started", serviceDefinition.name, "!") | ||
@@ -154,0 +164,0 @@ await this.profileLog.end(profileOp) |
@@ -8,9 +8,3 @@ const Model = require("./Model.js") | ||
const TriggerHandler = require("./TriggerHandler.js") | ||
const SearchIndexer = require("./SearchIndexer.js") | ||
const ReactiveDao = require("@live-change/dao") | ||
const EventSourcing = require('../utils/EventSourcing.js') | ||
const CommandQueue = require('../utils/CommandQueue.js') | ||
const KeyBasedExecutionQueues = require('../utils/KeyBasedExecutionQueues.js') | ||
class Service { | ||
@@ -83,7 +77,3 @@ | ||
let promises = [] | ||
if(config.runCommands) promises.push(this.startCommandExecutor()) | ||
if(config.handleEvents) promises.push(this.startEventListener()) | ||
if(config.indexSearch) promises.push(this.startSearchIndexer()) | ||
let promises = config.processes.map(proc => proc(this, config)) | ||
await Promise.all(promises) | ||
@@ -104,333 +94,5 @@ | ||
async startEventListener() { | ||
if(this.app.splitEvents) { | ||
this.eventSourcing = new EventSourcing(this.dao, this.databaseName, | ||
'events_'+this.name, this.name, | ||
{ filter: (event) => event.service == this.name }) | ||
} else { | ||
this.eventSourcing = new EventSourcing(this.dao, this.databaseName, | ||
'events', this.name, | ||
{ filter: (event) => event.service == this.name }) | ||
} | ||
for (let eventName in this.events) { | ||
const event = this.events[eventName] | ||
this.eventSourcing.addEventHandler(eventName, async (ev, bucket) => { | ||
return await this.profileLog.profile({ operation: "handleEvent", eventName, id: ev.id, | ||
bucketId: bucket.id, triggerId: bucket.triggerId, commandId: bucket.commandId }, | ||
() => { | ||
console.log("EXECUTING EVENT", ev) | ||
return event.execute(ev, bucket) | ||
} | ||
) | ||
}) | ||
this.eventSourcing.onBucketEnd = async (bucket, handledEvents) => { | ||
if(bucket.reportFinished && handledEvents.length > 0) { | ||
await this.dao.request(['database', 'update'], this.databaseName, 'eventReports', bucket.reportFinished,[ | ||
{ op: "mergeSets", property: 'finished', values: handledEvents.map(ev => ({ id: ev.id, type: ev.type })) } | ||
]) | ||
} | ||
} | ||
} | ||
this.eventSourcing.start() | ||
} | ||
async startCommandExecutor() { | ||
this.commandQueue = new CommandQueue(this.dao, this.databaseName, | ||
this.app.splitCommands ? `${this.name}_commands` : 'commands', this.name) | ||
this.keyBasedCommandQueues = new KeyBasedExecutionQueues(r => r.key) | ||
for (let actionName in this.actions) { | ||
const action = this.actions[actionName] | ||
if(action.definition.queuedBy) { | ||
const queuedBy = action.definition.queuedBy | ||
const keyFunction = typeof queuedBy == 'function' ? queuedBy : ( | ||
Array.isArray(queuedBy) ? (c) => JSON.stringify(queuedBy.map(k=>c[k])) : | ||
(c) => JSON.stringify(c[queuedBy]) ) | ||
this.commandQueue.addCommandHandler(actionName, async (command) => { | ||
const profileOp = await this.profileLog.begin({ operation: 'queueCommand', commandType: actionName, | ||
commandId: command.id, client: command.client }) | ||
const reportFinished = action.definition.waitForEvents ? 'command_'+command.id : undefined | ||
const flags = { commandId: command.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
const routine = () => this.profileLog.profile({ operation: 'runCommand', commandType: actionName, | ||
commandId: command.id, client: command.client }, async () => { | ||
const result = await this.app.assertTime('command '+action.definition.name, | ||
action.definition.timeout || 10000, | ||
() => action.runCommand(command, (...args) => emit.emit(...args)), command) | ||
const events = await emit.commit() | ||
if(action.definition.waitForEvents) | ||
await this.app.waitForEvents(reportFinished, events, action.definition.waitForEvents) | ||
return result | ||
}) | ||
routine.key = keyFunction(command) | ||
const promise = this.keyBasedCommandQueues.queue(routine) | ||
await this.profileLog.endPromise(profileOp, promise) | ||
return promise | ||
}) | ||
} else { | ||
this.commandQueue.addCommandHandler(actionName, | ||
(command) => this.profileLog.profile({ operation: 'runCommand', commandType: actionName, | ||
commandId: command.id, client: command.client }, async () => { | ||
const reportFinished = action.definition.waitForEvents ? 'command_'+command.id : undefined | ||
const flags = { commandId: command.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
const result = await this.app.assertTime('command '+action.definition.name, | ||
action.definition.timeout || 10000, | ||
() => action.runCommand(command, (...args) => emit.emit(...args)), command) | ||
const events = await emit.commit() | ||
if(action.definition.waitForEvents) | ||
await this.app.waitForEvents(reportFinished, events, action.definition.waitForEvents) | ||
return result | ||
}) | ||
) | ||
} | ||
} | ||
await this.dao.request(['database', 'createTable'], this.databaseName, 'triggerRoutes').catch(e => 'ok') | ||
this.triggerQueue = new CommandQueue(this.dao, this.databaseName, | ||
this.app.splitTriggers ? `${this.name}_triggers` : 'triggers', this.name ) | ||
this.keyBasedTriggerQueues = new KeyBasedExecutionQueues(r => r.key) | ||
for (let triggerName in this.triggers) { | ||
const trigger = this.triggers[triggerName] | ||
await this.dao.request(['database', 'put'], this.databaseName, 'triggerRoutes', | ||
{ id: triggerName + '=>' + this.name, trigger: triggerName, service: this.name }) | ||
if(trigger.definition.queuedBy) { | ||
const queuedBy = trigger.definition.queuedBy | ||
const keyFunction = typeof queuedBy == 'function' ? queuedBy : ( | ||
Array.isArray(queuedBy) ? (c) => JSON.stringify(queuedBy.map(k=>c[k])) : | ||
(c) => JSON.stringify(c[queuedBy]) ) | ||
this.triggerQueue.addCommandHandler(triggerName, async (trig) => { | ||
const profileOp = await this.profileLog.begin({ operation: 'queueTrigger', triggerType: triggerName, | ||
triggerId: trig.id, by: trig.by }) | ||
console.log("QUEUED TRIGGER STARTED", trig) | ||
const reportFinished = trigger.definition.waitForEvents ? 'trigger_'+trig.id : undefined | ||
const flags = { triggerId: trig.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
const routine = () => this.profileLog.profile({ operation: 'runTrigger', triggerType: triggerName, | ||
commandId: trig.id, by: trig.by }, async () => { | ||
let result | ||
try { | ||
console.log("TRIGGERED!!", trig) | ||
result = await this.app.assertTime('trigger '+trigger.definition.name, | ||
trigger.definition.timeout || 10000, | ||
() => trigger.execute(trig, (...args) => emit.emit(...args)), trig) | ||
console.log("TRIGGER DONE!", trig) | ||
} catch (e) { | ||
console.error(`TRIGGER ${triggerName} ERROR`, e.stack) | ||
throw e | ||
} | ||
const events = await emit.commit() | ||
if(trigger.definition.waitForEvents) | ||
await this.app.waitForEvents(reportFinished, events, trigger.definition.waitForEvents) | ||
return result | ||
}) | ||
try { | ||
routine.key = keyFunction(trig) | ||
} catch(e) { | ||
console.error("QUEUE KEY FUNCTION ERROR", e) | ||
} | ||
console.log("TRIGGER QUEUE KEY", routine.key) | ||
const promise = this.keyBasedTriggerQueues.queue(routine) | ||
await this.profileLog.endPromise(profileOp, promise) | ||
return promise | ||
}) | ||
} else { | ||
this.triggerQueue.addCommandHandler(triggerName, | ||
(trig) => this.profileLog.profile({ operation: 'runTrigger', triggerType: triggerName, | ||
commandId: trig.id, by: trig.by }, async () => { | ||
console.log("NOT QUEUED TRIGGER STARTED", trig) | ||
const reportFinished = trigger.definition.waitForEvents ? 'trigger_'+trig.id : undefined | ||
const flags = { triggerId: trig.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
let result | ||
try { | ||
result = await this.app.assertTime('trigger '+trigger.definition.name, | ||
trigger.definition.timeout || 10000, | ||
() => trigger.execute(trig, (...args) => emit.emit(...args)), trig) | ||
console.log("TRIGGER DONE!", trig) | ||
} catch (e) { | ||
console.error(`TRIGGER ${triggerName} ERROR`, e.stack) | ||
throw e | ||
} | ||
const events = await emit.commit() | ||
if(trigger.definition.waitForEvents) | ||
await this.app.waitForEvents(reportFinished, events, trigger.definition.waitForEvents) | ||
return result | ||
}) | ||
) | ||
} | ||
} | ||
this.commandQueue.start() | ||
this.triggerQueue.start() | ||
} | ||
async startSearchIndexer() { | ||
let anyIndex = false | ||
for(const name in this.models) if(this.models[name].definition.searchIndex) anyIndex = true | ||
for(const name in this.indexes) if(this.indexes[name].definition.searchIndex) anyIndex = true | ||
if(!anyIndex) { | ||
console.log("not starting search indexer - nothing to index!") | ||
return | ||
} | ||
console.log("starting search indexer!") | ||
await this.dao.request(['database', 'createTable'], this.databaseName, 'searchIndexes').catch(e => 'ok') | ||
this.searchIndexers = [] | ||
const elasticsearch = this.app.connectToSearch() | ||
for(const modelName in this.models) { | ||
const model = this.models[modelName] | ||
const indexName = model.definition.searchIndex | ||
if(!indexName) continue | ||
const indexer = new SearchIndexer( | ||
this.dao, this.databaseName, 'Table', model.tableName, elasticsearch, indexName, model.definition | ||
) | ||
this.searchIndexers.push(indexer) | ||
} | ||
for(const indexName in this.indexes) { | ||
const index = this.indexes[indexName] | ||
const indexName = index.definition.searchIndex | ||
if(!indexName) continue | ||
const indexer = new SearchIndexer( | ||
this.dao, this.databaseName, 'Index', model.tableName, elasticsearch, indexName, index.definition | ||
) | ||
this.searchIndexers.push(indexer) | ||
} | ||
const promises = [] | ||
for(const searchIndexer of this.searchIndexers) { | ||
promises.push(this.profileLog.profile({ | ||
operation: "startIndexer", serviceName: this.name, indexName: searchIndexer.indexName | ||
}, () => searchIndexer.start())) | ||
} | ||
await Promise.all(promises) | ||
console.log("search indexer started!") | ||
} | ||
} | ||
class SplitEmitQueue { | ||
constructor(service, flags = {}) { | ||
this.service = service | ||
this.flags = flags | ||
this.emittedEvents = new Map() | ||
this.commited = false | ||
} | ||
emit(service, event) { | ||
if(!event) { | ||
event = service | ||
if(Array.isArray(event)) { | ||
let hasServices = false | ||
for(let ev of event) { | ||
if(ev.service) hasServices = true | ||
} | ||
if(hasServices) { | ||
for(let ev of event) { | ||
this.emit(ev) | ||
} | ||
return | ||
} | ||
} else { | ||
service = event.service || this.service.name | ||
} | ||
} | ||
let events | ||
if(!this.commited) { | ||
events = this.emittedEvents.get(service) | ||
if(!events) { | ||
events = [] | ||
this.emittedEvents.set(service, events) | ||
} | ||
} else { | ||
events = [] | ||
} | ||
if(Array.isArray(event)) { | ||
for(let ev of event) ev.service = service | ||
events.push(...event) | ||
} else { | ||
event.service = service | ||
events.push(event) | ||
} | ||
if(this.commited) { | ||
if(events.length == 0) return | ||
this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
this.service.name+'_events', { type: 'bucket', events, ...this.flags }) | ||
} | ||
} | ||
async commit() { | ||
let promises = [] | ||
this.commited = true | ||
if(this.emittedEvents.length == 0) return [] | ||
let allEvents = [] | ||
for(const [service, events] of this.emittedEvents.keys()) { | ||
promises.push(this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
this.service.name+'_events', { type: 'bucket', events, ...this.flags })) | ||
allEvents.push(...events) | ||
} | ||
await Promise.all(promises) | ||
return allEvents | ||
} | ||
} | ||
class SingleEmitQueue { | ||
constructor(service, flags = {}) { | ||
this.service = service | ||
this.flags = flags | ||
this.emittedEvents = [] | ||
this.commited = false | ||
} | ||
emit(service, event) { | ||
if(!event) { | ||
event = service | ||
service = this.service.name | ||
} | ||
let events | ||
if(!this.commited) { | ||
events = this.emittedEvents | ||
} else { | ||
events = [] | ||
} | ||
if(Array.isArray(event)) { | ||
for(let ev of event) if(!ev.service) ev.service = service | ||
events.push(...event) | ||
} else { | ||
if(!event.service) event.service = service | ||
events.push(event) | ||
} | ||
if(this.commited) { | ||
if(events.length == 0) return | ||
this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
'events', { type: 'bucket', events, ...this.flags }) | ||
} | ||
} | ||
async commit() { | ||
this.commited = true | ||
if(this.emittedEvents.length == 0) return [] | ||
await this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
'events', { type: 'bucket', events: this.emittedEvents, ...this.flags }) | ||
return this.emittedEvents | ||
} | ||
} | ||
module.exports = Service |
{ | ||
"name": "@live-change/framework", | ||
"version": "0.4.42", | ||
"version": "0.4.43", | ||
"description": "Live Change Framework - ultimate solution for real time mobile/web apps", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
62
157627
4280