@live-change/framework - npm package version comparison

Comparing version 0.4.42 to 0.4.43

lib/processes/commandExecutor.js (diff not shown)

lib/App.js

@@ -31,2 +31,6 @@ const crypto = require("crypto")

+const commandExecutor = require("./processes/commandExecutor.js")
+const triggerExecutor = require("./processes/triggerExecutor.js")
+const eventListener = require('./processes/eventListener.js')
 const utils = require('./utils.js')

@@ -44,19 +48,24 @@

 this.defaultProcessors = [
   crudGenerator,
   draftGenerator,
   reverseRelationProcessor,
   indexListProcessor,
   daoPathView,
   fetchView,
   accessControl,
   autoValidation,
   indexCode
 ]
 this.defaultUpdaters = [
   databaseUpdater
 ]
 this.defaultClientSideFilters = [
   accessControlFilter,
   clientSideFilter
 ]
+this.defaultProcesses = [
+  commandExecutor,
+  triggerExecutor,
+  eventListener
+]
 const dbDao = new ReactiveDao(process.cwd()+' '+process.argv.join(' '), {

@@ -142,3 +151,4 @@ remoteUrl: env.DB_URL || "http://localhost:9417/api/ws",

-  async startService( serviceDefinition, config ) {
+  async startService( serviceDefinition, config = {}) {
+    if(!config.processes) config.processes = this.defaultProcesses
     console.log("Starting service", serviceDefinition.name, "!")

@@ -151,3 +161,3 @@ const profileOp = await this.profileLog.begin({

     let service = new Service(serviceDefinition, this)
-    await service.start(config || {})
+    await service.start(config)
     console.log("service started", serviceDefinition.name, "!")

@@ -154,0 +164,0 @@ await this.profileLog.end(profileOp)
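
Net effect of the App.js changes: startService now tolerates a missing config and falls back to this.defaultProcesses (commandExecutor, triggerExecutor, eventListener). A minimal usage sketch, with the deep require paths and the no-argument App constructor assumed rather than confirmed by the package:

// Sketch only: require paths and App construction are assumptions, not documented API.
const App = require('@live-change/framework/lib/App.js')
const commandExecutor = require('@live-change/framework/lib/processes/commandExecutor.js')

async function main(serviceDefinition) {
  const app = new App() // constructor options omitted; assumed to read env defaults such as DB_URL
  // Rely on the default process list (commandExecutor, triggerExecutor, eventListener):
  await app.startService(serviceDefinition)
  // Or run only selected processes for this service:
  await app.startService(serviceDefinition, { processes: [ commandExecutor ] })
}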

@@ -8,9 +8,3 @@ const Model = require("./Model.js")

const TriggerHandler = require("./TriggerHandler.js")
const SearchIndexer = require("./SearchIndexer.js")
const ReactiveDao = require("@live-change/dao")
const EventSourcing = require('../utils/EventSourcing.js')
const CommandQueue = require('../utils/CommandQueue.js')
const KeyBasedExecutionQueues = require('../utils/KeyBasedExecutionQueues.js')
class Service {

@@ -83,7 +77,3 @@

-    let promises = []
-    if(config.runCommands) promises.push(this.startCommandExecutor())
-    if(config.handleEvents) promises.push(this.startEventListener())
-    if(config.indexSearch) promises.push(this.startSearchIndexer())
+    let promises = config.processes.map(proc => proc(this, config))
     await Promise.all(promises)
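
The flag checks removed above (runCommands, handleEvents, indexSearch) give way to a generic loop: each entry in config.processes is called as proc(service, config). A rough sketch of what such a process module could look like, modelled on the removed startEventListener code shown further below and reusing the same EventSourcing helper (the file name and details are illustrative, not the package's actual module):

// processes/exampleEventListener.js - hypothetical sketch
const EventSourcing = require('../utils/EventSourcing.js')

module.exports = async function eventListener(service, config) {
  // Each process receives the Service instance plus the startService config.
  const eventSourcing = new EventSourcing(service.dao, service.databaseName,
      'events', service.name, { filter: ev => ev.service == service.name })
  for(const eventName in service.events) {
    const event = service.events[eventName]
    // Delegate each declared event to its execute() handler, as the old inline code did.
    eventSourcing.addEventHandler(eventName, (ev, bucket) => event.execute(ev, bucket))
  }
  eventSourcing.start()
}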

@@ -104,333 +94,5 @@

async startEventListener() {
if(this.app.splitEvents) {
this.eventSourcing = new EventSourcing(this.dao, this.databaseName,
'events_'+this.name, this.name,
{ filter: (event) => event.service == this.name })
} else {
this.eventSourcing = new EventSourcing(this.dao, this.databaseName,
'events', this.name,
{ filter: (event) => event.service == this.name })
}
for (let eventName in this.events) {
const event = this.events[eventName]
this.eventSourcing.addEventHandler(eventName, async (ev, bucket) => {
return await this.profileLog.profile({ operation: "handleEvent", eventName, id: ev.id,
bucketId: bucket.id, triggerId: bucket.triggerId, commandId: bucket.commandId },
() => {
console.log("EXECUTING EVENT", ev)
return event.execute(ev, bucket)
}
)
})
this.eventSourcing.onBucketEnd = async (bucket, handledEvents) => {
if(bucket.reportFinished && handledEvents.length > 0) {
await this.dao.request(['database', 'update'], this.databaseName, 'eventReports', bucket.reportFinished,[
{ op: "mergeSets", property: 'finished', values: handledEvents.map(ev => ({ id: ev.id, type: ev.type })) }
])
}
}
}
this.eventSourcing.start()
}
async startCommandExecutor() {
this.commandQueue = new CommandQueue(this.dao, this.databaseName,
this.app.splitCommands ? `${this.name}_commands` : 'commands', this.name)
this.keyBasedCommandQueues = new KeyBasedExecutionQueues(r => r.key)
for (let actionName in this.actions) {
const action = this.actions[actionName]
if(action.definition.queuedBy) {
const queuedBy = action.definition.queuedBy
const keyFunction = typeof queuedBy == 'function' ? queuedBy : (
Array.isArray(queuedBy) ? (c) => JSON.stringify(queuedBy.map(k=>c[k])) :
(c) => JSON.stringify(c[queuedBy]) )
this.commandQueue.addCommandHandler(actionName, async (command) => {
const profileOp = await this.profileLog.begin({ operation: 'queueCommand', commandType: actionName,
commandId: command.id, client: command.client })
const reportFinished = action.definition.waitForEvents ? 'command_'+command.id : undefined
const flags = { commandId: command.id, reportFinished }
const emit = this.app.splitEvents
? new SplitEmitQueue(this, flags)
: new SingleEmitQueue(this, flags)
const routine = () => this.profileLog.profile({ operation: 'runCommand', commandType: actionName,
commandId: command.id, client: command.client }, async () => {
const result = await this.app.assertTime('command '+action.definition.name,
action.definition.timeout || 10000,
() => action.runCommand(command, (...args) => emit.emit(...args)), command)
const events = await emit.commit()
if(action.definition.waitForEvents)
await this.app.waitForEvents(reportFinished, events, action.definition.waitForEvents)
return result
})
routine.key = keyFunction(command)
const promise = this.keyBasedCommandQueues.queue(routine)
await this.profileLog.endPromise(profileOp, promise)
return promise
})
} else {
this.commandQueue.addCommandHandler(actionName,
(command) => this.profileLog.profile({ operation: 'runCommand', commandType: actionName,
commandId: command.id, client: command.client }, async () => {
const reportFinished = action.definition.waitForEvents ? 'command_'+command.id : undefined
const flags = { commandId: command.id, reportFinished }
const emit = this.app.splitEvents
? new SplitEmitQueue(this, flags)
: new SingleEmitQueue(this, flags)
const result = await this.app.assertTime('command '+action.definition.name,
action.definition.timeout || 10000,
() => action.runCommand(command, (...args) => emit.emit(...args)), command)
const events = await emit.commit()
if(action.definition.waitForEvents)
await this.app.waitForEvents(reportFinished, events, action.definition.waitForEvents)
return result
})
)
}
}
await this.dao.request(['database', 'createTable'], this.databaseName, 'triggerRoutes').catch(e => 'ok')
this.triggerQueue = new CommandQueue(this.dao, this.databaseName,
this.app.splitTriggers ? `${this.name}_triggers` : 'triggers', this.name )
this.keyBasedTriggerQueues = new KeyBasedExecutionQueues(r => r.key)
for (let triggerName in this.triggers) {
const trigger = this.triggers[triggerName]
await this.dao.request(['database', 'put'], this.databaseName, 'triggerRoutes',
{ id: triggerName + '=>' + this.name, trigger: triggerName, service: this.name })
if(trigger.definition.queuedBy) {
const queuedBy = trigger.definition.queuedBy
const keyFunction = typeof queuedBy == 'function' ? queuedBy : (
Array.isArray(queuedBy) ? (c) => JSON.stringify(queuedBy.map(k=>c[k])) :
(c) => JSON.stringify(c[queuedBy]) )
this.triggerQueue.addCommandHandler(triggerName, async (trig) => {
const profileOp = await this.profileLog.begin({ operation: 'queueTrigger', triggerType: triggerName,
triggerId: trig.id, by: trig.by })
console.log("QUEUED TRIGGER STARTED", trig)
const reportFinished = trigger.definition.waitForEvents ? 'trigger_'+trig.id : undefined
const flags = { triggerId: trig.id, reportFinished }
const emit = this.app.splitEvents
? new SplitEmitQueue(this, flags)
: new SingleEmitQueue(this, flags)
const routine = () => this.profileLog.profile({ operation: 'runTrigger', triggerType: triggerName,
commandId: trig.id, by: trig.by }, async () => {
let result
try {
console.log("TRIGGERED!!", trig)
result = await this.app.assertTime('trigger '+trigger.definition.name,
trigger.definition.timeout || 10000,
() => trigger.execute(trig, (...args) => emit.emit(...args)), trig)
console.log("TRIGGER DONE!", trig)
} catch (e) {
console.error(`TRIGGER ${triggerName} ERROR`, e.stack)
throw e
}
const events = await emit.commit()
if(trigger.definition.waitForEvents)
await this.app.waitForEvents(reportFinished, events, trigger.definition.waitForEvents)
return result
})
try {
routine.key = keyFunction(trig)
} catch(e) {
console.error("QUEUE KEY FUNCTION ERROR", e)
}
console.log("TRIGGER QUEUE KEY", routine.key)
const promise = this.keyBasedTriggerQueues.queue(routine)
await this.profileLog.endPromise(profileOp, promise)
return promise
})
} else {
this.triggerQueue.addCommandHandler(triggerName,
(trig) => this.profileLog.profile({ operation: 'runTrigger', triggerType: triggerName,
commandId: trig.id, by: trig.by }, async () => {
console.log("NOT QUEUED TRIGGER STARTED", trig)
const reportFinished = trigger.definition.waitForEvents ? 'trigger_'+trig.id : undefined
const flags = { triggerId: trig.id, reportFinished }
const emit = this.app.splitEvents
? new SplitEmitQueue(this, flags)
: new SingleEmitQueue(this, flags)
let result
try {
result = await this.app.assertTime('trigger '+trigger.definition.name,
trigger.definition.timeout || 10000,
() => trigger.execute(trig, (...args) => emit.emit(...args)), trig)
console.log("TRIGGER DONE!", trig)
} catch (e) {
console.error(`TRIGGER ${triggerName} ERROR`, e.stack)
throw e
}
const events = await emit.commit()
if(trigger.definition.waitForEvents)
await this.app.waitForEvents(reportFinished, events, trigger.definition.waitForEvents)
return result
})
)
}
}
this.commandQueue.start()
this.triggerQueue.start()
}
async startSearchIndexer() {
let anyIndex = false
for(const name in this.models) if(this.models[name].definition.searchIndex) anyIndex = true
for(const name in this.indexes) if(this.indexes[name].definition.searchIndex) anyIndex = true
if(!anyIndex) {
console.log("not starting search indexer - nothing to index!")
return
}
console.log("starting search indexer!")
await this.dao.request(['database', 'createTable'], this.databaseName, 'searchIndexes').catch(e => 'ok')
this.searchIndexers = []
const elasticsearch = this.app.connectToSearch()
for(const modelName in this.models) {
const model = this.models[modelName]
const indexName = model.definition.searchIndex
if(!indexName) continue
const indexer = new SearchIndexer(
this.dao, this.databaseName, 'Table', model.tableName, elasticsearch, indexName, model.definition
)
this.searchIndexers.push(indexer)
}
for(const indexName in this.indexes) {
const index = this.indexes[indexName]
const indexName = index.definition.searchIndex
if(!indexName) continue
const indexer = new SearchIndexer(
this.dao, this.databaseName, 'Index', model.tableName, elasticsearch, indexName, index.definition
)
this.searchIndexers.push(indexer)
}
const promises = []
for(const searchIndexer of this.searchIndexers) {
promises.push(this.profileLog.profile({
operation: "startIndexer", serviceName: this.name, indexName: searchIndexer.indexName
}, () => searchIndexer.start()))
}
await Promise.all(promises)
console.log("search indexer started!")
}
}
class SplitEmitQueue {
constructor(service, flags = {}) {
this.service = service
this.flags = flags
this.emittedEvents = new Map()
this.commited = false
}
emit(service, event) {
if(!event) {
event = service
if(Array.isArray(event)) {
let hasServices = false
for(let ev of event) {
if(ev.service) hasServices = true
}
if(hasServices) {
for(let ev of event) {
this.emit(ev)
}
return
}
} else {
service = event.service || this.service.name
}
}
let events
if(!this.commited) {
events = this.emittedEvents.get(service)
if(!events) {
events = []
this.emittedEvents.set(service, events)
}
} else {
events = []
}
if(Array.isArray(event)) {
for(let ev of event) ev.service = service
events.push(...event)
} else {
event.service = service
events.push(event)
}
if(this.commited) {
if(events.length == 0) return
this.service.dao.request(['database', 'putLog'], this.service.databaseName,
this.service.name+'_events', { type: 'bucket', events, ...this.flags })
}
}
async commit() {
let promises = []
this.commited = true
if(this.emittedEvents.length == 0) return []
let allEvents = []
for(const [service, events] of this.emittedEvents.keys()) {
promises.push(this.service.dao.request(['database', 'putLog'], this.service.databaseName,
this.service.name+'_events', { type: 'bucket', events, ...this.flags }))
allEvents.push(...events)
}
await Promise.all(promises)
return allEvents
}
}
class SingleEmitQueue {
constructor(service, flags = {}) {
this.service = service
this.flags = flags
this.emittedEvents = []
this.commited = false
}
emit(service, event) {
if(!event) {
event = service
service = this.service.name
}
let events
if(!this.commited) {
events = this.emittedEvents
} else {
events = []
}
if(Array.isArray(event)) {
for(let ev of event) if(!ev.service) ev.service = service
events.push(...event)
} else {
if(!event.service) event.service = service
events.push(event)
}
if(this.commited) {
if(events.length == 0) return
this.service.dao.request(['database', 'putLog'], this.service.databaseName,
'events', { type: 'bucket', events, ...this.flags })
}
}
async commit() {
this.commited = true
if(this.emittedEvents.length == 0) return []
await this.service.dao.request(['database', 'putLog'], this.service.databaseName,
'events', { type: 'bucket', events: this.emittedEvents, ...this.flags })
return this.emittedEvents
}
}
module.exports = Service
package.json

 {
   "name": "@live-change/framework",
-  "version": "0.4.42",
+  "version": "0.4.43",
   "description": "Live Change Framework - ultimate solution for real time mobile/web apps",

@@ -5,0 +5,0 @@ "main": "index.js",
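
The package.json diff is just the version bump, so picking up the change is a plain install:

npm install @live-change/framework@0.4.43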
