chrono-forge
Advanced tools
Comparing version 0.6.1 to 0.6.3
@@ -11,4 +11,5 @@ import EventEmitter from 'eventemitter3'; | ||
export declare class SchemaManager extends EventEmitter { | ||
protected workflowId: string; | ||
private static instance; | ||
static getInstance(): SchemaManager; | ||
static getInstance(workflowId?: string): SchemaManager; | ||
private schemas; | ||
@@ -18,2 +19,3 @@ private state; | ||
private queue; | ||
private changeOrigins; | ||
get pendingChanges(): EntityAction[]; | ||
@@ -33,3 +35,3 @@ private history; | ||
getSchema(schemaName: string): schema.Entity; | ||
dispatch(action: EntityAction, sync?: boolean): Promise<void>; | ||
dispatch(action: EntityAction, sync?: boolean, changeOrigin?: string | null): Promise<void>; | ||
processChanges(): Promise<void>; | ||
@@ -36,0 +38,0 @@ private emitStateChangeEvents; |
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { | ||
Object.defineProperty(o, "default", { enumerable: true, value: v }); | ||
}) : function(o, v) { | ||
o["default"] = v; | ||
}); | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); | ||
__setModuleDefault(result, mod); | ||
return result; | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
@@ -13,7 +36,9 @@ return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
const limitRecursion_1 = require("./utils/limitRecursion"); | ||
const workflow = __importStar(require("@temporalio/workflow")); | ||
class SchemaManager extends eventemitter3_1.default { | ||
workflowId; | ||
static instance; | ||
static getInstance() { | ||
static getInstance(workflowId) { | ||
if (!this.instance) { | ||
this.instance = new SchemaManager(); | ||
this.instance = new SchemaManager(workflowId); | ||
} | ||
@@ -26,2 +51,3 @@ return this.instance; | ||
queue = []; | ||
changeOrigins = new Set(); | ||
get pendingChanges() { | ||
@@ -33,4 +59,5 @@ return this.queue; | ||
maxHistory = 50; | ||
constructor() { | ||
constructor(workflowId = '__ACTIVITY_CONTEXT__') { | ||
super(); | ||
this.workflowId = workflowId; | ||
} | ||
@@ -57,4 +84,7 @@ get maxHistorySize() { | ||
} | ||
async dispatch(action, sync = true) { | ||
async dispatch(action, sync = true, changeOrigin = null) { | ||
this.queue.push(action); | ||
if (changeOrigin) { | ||
this.changeOrigins.add(changeOrigin); | ||
} | ||
if (sync && !this.processing) { | ||
@@ -78,8 +108,9 @@ this.processing = true; | ||
this.state = { ...newState }; | ||
this.emitStateChangeEvents(differences, previousState, newState); | ||
this.emitStateChangeEvents(differences, previousState, newState, Array.from(this.changeOrigins)); | ||
} | ||
} | ||
this.changeOrigins.clear(); | ||
this.processing = false; | ||
} | ||
emitStateChangeEvents(differences, previousState, newState) { | ||
emitStateChangeEvents(differences, previousState, newState, changeOrigins) { | ||
const changedPaths = ['added', 'updated', 'deleted']; | ||
@@ -95,4 +126,8 @@ changedPaths.forEach((changeType) => { | ||
const path = `${entityName}.${entityId}`; | ||
if (changeOrigins.includes(this.workflowId)) { | ||
workflow.log.debug(`[SchemaManager]: Skipping state change event emission for changes originating from known workflows.`); | ||
return; | ||
} | ||
if (this.listenerCount(path) > 0) { | ||
this.emit(path, { newState, previousState, changes: entityChanges[entityId] }); | ||
this.emit(path, { newState, previousState, changes: entityChanges[entityId], changeOrigins }); | ||
} | ||
@@ -102,3 +137,3 @@ }); | ||
}); | ||
this.emit('stateChange', { newState, previousState, differences }); | ||
this.emit('stateChange', { newState, previousState, differences, changeOrigins }); | ||
} | ||
@@ -105,0 +140,0 @@ pushToHistory(currentState) { |
@@ -52,3 +52,5 @@ 'use strict'; | ||
api_1.default.trace.setGlobalTracerProvider(provider); | ||
api_1.default.propagation.setGlobalPropagator(new core_1.W3CTraceContextPropagator()); | ||
api_1.default.propagation.setGlobalPropagator(new core_1.CompositePropagator({ | ||
propagators: [new core_1.W3CTraceContextPropagator(), new core_1.W3CBaggagePropagator()] | ||
})); | ||
(0, instrumentation_1.registerInstrumentations)({ | ||
@@ -78,2 +80,6 @@ instrumentations: [ | ||
}); | ||
['SIGINT', 'SIGTERM'].forEach((signal) => { | ||
process.on(signal, () => provider.shutdown().catch(console.error)); | ||
process.on(signal, () => exporter.shutdown().catch(console.error)); | ||
}); | ||
exports.tracers.set(serviceName, { | ||
@@ -80,0 +86,0 @@ tracer: provider.getTracer(serviceName), |
@@ -57,2 +57,3 @@ import * as workflow from '@temporalio/workflow'; | ||
strategy?: '$set' | '$merge'; | ||
changeOrigin?: string; | ||
}; | ||
@@ -84,3 +85,3 @@ export declare abstract class StatefulWorkflow extends Workflow { | ||
protected executeWorkflow(): Promise<any>; | ||
update({ data, updates, entityName, strategy }: PendingChange & { | ||
update({ data, updates, entityName, strategy, changeOrigin }: PendingChange & { | ||
data?: Record<string, any>; | ||
@@ -103,3 +104,2 @@ }): void; | ||
protected sendSubscriptionUpdate(workflowId: string, signalName: string, updatedData: any): Promise<void>; | ||
private isPathMatchingSelector; | ||
private shouldPropagateUpdate; | ||
@@ -106,0 +106,0 @@ protected processChildState(newState: EntitiesState, differences: DetailedDiff, previousState: EntitiesState): Promise<void>; |
@@ -51,3 +51,3 @@ "use strict"; | ||
class StatefulWorkflow extends Workflow_1.Workflow { | ||
schemaManager = SchemaManager_1.SchemaManager.getInstance(); | ||
schemaManager; | ||
schema; | ||
@@ -93,2 +93,3 @@ async condition() { | ||
super(args, options); | ||
this.schemaManager = SchemaManager_1.SchemaManager.getInstance(workflow.workflowInfo().workflowId); | ||
this.params = args[0]; | ||
@@ -99,3 +100,2 @@ this.options = options; | ||
this.id = this.params?.id; | ||
this.status = this.params?.status ?? 'init'; | ||
if (this.params?.ancestorWorkflowIds) { | ||
@@ -119,2 +119,3 @@ this.ancestorWorkflowIds = this.params.ancestorWorkflowIds; | ||
} | ||
this.status = this.params?.status ?? 'running'; | ||
} | ||
@@ -172,3 +173,3 @@ async executeWorkflow() { | ||
} | ||
update({ data, updates, entityName, strategy = '$merge' }) { | ||
update({ data, updates, entityName, strategy = '$merge', changeOrigin }) { | ||
this.log.debug(`[StatefulWorkflow]:${this.constructor.name}:update`); | ||
@@ -180,3 +181,3 @@ if (!(0, lodash_isempty_1.default)(data)) { | ||
this.pendingUpdate = true; | ||
this.schemaManager.dispatch((0, entities_1.updateNormalizedEntities)(updates, strategy), false); | ||
this.schemaManager.dispatch((0, entities_1.updateNormalizedEntities)(updates, strategy), false, changeOrigin); | ||
} | ||
@@ -198,3 +199,4 @@ } | ||
if (this.ancestorWorkflowIds.includes(workflowId)) { | ||
this.log.warn(`[${this.constructor.name}] Circular subscription detected for workflowId: ${workflowId}. Skipping subscription.`); | ||
this.log.warn(`[${this.constructor.name}]:${this.entityName}:${this.id} Circular subscription detected for workflowId: ${workflowId}. Skipping subscription.`); | ||
this.log.warn(this.ancestorWorkflowIds.join(',\n ')); | ||
return; | ||
@@ -258,3 +260,2 @@ } | ||
this.pendingUpdate = false; | ||
this.emit('ready'); | ||
} | ||
@@ -294,6 +295,2 @@ } | ||
} | ||
isPathMatchingSelector(path, selector) { | ||
const regex = new RegExp(`^${selector.replace(/\*/g, '.*')}$`); | ||
return regex.test(path); | ||
} | ||
shouldPropagateUpdate(flattenedState, differences, selector, condition, sourceWorkflowId, ancestorWorkflowIds = []) { | ||
@@ -413,6 +410,7 @@ this.log.debug(`[StatefulWorkflow]: Checking if we should propagate update for selector: ${selector}`); | ||
const entitySchema = SchemaManager_1.SchemaManager.getInstance().getSchema(entityName); | ||
const rawData = (0, limitRecursion_1.limitRecursion)((0, normalizr_1.denormalize)(state, entitySchema, newState), entitySchema); | ||
const data = typeof config.processData === 'function' ? config.processData(rawData, this) : rawData; | ||
const { [idAttribute]: id, ...rest } = state; | ||
const compositeId = Array.isArray(idAttribute) ? (0, getCompositeKey_1.getCompositeKey)(state, idAttribute) : id; | ||
const compositeId = Array.isArray(idAttribute) ? (0, getCompositeKey_1.getCompositeKey)(data, idAttribute) : id; | ||
const workflowId = `${entityName}-${compositeId}`; | ||
const rawData = (0, limitRecursion_1.limitRecursion)((0, normalizr_1.denormalize)(state, entitySchema, newState), entitySchema); | ||
if (this.ancestorWorkflowIds.includes(workflowId)) { | ||
@@ -429,3 +427,2 @@ this.log.warn(`[${this.constructor.name}] Circular dependency detected for workflowId: ${workflowId}. Skipping child workflow start.`); | ||
} | ||
const data = typeof config.processData === 'function' ? config.processData(rawData, this) : rawData; | ||
const startPayload = { | ||
@@ -480,6 +477,7 @@ workflowId, | ||
const entitySchema = SchemaManager_1.SchemaManager.getInstance().getSchema(entityName); | ||
const rawData = (0, limitRecursion_1.limitRecursion)((0, normalizr_1.denormalize)(state, entitySchema, newState), entitySchema); | ||
const data = typeof config.processData === 'function' ? config.processData(rawData, this) : rawData; | ||
const { [idAttribute]: id } = state; | ||
const compositeId = Array.isArray(config.idAttribute) ? (0, getCompositeKey_1.getCompositeKey)(state, config.idAttribute) : state[config.idAttribute]; | ||
const compositeId = Array.isArray(config.idAttribute) ? (0, getCompositeKey_1.getCompositeKey)(data, config.idAttribute) : state[config.idAttribute]; | ||
const workflowId = `${entityName}-${compositeId}`; | ||
const rawData = (0, limitRecursion_1.limitRecursion)((0, normalizr_1.denormalize)(state, entitySchema, newState), entitySchema); | ||
if (this.ancestorWorkflowIds.includes(workflowId)) { | ||
@@ -496,3 +494,2 @@ this.log.warn(`[${this.constructor.name}] Circular update detected for workflowId: ${workflowId}. Skipping child workflow update.`); | ||
} | ||
const data = typeof config.processData === 'function' ? config.processData(rawData, this) : rawData; | ||
await handle.signal('update', { data, entityName: config.entityName, strategy: '$merge' }); | ||
@@ -499,0 +496,0 @@ this.emit(`childUpdated:${config.workflowType}`, handle.workflowId, data); |
{ | ||
"name": "chrono-forge", | ||
"version": "0.6.1", | ||
"version": "0.6.3", | ||
"description": "A comprehensive framework for building resilient Temporal workflows, advanced state management, and real-time streaming activities in TypeScript. Designed for a seamless developer experience with powerful abstractions, dynamic orchestration, and full control over distributed systems.", | ||
@@ -151,2 +151,3 @@ "main": "dist/index.js", | ||
"testEnvironment": "node", | ||
"globalTeardown": "./src/tests/teardown.ts", | ||
"setupFiles": [ | ||
@@ -153,0 +154,0 @@ "./src/tests/setup.ts" |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
259752
2998