Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@node-ts/bus-mongodb

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@node-ts/bus-mongodb - npm Package Compare versions

Comparing version 1.0.3 to 1.1.0-beta.0

3

dist/error/index.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
(0, tslib_1.__exportStar)(require("./workflow-state-not-found"), exports);
//# sourceMappingURL=index.js.map
tslib_1.__exportStar(require("./workflow-state-not-found"), exports);

@@ -13,2 +13,1 @@ "use strict";

exports.WorkflowStateNotFound = WorkflowStateNotFound;
//# sourceMappingURL=workflow-state-not-found.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
(0, tslib_1.__exportStar)(require("./mongodb-configuration"), exports);
(0, tslib_1.__exportStar)(require("./mongodb-persistence"), exports);
//# sourceMappingURL=index.js.map
tslib_1.__exportStar(require("./mongodb-configuration"), exports);
tslib_1.__exportStar(require("./mongodb-persistence"), exports);
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
//# sourceMappingURL=mongodb-configuration.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MongodbPersistence = void 0;
const tslib_1 = require("tslib");
const mongodb_1 = require("mongodb");

@@ -19,156 +20,174 @@ const error_1 = require("./error");

}
async initialize() {
this.logger.info('Initializing mongodb persistence...');
await this.client.connect();
this.database = this.client.db(this.configuration.databaseName);
this.logger.info('Mongodb persistence initialized');
initialize() {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.info('Initializing mongodb persistence...');
yield this.client.connect();
this.database = this.client.db(this.configuration.databaseName);
this.logger.info('Mongodb persistence initialized');
});
}
async dispose() {
this.logger.info('Disposing Mongodb persistence...');
await this.client.close();
this.logger.info('Mongodb persistence disposed');
dispose() {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.info('Disposing Mongodb persistence...');
yield this.client.close();
this.logger.info('Mongodb persistence disposed');
});
}
async initializeWorkflow(workflowStateConstructor, messageWorkflowMappings) {
await this.client.connect();
this.database = this.client.db(this.configuration.databaseName);
const workflowStateName = new workflowStateConstructor().$name;
this.logger.info('Initializing workflow', {
workflowState: workflowStateName
initializeWorkflow(workflowStateConstructor, messageWorkflowMappings) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
yield this.client.connect();
this.database = this.client.db(this.configuration.databaseName);
const workflowStateName = new workflowStateConstructor().$name;
this.logger.info('Initializing workflow', {
workflowState: workflowStateName
});
const collectionName = resolveQualifiedTableName(workflowStateName);
yield this.ensureCollectionExists(collectionName);
yield this.ensureIndexesExist(collectionName, messageWorkflowMappings);
});
const collectionName = resolveQualifiedTableName(workflowStateName);
await this.ensureCollectionExists(collectionName);
await this.ensureIndexesExist(collectionName, messageWorkflowMappings);
}
async getWorkflowState(workflowStateConstructor, messageMap, message, attributes, includeCompleted = false) {
this.logger.debug('Getting workflow state', {
workflowStateName: workflowStateConstructor.name
getWorkflowState(workflowStateConstructor, messageMap, message, attributes, includeCompleted = false) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.debug('Getting workflow state', {
workflowStateName: workflowStateConstructor.name
});
const workflowStateName = new workflowStateConstructor().$name;
const tableName = resolveQualifiedTableName(workflowStateName);
const matcherValue = messageMap.lookup(message, attributes);
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty(messageMap.mapsTo)}`;
const collection = this.database.collection(tableName);
const findObject = {
[workflowStateField]: matcherValue
};
if (!includeCompleted) {
findObject[`${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty('$status')}`] = 'running';
}
const documents = yield collection.find(findObject).toArray();
this.logger.debug('Querying workflow state', { findObject });
this.logger.debug('Got workflow state', {
resultsCount: documents === null || documents === void 0 ? void 0 : documents.length
});
const rows = documents.map(x => x[WORKFLOW_DATA_FIELD_NAME]);
return rows
.map(row => mapKeys(row, (key, _) => denormalizeProperty(key)))
.filter(workflowState => workflowState !== undefined)
.map(workflowState => this.coreDependencies.serializer.toClass(workflowState, workflowStateConstructor));
});
const workflowStateName = new workflowStateConstructor().$name;
const tableName = resolveQualifiedTableName(workflowStateName);
const matcherValue = messageMap.lookup(message, attributes);
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty(messageMap.mapsTo)}`;
const collection = this.database.collection(tableName);
const findObject = {
[workflowStateField]: matcherValue
};
if (!includeCompleted) {
findObject[`${WORKFLOW_DATA_FIELD_NAME}.${normalizeProperty('$status')}`] = 'running';
}
const documents = await collection.find(findObject).toArray();
this.logger.debug('Querying workflow state', { findObject });
this.logger.debug('Got workflow state', {
resultsCount: documents === null || documents === void 0 ? void 0 : documents.length
}
saveWorkflowState(workflowState) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.debug('Saving workflow state', {
workflowStateName: workflowState.$name,
id: workflowState.$workflowId
});
const collectionName = resolveQualifiedTableName(workflowState.$name);
const oldVersion = workflowState.$version;
const newVersion = oldVersion + 1;
const modifiedState = mapKeys(workflowState, (key, _) => normalizeProperty(key));
const plainWorkflowState = Object.assign(Object.assign({}, this.coreDependencies.serializer.toPlain(modifiedState)), { __version: newVersion });
yield this.upsertWorkflowState(collectionName, workflowState.$workflowId, plainWorkflowState, oldVersion, newVersion);
});
const rows = documents.map(x => x[WORKFLOW_DATA_FIELD_NAME]);
return rows
.map(row => mapKeys(row, (key, _) => denormalizeProperty(key)))
.filter(workflowState => workflowState !== undefined)
.map(workflowState => this.coreDependencies.serializer.toClass(workflowState, workflowStateConstructor));
}
async saveWorkflowState(workflowState) {
this.logger.debug('Saving workflow state', {
workflowStateName: workflowState.$name,
id: workflowState.$workflowId
ensureCollectionExists(collectionName) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
this.logger.debug('Ensuring mongodb collection for workflow state exists', {
collectionName
});
const collectionExists = yield this.database
.listCollections({ name: collectionName }, { nameOnly: true })
.hasNext();
if (!collectionExists) {
yield this.database.createCollection(collectionName);
}
});
const collectionName = resolveQualifiedTableName(workflowState.$name);
const oldVersion = workflowState.$version;
const newVersion = oldVersion + 1;
const modifiedState = mapKeys(workflowState, (key, _) => normalizeProperty(key));
const plainWorkflowState = Object.assign(Object.assign({}, this.coreDependencies.serializer.toPlain(modifiedState)), { __version: newVersion });
await this.upsertWorkflowState(collectionName, workflowState.$workflowId, plainWorkflowState, oldVersion, newVersion);
}
async ensureCollectionExists(collectionName) {
this.logger.debug('Ensuring mongodb collection for workflow state exists', {
collectionName
ensureIndexesExist(collectionName, messageWorkflowMappings) {
var _a;
return tslib_1.__awaiter(this, void 0, void 0, function* () {
const collection = this.database.collection(collectionName);
const existingIndexes = (_a = (yield collection.listIndexes().toArray())) !== null && _a !== void 0 ? _a : [];
const existingIndexNames = existingIndexes
.map(index => index.name)
.filter(x => x !== '_id_');
const createPrimaryIndex = this.createPrimaryIndex(collectionName, existingIndexNames);
const allWorkflowFields = messageWorkflowMappings.map(mapping => mapping.mapsTo);
const distinctWorkflowFields = new Set(allWorkflowFields);
const workflowFields = [...distinctWorkflowFields];
const createSecondaryIndexes = workflowFields.map((workflowField) => tslib_1.__awaiter(this, void 0, void 0, function* () {
const indexName = resolveIndexName(collectionName, workflowField);
const existingIndexLocation = existingIndexNames.indexOf(indexName);
if (existingIndexLocation !== -1) {
this.logger.debug('Index already exists', { indexName });
existingIndexNames.splice(existingIndexLocation, 1);
return;
}
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.'${workflowField}'`;
this.logger.debug('Ensuring secondary index exists', {
indexName
});
yield collection.createIndex({ [workflowStateField]: 1 }, { name: indexName });
}));
const dropIndexes = existingIndexNames.map((indexName) => tslib_1.__awaiter(this, void 0, void 0, function* () {
yield collection.dropIndex(indexName);
}));
yield Promise.all([
createPrimaryIndex,
...createSecondaryIndexes,
...dropIndexes
]);
});
const collectionExists = await this.database
.listCollections({ name: collectionName }, { nameOnly: true })
.hasNext();
if (!collectionExists) {
await this.database.createCollection(collectionName);
}
}
async ensureIndexesExist(collectionName, messageWorkflowMappings) {
var _a;
const collection = this.database.collection(collectionName);
const existingIndexes = (_a = (await collection.listIndexes().toArray())) !== null && _a !== void 0 ? _a : [];
const existingIndexNames = existingIndexes
.map(index => index.name)
.filter(x => x !== '_id_');
const createPrimaryIndex = this.createPrimaryIndex(collectionName, existingIndexNames);
const allWorkflowFields = messageWorkflowMappings.map(mapping => mapping.mapsTo);
const distinctWorkflowFields = new Set(allWorkflowFields);
const workflowFields = [...distinctWorkflowFields];
const createSecondaryIndexes = workflowFields.map(async (workflowField) => {
const indexName = resolveIndexName(collectionName, workflowField);
const existingIndexLocation = existingIndexNames.indexOf(indexName);
if (existingIndexLocation !== -1) {
this.logger.debug('Index already exists', { indexName });
existingIndexNames.splice(existingIndexLocation, 1);
createPrimaryIndex(collectionName, existingIndexesNames) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
const collection = this.database.collection(collectionName);
const primaryIndexName = resolveIndexName(collectionName, 'id', 'version');
const primaryIndexLocation = existingIndexesNames.indexOf(primaryIndexName);
if (primaryIndexLocation !== -1) {
existingIndexesNames.splice(primaryIndexLocation, 1);
return;
}
const workflowStateField = `${WORKFLOW_DATA_FIELD_NAME}.'${workflowField}'`;
this.logger.debug('Ensuring secondary index exists', {
indexName
this.logger.debug('Ensuring primary index exists', {
primaryIndexName
});
await collection.createIndex({ [workflowStateField]: 1 }, { name: indexName });
yield collection.createIndex({ _id: 1, version: 1 }, { name: primaryIndexName });
});
const dropIndexes = existingIndexNames.map(async (indexName) => {
await collection.dropIndex(indexName);
});
await Promise.all([
createPrimaryIndex,
...createSecondaryIndexes,
...dropIndexes
]);
}
async createPrimaryIndex(collectionName, existingIndexesNames) {
const collection = this.database.collection(collectionName);
const primaryIndexName = resolveIndexName(collectionName, 'id', 'version');
const primaryIndexLocation = existingIndexesNames.indexOf(primaryIndexName);
if (primaryIndexLocation !== -1) {
existingIndexesNames.splice(primaryIndexLocation, 1);
return;
}
this.logger.debug('Ensuring primary index exists', {
primaryIndexName
});
await collection.createIndex({ _id: 1, version: 1 }, { name: primaryIndexName });
}
async upsertWorkflowState(collectionName, workflowId, plainWorkflowState, oldVersion, newVersion) {
const collection = this.database.collection(collectionName);
if (oldVersion === 0) {
this.logger.debug('Inserting new workflow state', {
collectionName,
workflowId,
oldVersion,
newVersion
});
// This is a new workflow, so just insert the data
await collection.insertOne({
id: workflowId,
version: newVersion,
[WORKFLOW_DATA_FIELD_NAME]: plainWorkflowState
});
}
else {
this.logger.debug('Updating existing workflow state', {
collectionName,
workflowId,
oldVersion,
newVersion
});
const result = await collection.findOneAndUpdate({
id: workflowId,
version: oldVersion
}, {
$set: {
upsertWorkflowState(collectionName, workflowId, plainWorkflowState, oldVersion, newVersion) {
return tslib_1.__awaiter(this, void 0, void 0, function* () {
const collection = this.database.collection(collectionName);
if (oldVersion === 0) {
this.logger.debug('Inserting new workflow state', {
collectionName,
workflowId,
oldVersion,
newVersion
});
// This is a new workflow, so just insert the data
yield collection.insertOne({
id: workflowId,
version: newVersion,
[WORKFLOW_DATA_FIELD_NAME]: plainWorkflowState
});
}
else {
this.logger.debug('Updating existing workflow state', {
collectionName,
workflowId,
oldVersion,
newVersion
});
const result = yield collection.findOneAndUpdate({
id: workflowId,
version: oldVersion
}, {
$set: {
version: newVersion,
[WORKFLOW_DATA_FIELD_NAME]: plainWorkflowState
}
});
if (!(result === null || result === void 0 ? void 0 : result.value)) {
throw new error_1.WorkflowStateNotFound(workflowId, collectionName, oldVersion);
}
});
if (!(result === null || result === void 0 ? void 0 : result.value)) {
throw new error_1.WorkflowStateNotFound(workflowId, collectionName, oldVersion);
}
}
});
}

@@ -215,2 +234,1 @@ }

}
//# sourceMappingURL=mongodb-persistence.js.map
{
"name": "@node-ts/bus-mongodb",
"description": "A Mongodb persistence adapter for workflow storage in @node-ts/bus-workflow.",
"version": "1.0.3",
"version": "1.1.0-beta.0",
"license": "MIT",

@@ -13,9 +13,8 @@ "main": "./dist/index.js",

"dependencies": {
"@node-ts/bus-messages": "^1.0.4",
"mongodb": "^5.3.0",
"tslib": "^1.9.3",
"uuid": "^3.3.2"
"tslib": "^2.6.2",
"uuid": "^3.3.2",
"@node-ts/bus-messages": "^1.1.0-beta.0"
},
"devDependencies": {
"@node-ts/bus-core": "^1.0.15",
"@node-ts/code-standards": "^0.0.10",

@@ -26,3 +25,4 @@ "@types/amqplib": "^0.5.11",

"typemoq": "^2.1.0",
"typescript": "^4.3.5"
"typescript": "^5.3.3",
"@node-ts/bus-core": "^1.1.0-beta.0"
},

@@ -51,5 +51,5 @@ "peerDependencies": {

"clean": "rm -rf dist",
"build": "tsc --project tsconfig.json --declaration",
"build": "tsc",
"build:watch": "pnpm run build --incremental --watch --preserveWatchOutput"
}
}

@@ -31,6 +31,8 @@ # @node-ts/bus-mongodb

const run = async () => {
await Bus
const bus = Bus
.configure()
.withPersistence(mongodbPersistence)
.initialize()
.build()
await bus.initialize()
await bus.start()
}

@@ -37,0 +39,0 @@ run.then(() => void)

@@ -35,8 +35,9 @@ import {

sut = new MongodbPersistence(configuration)
bus = await Bus.configure()
bus = Bus.configure()
.withLogger(() => Mock.ofType<Logger>().object)
.withPersistence(sut)
.withWorkflow(TestWorkflow)
.initialize()
.build()
await bus.initialize()
await bus.start()

@@ -150,3 +151,3 @@ })

} catch (err) {
error = err
error = err as Error
}

@@ -153,0 +154,0 @@ })

@@ -1,13 +0,12 @@

import { TestWorkflowState } from './test-workflow-state'
import {
Bus,
HandlerContext,
Workflow,
WorkflowMapper
} from '@node-ts/bus-core'
import { TestCommand } from './test-command'
import { BusInstance, Workflow, WorkflowMapper } from '@node-ts/bus-core'
import { RunTask } from './run-task'
import { TaskRan } from './task-ran'
import { TestCommand } from './test-command'
import { TestWorkflowState } from './test-workflow-state'
export class TestWorkflow extends Workflow<TestWorkflowState> {
constructor(private readonly bus: BusInstance) {
super()
}
configureWorkflow(

@@ -20,3 +19,3 @@ mapper: WorkflowMapper<TestWorkflowState, TestWorkflow>

.when(TaskRan, 'complete', {
lookup: ({ message }) => message.value,
lookup: message => message.value,
mapsTo: 'property1'

@@ -27,5 +26,5 @@ })

async sendRunTask({
message: { property1 }
}: HandlerContext<TestCommand>): Promise<Partial<TestWorkflowState>> {
await Bus.send(new RunTask(property1!))
property1
}: TestCommand): Promise<Partial<TestWorkflowState>> {
await this.bus.send(new RunTask(property1!))
return {

@@ -36,5 +35,3 @@ property1

complete({
message: { value }
}: HandlerContext<TaskRan>): Partial<TestWorkflowState> {
complete({ value }: TaskRan): Partial<TestWorkflowState> {
return this.completeWorkflow({

@@ -41,0 +38,0 @@ eventValue: value

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc