@node-ts/bus-mongodb
Advanced tools
Comparing version 1.0.3 to 1.1.0-beta.0
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
(0, tslib_1.__exportStar)(require("./workflow-state-not-found"), exports); | ||
//# sourceMappingURL=index.js.map | ||
tslib_1.__exportStar(require("./workflow-state-not-found"), exports); |
@@ -13,2 +13,1 @@ "use strict"; | ||
exports.WorkflowStateNotFound = WorkflowStateNotFound; | ||
//# sourceMappingURL=workflow-state-not-found.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
(0, tslib_1.__exportStar)(require("./mongodb-configuration"), exports); | ||
(0, tslib_1.__exportStar)(require("./mongodb-persistence"), exports); | ||
//# sourceMappingURL=index.js.map | ||
tslib_1.__exportStar(require("./mongodb-configuration"), exports); | ||
tslib_1.__exportStar(require("./mongodb-persistence"), exports); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
//# sourceMappingURL=mongodb-configuration.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.MongodbPersistence = void 0; | ||
const tslib_1 = require("tslib"); | ||
const mongodb_1 = require("mongodb"); | ||
@@ -19,156 +20,174 @@ const error_1 = require("./error"); | ||
} | ||
async initialize() { | ||
this.logger.info('Initializing mongodb persistence...'); | ||
await this.client.connect(); | ||
this.database = this.client.db(this.configuration.databaseName); | ||
this.logger.info('Mongodb persistence initialized'); | ||
initialize() { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.info('Initializing mongodb persistence...'); | ||
yield this.client.connect(); | ||
this.database = this.client.db(this.configuration.databaseName); | ||
this.logger.info('Mongodb persistence initialized'); | ||
}); | ||
} | ||
async dispose() { | ||
this.logger.info('Disposing Mongodb persistence...'); | ||
await this.client.close(); | ||
this.logger.info('Mongodb persistence disposed'); | ||
dispose() { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.info('Disposing Mongodb persistence...'); | ||
yield this.client.close(); | ||
this.logger.info('Mongodb persistence disposed'); | ||
}); | ||
} | ||
async initializeWorkflow(workflowStateConstructor, messageWorkflowMappings) { | ||
await this.client.connect(); | ||
this.database = this.client.db(this.configuration.databaseName); | ||
const workflowStateName = new workflowStateConstructor().$name; | ||
this.logger.info('Initializing workflow', { | ||
workflowState: workflowStateName | ||
initializeWorkflow(workflowStateConstructor, messageWorkflowMappings) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
yield this.client.connect(); | ||
this.database = this.client.db(this.configuration.databaseName); | ||
const workflowStateName = new workflowStateConstructor().$name; | ||
this.logger.info('Initializing workflow', { | ||
workflowState: workflowStateName | ||
}); | ||
const collectionName = resolveQualifiedTableName(workflowStateName); | ||
yield this.ensureCollectionExists(collectionName); | ||
yield this.ensureIndexesExist(collectionName, messageWorkflowMappings); | ||
}); | ||
const collectionName = resolveQualifiedTableName(workflowStateName); | ||
await this.ensureCollectionExists(collectionName); | ||
await this.ensureIndexesExist(collectionName, messageWorkflowMappings); | ||
} | ||
async getWorkflowState(workflowStateConstructor, messageMap, message, attributes, includeCompleted = false) { | ||
this.logger.debug('Getting workflow state', { | ||
workflowStateName: workflowStateConstructor.name | ||
getWorkflowState(workflowStateConstructor, messageMap, message, attributes, includeCompleted = false) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.debug('Getting workflow state', { | ||
workflowStateName: workflowStateConstructor.name | ||
}); | ||
const workflowStateName = new workflowStateConstructor().$name; | ||
const tableName = resolveQualifiedTableName(workflowStateName); | ||
const matcherValue = messageMap.lookup(message, attributes); | ||
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty(messageMap.mapsTo)}`; | ||
const collection = this.database.collection(tableName); | ||
const findObject = { | ||
[workflowStateField]: matcherValue | ||
}; | ||
if (!includeCompleted) { | ||
findObject[`${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty('$status')}`] = 'running'; | ||
} | ||
const documents = yield collection.find(findObject).toArray(); | ||
this.logger.debug('Querying workflow state', { findObject }); | ||
this.logger.debug('Got workflow state', { | ||
resultsCount: documents === null || documents === void 0 ? void 0 : documents.length | ||
}); | ||
const rows = documents.map(x => x[WORKFLOW_DATA_FIELD_NAME]); | ||
return rows | ||
.map(row => mapKeys(row, (key, _) => denormalizeProperty(key))) | ||
.filter(workflowState => workflowState !== undefined) | ||
.map(workflowState => this.coreDependencies.serializer.toClass(workflowState, workflowStateConstructor)); | ||
}); | ||
const workflowStateName = new workflowStateConstructor().$name; | ||
const tableName = resolveQualifiedTableName(workflowStateName); | ||
const matcherValue = messageMap.lookup(message, attributes); | ||
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty(messageMap.mapsTo)}`; | ||
const collection = this.database.collection(tableName); | ||
const findObject = { | ||
[workflowStateField]: matcherValue | ||
}; | ||
if (!includeCompleted) { | ||
findObject[`${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty('$status')}`] = 'running'; | ||
} | ||
const documents = await collection.find(findObject).toArray(); | ||
this.logger.debug('Querying workflow state', { findObject }); | ||
this.logger.debug('Got workflow state', { | ||
resultsCount: documents === null || documents === void 0 ? void 0 : documents.length | ||
} | ||
saveWorkflowState(workflowState) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.debug('Saving workflow state', { | ||
workflowStateName: workflowState.$name, | ||
id: workflowState.$workflowId | ||
}); | ||
const collectionName = resolveQualifiedTableName(workflowState.$name); | ||
const oldVersion = workflowState.$version; | ||
const newVersion = oldVersion + 1; | ||
const modifiedState = mapKeys(workflowState, (key, _) => normalizeProperty(key)); | ||
const plainWorkflowState = Object.assign(Object.assign({}, this.coreDependencies.serializer.toPlain(modifiedState)), { __version: newVersion }); | ||
yield this.upsertWorkflowState(collectionName, workflowState.$workflowId, plainWorkflowState, oldVersion, newVersion); | ||
}); | ||
const rows = documents.map(x => x[WORKFLOW_DATA_FIELD_NAME]); | ||
return rows | ||
.map(row => mapKeys(row, (key, _) => denormalizeProperty(key))) | ||
.filter(workflowState => workflowState !== undefined) | ||
.map(workflowState => this.coreDependencies.serializer.toClass(workflowState, workflowStateConstructor)); | ||
} | ||
async saveWorkflowState(workflowState) { | ||
this.logger.debug('Saving workflow state', { | ||
workflowStateName: workflowState.$name, | ||
id: workflowState.$workflowId | ||
ensureCollectionExists(collectionName) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this.logger.debug('Ensuring mongodb collection for workflow state exists', { | ||
collectionName | ||
}); | ||
const collectionExists = yield this.database | ||
.listCollections({ name: collectionName }, { nameOnly: true }) | ||
.hasNext(); | ||
if (!collectionExists) { | ||
yield this.database.createCollection(collectionName); | ||
} | ||
}); | ||
const collectionName = resolveQualifiedTableName(workflowState.$name); | ||
const oldVersion = workflowState.$version; | ||
const newVersion = oldVersion + 1; | ||
const modifiedState = mapKeys(workflowState, (key, _) => normalizeProperty(key)); | ||
const plainWorkflowState = Object.assign(Object.assign({}, this.coreDependencies.serializer.toPlain(modifiedState)), { __version: newVersion }); | ||
await this.upsertWorkflowState(collectionName, workflowState.$workflowId, plainWorkflowState, oldVersion, newVersion); | ||
} | ||
async ensureCollectionExists(collectionName) { | ||
this.logger.debug('Ensuring mongodb collection for workflow state exists', { | ||
collectionName | ||
ensureIndexesExist(collectionName, messageWorkflowMappings) { | ||
var _a; | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const collection = this.database.collection(collectionName); | ||
const existingIndexes = (_a = (yield collection.listIndexes().toArray())) !== null && _a !== void 0 ? _a : []; | ||
const existingIndexNames = existingIndexes | ||
.map(index => index.name) | ||
.filter(x => x !== '_id_'); | ||
const createPrimaryIndex = this.createPrimaryIndex(collectionName, existingIndexNames); | ||
const allWorkflowFields = messageWorkflowMappings.map(mapping => mapping.mapsTo); | ||
const distinctWorkflowFields = new Set(allWorkflowFields); | ||
const workflowFields = [...distinctWorkflowFields]; | ||
const createSecondaryIndexes = workflowFields.map((workflowField) => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const indexName = resolveIndexName(collectionName, workflowField); | ||
const existingIndexLocation = existingIndexNames.indexOf(indexName); | ||
if (existingIndexLocation !== -1) { | ||
this.logger.debug('Index already exists', { indexName }); | ||
existingIndexNames.splice(existingIndexLocation, 1); | ||
return; | ||
} | ||
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.'${workflowField}'`; | ||
this.logger.debug('Ensuring secondary index exists', { | ||
indexName | ||
}); | ||
yield collection.createIndex({ [workflowStateField]: 1 }, { name: indexName }); | ||
})); | ||
const dropIndexes = existingIndexNames.map((indexName) => tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
yield collection.dropIndex(indexName); | ||
})); | ||
yield Promise.all([ | ||
createPrimaryIndex, | ||
...createSecondaryIndexes, | ||
...dropIndexes | ||
]); | ||
}); | ||
const collectionExists = await this.database | ||
.listCollections({ name: collectionName }, { nameOnly: true }) | ||
.hasNext(); | ||
if (!collectionExists) { | ||
await this.database.createCollection(collectionName); | ||
} | ||
} | ||
async ensureIndexesExist(collectionName, messageWorkflowMappings) { | ||
var _a; | ||
const collection = this.database.collection(collectionName); | ||
const existingIndexes = (_a = (await collection.listIndexes().toArray())) !== null && _a !== void 0 ? _a : []; | ||
const existingIndexNames = existingIndexes | ||
.map(index => index.name) | ||
.filter(x => x !== '_id_'); | ||
const createPrimaryIndex = this.createPrimaryIndex(collectionName, existingIndexNames); | ||
const allWorkflowFields = messageWorkflowMappings.map(mapping => mapping.mapsTo); | ||
const distinctWorkflowFields = new Set(allWorkflowFields); | ||
const workflowFields = [...distinctWorkflowFields]; | ||
const createSecondaryIndexes = workflowFields.map(async (workflowField) => { | ||
const indexName = resolveIndexName(collectionName, workflowField); | ||
const existingIndexLocation = existingIndexNames.indexOf(indexName); | ||
if (existingIndexLocation !== -1) { | ||
this.logger.debug('Index already exists', { indexName }); | ||
existingIndexNames.splice(existingIndexLocation, 1); | ||
createPrimaryIndex(collectionName, existingIndexesNames) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const collection = this.database.collection(collectionName); | ||
const primaryIndexName = resolveIndexName(collectionName, 'id', 'version'); | ||
const primaryIndexLocation = existingIndexesNames.indexOf(primaryIndexName); | ||
if (primaryIndexLocation !== -1) { | ||
existingIndexesNames.splice(primaryIndexLocation, 1); | ||
return; | ||
} | ||
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.'${workflowField}'`; | ||
this.logger.debug('Ensuring secondary index exists', { | ||
indexName | ||
this.logger.debug('Ensuring primary index exists', { | ||
primaryIndexName | ||
}); | ||
await collection.createIndex({ [workflowStateField]: 1 }, { name: indexName }); | ||
yield collection.createIndex({ _id: 1, version: 1 }, { name: primaryIndexName }); | ||
}); | ||
const dropIndexes = existingIndexNames.map(async (indexName) => { | ||
await collection.dropIndex(indexName); | ||
}); | ||
await Promise.all([ | ||
createPrimaryIndex, | ||
...createSecondaryIndexes, | ||
...dropIndexes | ||
]); | ||
} | ||
async createPrimaryIndex(collectionName, existingIndexesNames) { | ||
const collection = this.database.collection(collectionName); | ||
const primaryIndexName = resolveIndexName(collectionName, 'id', 'version'); | ||
const primaryIndexLocation = existingIndexesNames.indexOf(primaryIndexName); | ||
if (primaryIndexLocation !== -1) { | ||
existingIndexesNames.splice(primaryIndexLocation, 1); | ||
return; | ||
} | ||
this.logger.debug('Ensuring primary index exists', { | ||
primaryIndexName | ||
}); | ||
await collection.createIndex({ _id: 1, version: 1 }, { name: primaryIndexName }); | ||
} | ||
async upsertWorkflowState(collectionName, workflowId, plainWorkflowState, oldVersion, newVersion) { | ||
const collection = this.database.collection(collectionName); | ||
if (oldVersion === 0) { | ||
this.logger.debug('Inserting new workflow state', { | ||
collectionName, | ||
workflowId, | ||
oldVersion, | ||
newVersion | ||
}); | ||
// This is a new workflow, so just insert the data | ||
await collection.insertOne({ | ||
id: workflowId, | ||
version: newVersion, | ||
[WORKFLOW_DATA_FIELD_NAME]: plainWorkflowState | ||
}); | ||
} | ||
else { | ||
this.logger.debug('Updating existing workflow state', { | ||
collectionName, | ||
workflowId, | ||
oldVersion, | ||
newVersion | ||
}); | ||
const result = await collection.findOneAndUpdate({ | ||
id: workflowId, | ||
version: oldVersion | ||
}, { | ||
$set: { | ||
upsertWorkflowState(collectionName, workflowId, plainWorkflowState, oldVersion, newVersion) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const collection = this.database.collection(collectionName); | ||
if (oldVersion === 0) { | ||
this.logger.debug('Inserting new workflow state', { | ||
collectionName, | ||
workflowId, | ||
oldVersion, | ||
newVersion | ||
}); | ||
// This is a new workflow, so just insert the data | ||
yield collection.insertOne({ | ||
id: workflowId, | ||
version: newVersion, | ||
[WORKFLOW_DATA_FIELD_NAME]: plainWorkflowState | ||
}); | ||
} | ||
else { | ||
this.logger.debug('Updating existing workflow state', { | ||
collectionName, | ||
workflowId, | ||
oldVersion, | ||
newVersion | ||
}); | ||
const result = yield collection.findOneAndUpdate({ | ||
id: workflowId, | ||
version: oldVersion | ||
}, { | ||
$set: { | ||
version: newVersion, | ||
[WORKFLOW_DATA_FIELD_NAME]: plainWorkflowState | ||
} | ||
}); | ||
if (!(result === null || result === void 0 ? void 0 : result.value)) { | ||
throw new error_1.WorkflowStateNotFound(workflowId, collectionName, oldVersion); | ||
} | ||
}); | ||
if (!(result === null || result === void 0 ? void 0 : result.value)) { | ||
throw new error_1.WorkflowStateNotFound(workflowId, collectionName, oldVersion); | ||
} | ||
} | ||
}); | ||
} | ||
@@ -215,2 +234,1 @@ } | ||
} | ||
//# sourceMappingURL=mongodb-persistence.js.map |
{ | ||
"name": "@node-ts/bus-mongodb", | ||
"description": "A Mongodb persistence adapter for workflow storage in @node-ts/bus-workflow.", | ||
"version": "1.0.3", | ||
"version": "1.1.0-beta.0", | ||
"license": "MIT", | ||
@@ -13,9 +13,8 @@ "main": "./dist/index.js", | ||
"dependencies": { | ||
"@node-ts/bus-messages": "^1.0.4", | ||
"mongodb": "^5.3.0", | ||
"tslib": "^1.9.3", | ||
"uuid": "^3.3.2" | ||
"tslib": "^2.6.2", | ||
"uuid": "^3.3.2", | ||
"@node-ts/bus-messages": "^1.1.0-beta.0" | ||
}, | ||
"devDependencies": { | ||
"@node-ts/bus-core": "^1.0.15", | ||
"@node-ts/code-standards": "^0.0.10", | ||
@@ -26,3 +25,4 @@ "@types/amqplib": "^0.5.11", | ||
"typemoq": "^2.1.0", | ||
"typescript": "^4.3.5" | ||
"typescript": "^5.3.3", | ||
"@node-ts/bus-core": "^1.1.0-beta.0" | ||
}, | ||
@@ -51,5 +51,5 @@ "peerDependencies": { | ||
"clean": "rm -rf dist", | ||
"build": "tsc --project tsconfig.json --declaration", | ||
"build": "tsc", | ||
"build:watch": "pnpm run build --incremental --watch --preserveWatchOutput" | ||
} | ||
} |
@@ -31,6 +31,8 @@ # @node-ts/bus-mongodb | ||
const run = async () => { | ||
await Bus | ||
const bus = Bus | ||
.configure() | ||
.withPersistence(mongodbPersistence) | ||
.initialize() | ||
.build() | ||
await bus.initialize() | ||
await bus.start() | ||
} | ||
@@ -37,0 +39,0 @@ run.then(() => void) |
@@ -35,8 +35,9 @@ import { | ||
sut = new MongodbPersistence(configuration) | ||
bus = await Bus.configure() | ||
bus = Bus.configure() | ||
.withLogger(() => Mock.ofType<Logger>().object) | ||
.withPersistence(sut) | ||
.withWorkflow(TestWorkflow) | ||
.initialize() | ||
.build() | ||
await bus.initialize() | ||
await bus.start() | ||
@@ -150,3 +151,3 @@ }) | ||
} catch (err) { | ||
error = err | ||
error = err as Error | ||
} | ||
@@ -153,0 +154,0 @@ }) |
@@ -1,13 +0,12 @@ | ||
import { TestWorkflowState } from './test-workflow-state' | ||
import { | ||
Bus, | ||
HandlerContext, | ||
Workflow, | ||
WorkflowMapper | ||
} from '@node-ts/bus-core' | ||
import { TestCommand } from './test-command' | ||
import { BusInstance, Workflow, WorkflowMapper } from '@node-ts/bus-core' | ||
import { RunTask } from './run-task' | ||
import { TaskRan } from './task-ran' | ||
import { TestCommand } from './test-command' | ||
import { TestWorkflowState } from './test-workflow-state' | ||
export class TestWorkflow extends Workflow<TestWorkflowState> { | ||
constructor(private readonly bus: BusInstance) { | ||
super() | ||
} | ||
configureWorkflow( | ||
@@ -20,3 +19,3 @@ mapper: WorkflowMapper<TestWorkflowState, TestWorkflow> | ||
.when(TaskRan, 'complete', { | ||
lookup: ({ message }) => message.value, | ||
lookup: message => message.value, | ||
mapsTo: 'property1' | ||
@@ -27,5 +26,5 @@ }) | ||
async sendRunTask({ | ||
message: { property1 } | ||
}: HandlerContext<TestCommand>): Promise<Partial<TestWorkflowState>> { | ||
await Bus.send(new RunTask(property1!)) | ||
property1 | ||
}: TestCommand): Promise<Partial<TestWorkflowState>> { | ||
await this.bus.send(new RunTask(property1!)) | ||
return { | ||
@@ -36,5 +35,3 @@ property1 | ||
complete({ | ||
message: { value } | ||
}: HandlerContext<TaskRan>): Partial<TestWorkflowState> { | ||
complete({ value }: TaskRan): Partial<TestWorkflowState> { | ||
return this.completeWorkflow({ | ||
@@ -41,0 +38,0 @@ eventValue: value |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
832
55
35597
26
2
- Removedtslib@1.14.1(transitive)
Updatedtslib@^2.6.2