@live-change/framework
Advanced tools
Comparing version 0.3.23 to 0.3.24
@@ -267,2 +267,53 @@ const ReactiveDao = require("@live-change/dao") | ||
async waitForEvents(reportId, events, timeout) { | ||
return new Promise((resolve, reject) => { | ||
let done = false | ||
let finishedEvents = [] | ||
const handleError = (message) => { | ||
console.error(`waitForEvents error: `, message) | ||
const eventsNotDone = events.filter(event => finished.find(e => e.id == event.id)) | ||
if(eventsNotDone.length > 0) { | ||
console.error(" pending events:") | ||
for(const event of eventsNotDone) { | ||
console.error(` ${event.id} - type: ${event.type}`) | ||
} | ||
} | ||
reject(message) | ||
done = true | ||
} | ||
if(Number.isFinite(timeout)) { | ||
setTimeout(() => { | ||
if(done) return | ||
handleError('timeout') | ||
}, timeout) | ||
} | ||
const observable = this.dao.observable(['database', 'tableObject', 'eventReports', reportId]) | ||
const reportsObserver = (signal, data) => { | ||
if(signal != 'set') { | ||
handleError(`unknown signal ${signal} with data: ${data}`) | ||
} | ||
if(data.finished) { | ||
finishedEvents = data.finished | ||
if(finishedEvents.length >= events.length) { | ||
const eventsNotDone = events.filter(event => finished.find(e => e.id == event.id)) | ||
if(eventsNotDone.length != 0) { | ||
const eventsDone = events.filter(event => !finished.find(e => e.id == event.id)) | ||
console.error("waitForEvents - finished events does not match!") | ||
console.error(" finished events:") | ||
for(const event of eventsNotDone) { | ||
console.error(` ${event.id} - type: ${event.type}`) | ||
} | ||
console.error(" pending events:") | ||
for(const event of eventsNotDone) { | ||
console.error(` ${event.id} - type: ${event.type}`) | ||
} | ||
} else { | ||
resolve('finished') | ||
} | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
async close() { | ||
@@ -269,0 +320,0 @@ this.dao.dispose() |
@@ -111,3 +111,12 @@ const Model = require("./Model.js") | ||
const event = this.events[eventName] | ||
this.eventSourcing.addEventHandler(eventName, (ev, bucket) => event.execute(ev, bucket)) | ||
this.eventSourcing.addEventHandler(eventName, async (ev, bucket) => { | ||
await event.execute(ev, bucket) | ||
}) | ||
this.eventSourcing.onBucketEnd = async (bucket, handledEvents) => { | ||
if(bucket.reportFinished && handledEvents.length > 0) { | ||
await this.dao.request(['database', 'update'], this.databaseName, 'eventReports', bucket.reportFinished,[ | ||
{ op: "mergeSets", property: 'finished', values: handledEvents.map(ev => ({ id: ev.id, type: ev.type })) } | ||
]) | ||
} | ||
} | ||
} | ||
@@ -131,8 +140,11 @@ | ||
let emittedEvents = new Map() | ||
const reportFinished = command.waitForEvents ? 'trigger_'+trig.id : undefined | ||
const flags = { commandId: command.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, { commandId: command.id }) | ||
: new SingleEmitQueue(this, { commandId: command.id }) | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
const routine = async () => { | ||
const result = await action.runCommand(command, (...args) => emit.emit(...args)) | ||
await emit.commit() | ||
const event = await emit.commit() | ||
if(command.waitForEvents) this.app.waitForEvents(reportFinished, events, action.waitForEvents) | ||
return result | ||
@@ -146,7 +158,10 @@ } | ||
let emittedEvents = new Map() | ||
const reportFinished = command.waitForEvents ? 'trigger_'+trig.id : undefined | ||
const flags = { commandId: command.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, { commandId: command.id }) | ||
: new SingleEmitQueue(this, { commandId: command.id }) | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
const result = await action.runCommand(command, (...args) => emit.emit(...args)) | ||
await emit.commit() | ||
const events = await emit.commit() | ||
if(command.waitForEvents) this.app.waitForEvents(reportFinished, events, action.waitForEvents) | ||
return result | ||
@@ -172,8 +187,11 @@ }) | ||
this.triggerQueue.addCommandHandler(triggerName, async (trig) => { | ||
const reportFinished = trigger.waitForEvents ? 'trigger_'+trig.id : undefined | ||
const flags = { triggerId: trig.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, { triggerId: trigger.id }) | ||
: new SingleEmitQueue(this, { triggerId: trigger.id }) | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
const routine = async () => { | ||
let result | ||
try { | ||
console.error("TRIGGERED!!!!!!") | ||
result = await trigger.execute(trig, (...args) => emit.emit(...args)) | ||
@@ -184,3 +202,4 @@ } catch (e) { | ||
} | ||
await emit.commit() | ||
const events = await emit.commit() | ||
if(trigger.waitForEvents) this.app.waitForEvents(reportFinished, events, trigger.waitForEvents) | ||
return result | ||
@@ -193,5 +212,7 @@ } | ||
this.triggerQueue.addCommandHandler(triggerName, async (trig) => { | ||
const reportFinished = trigger.waitForEvents ? 'trigger_'+trig.id : undefined | ||
const flags = { triggerId: trig.id, reportFinished } | ||
const emit = this.app.splitEvents | ||
? new SplitEmitQueue(this, { triggerId: trigger.id }) | ||
: new SingleEmitQueue(this, { triggerId: trigger.id }) | ||
? new SplitEmitQueue(this, flags) | ||
: new SingleEmitQueue(this, flags) | ||
let result | ||
@@ -204,3 +225,4 @@ try { | ||
} | ||
await emit.commit() | ||
const events = await emit.commit() | ||
if(trigger.waitForEvents) this.app.waitForEvents(reportFinished, events, trigger.waitForEvents) | ||
return result | ||
@@ -303,7 +325,10 @@ }) | ||
this.commited = true | ||
let allEvents = [] | ||
for(const [service, events] of this.emittedEvents.keys()) { | ||
promises.push(this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
this.service.name+'_events', { type: 'bucket', events, ...this.flags })) | ||
allEvents.push(...events) | ||
} | ||
return Promise.all(promises) | ||
await Promise.all(promises) | ||
return events | ||
} | ||
@@ -346,4 +371,5 @@ } | ||
this.commited = true | ||
return this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
await this.service.dao.request(['database', 'putLog'], this.service.databaseName, | ||
'events', { type: 'bucket', events: this.emittedEvents, ...this.flags }) | ||
return this.emittedEvents | ||
} | ||
@@ -350,0 +376,0 @@ } |
@@ -10,2 +10,3 @@ const utils = require("../utils.js") | ||
dao.request(['database', 'createTable'], database, 'triggerRoutes').catch(e => 'ok') | ||
dao.request(['database', 'createTable'], database, 'eventReports').catch(e => 'ok') | ||
if(app.splitEvents) { | ||
@@ -12,0 +13,0 @@ dao.request(['database', 'createLog'], database, service.name+'_events').catch(e => 'ok') |
{ | ||
"name": "@live-change/framework", | ||
"version": "0.3.23", | ||
"version": "0.3.24", | ||
"description": "Live Change Framework - ultimate solution for real time mobile/web apps", | ||
@@ -46,4 +46,4 @@ "main": "index.js", | ||
"@live-change/dao": "^0.2.16", | ||
"@live-change/event-sourcing": "^0.1.6" | ||
"@live-change/event-sourcing": "^0.1.7" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
130753
3590