
@paralect/node-mongo
Lightweight reactive extension to official Node.js MongoDB driver.
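Features:
- Converts the _id field from the ObjectId to a string;
- Sets createdOn, updatedOn, and deletedOn timestamps for CUD operations;
- Soft delete: documents are marked with the deletedOn field instead of being removed;
- Outbox support: collections with the _outbox postfix that store all CUD events for implementing the transactional outbox pattern.

The following example shows some of these features: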
import { eventBus, InMemoryEvent } from "@paralect/node-mongo";
await userService.updateOne({ _id: "62670b6204f1aab85e5033dc" }, (doc) => ({
firstName: "Mark",
}));
eventBus.onUpdated(
"users",
["firstName", "lastName"],
async (data: InMemoryEvent<User>) => {
await userService.atomic.updateOne(
{ _id: data.doc._id },
{ $set: { fullName: `${data.doc.firstName} ${data.doc.lastName}` } }
);
}
);
npm i @paralect/node-mongo
Usually, you need to define a file called db that does two things:
- Creates a database connection;
- Exposes createService to create different Services to work with MongoDB;

import {
Database,
Service,
ServiceOptions,
IDocument,
} from "@paralect/node-mongo";
import config from "config";
const database = new Database(config.mongo.connection, config.mongo.dbName);
database.connect();
class CustomService<T extends IDocument> extends Service<T> {
// You can add new methods or override existing here
}
function createService<T extends IDocument>(
collectionName: string,
options: ServiceOptions = {}
) {
return new CustomService<T>(collectionName, database, options);
}
export default {
database,
createService,
};
Service is a collection wrapper that adds all node-mongo features. Under the hood it uses Node.js MongoDB native methods.
createService method returns the service instance. It accepts two parameters: collection name and ServiceOptions.
import { z } from "zod";
import db from "db";
const schema = z
.object({
_id: z.string(),
createdOn: z.date().optional(),
updatedOn: z.date().optional(),
deletedOn: z.date().optional().nullable(),
fullName: z.string(),
})
.strict();
type User = z.infer<typeof schema>;
const service = db.createService<User>("users", {
schemaValidator: (obj) => schema.parseAsync(obj),
});
export default service;
import userService from "user.service";
await userService.insertOne({ fullName: "Max" });
Node-mongo supports any schema library, but we recommend Zod because of its ability to generate TypeScript types from the schemas.
const schema = z.object({
_id: z.string(),
createdOn: z.date().optional(),
updatedOn: z.date().optional(),
deletedOn: z.date().optional().nullable(),
fullName: z.string(),
});
type User = z.infer<typeof schema>;
const service = createService<User>("users", {
schemaValidator: (obj) => schema.parseAsync(obj),
});
const schema = Joi.object({
_id: Joi.string().required(),
createdOn: Joi.date(),
updatedOn: Joi.date(),
deletedOn: Joi.date().allow(null),
fullName: Joi.string().required(),
});
type User = {
_id: string;
createdOn?: Date;
updatedOn?: Date;
deletedOn?: Date | null;
fullName: string;
};
const service = createService<User>("users", {
schemaValidator: (obj) => schema.validateAsync(obj),
});
Node-mongo validates documents before save.
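To make "validate before save" concrete, here is a minimal sketch with a hand-written validator that has the same shape as schemaValidator, that is (obj) => Promise<any>. The validateThenSave helper and the in-memory saved array are hypothetical stand-ins for the service's write path; they are not part of node-mongo.

```typescript
type User = { _id: string; fullName: string };

// Same shape as ServiceOptions.schemaValidator: (obj: any) => Promise<any>.
// Resolves with the validated document, rejects when a field is invalid.
const schemaValidator = async (obj: any): Promise<User> => {
  if (typeof obj?._id !== "string") throw new Error("_id must be a string");
  if (typeof obj?.fullName !== "string") {
    throw new Error("fullName must be a string");
  }
  return { _id: obj._id, fullName: obj.fullName };
};

// Hypothetical save step: validate first, write only if validation resolves.
const saved: User[] = [];

async function validateThenSave(obj: unknown): Promise<boolean> {
  try {
    saved.push(await schemaValidator(obj));
    return true;
  } catch {
    return false; // an invalid document never reaches storage
  }
}
```

Any library that can be wrapped in a promise-returning function fits this shape, which is presumably why both the Zod and Joi examples above plug in the same way.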
The key feature of the node-mongo is that each create, update or delete operation publishes a CUD event.
Events:
- ${collectionName}.created
- ${collectionName}.updated
- ${collectionName}.deleted

Events are used to easily update denormalized data and also to implement complex business logic without tight coupling of different entities.

The SDK supports two types of events:

In-memory events
- Stored in eventBus (Node.js EventEmitter instance);
- Handlers listen to the node-mongo eventBus.

Transactional events
- Enabled by setting { outbox: true } when creating a service;
- Stored in collections with the _outbox postfix;
- Handled with watch (method for working with Change Streams) on the outbox table.

At the start of a project, we recommend using in-memory events. As your application grows more demanding, you should migrate to transactional events.
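The naming convention for in-memory events can be sketched with a plain Node.js EventEmitter. This is an illustration only: bus, eventName, and publishCreated are hypothetical names, and the InMemoryEvent type is redeclared locally so the snippet does not depend on the SDK.

```typescript
import { EventEmitter } from "node:events";

// Local stand-ins; the real eventBus and InMemoryEvent come from the SDK.
type InMemoryEvent<T> = { doc: T; prevDoc?: T; name: string; createdOn: Date };
type User = { _id: string; fullName: string };

const bus = new EventEmitter();
const received: string[] = [];

// Event names follow the `${collectionName}.created|updated|deleted` convention.
const eventName = (
  collection: string,
  op: "created" | "updated" | "deleted"
): string => `${collection}.${op}`;

// A handler only sees events for the collection and operation it subscribed to.
bus.on(eventName("users", "created"), (evt: InMemoryEvent<User>) => {
  received.push(`${evt.name}:${evt.doc.fullName}`);
});

// Simulates what a service might do after a successful insert.
function publishCreated(collection: string, doc: User): void {
  const name = eventName(collection, "created");
  bus.emit(name, { doc, name, createdOn: new Date() });
}

publishCreated("users", { _id: "1", fullName: "Max" });
publishCreated("companies", { _id: "2", fullName: "Acme" }); // no listener
```

Because the emitter lives in process memory, events published this way are lost if the process stops before a handler runs, which is the trade-off that the transactional (outbox) mode addresses.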
find
find(
filter: Filter<T>,
readConfig: ReadConfig & { page?: number; perPage?: number } = {},
findOptions: FindOptions = {},
): Promise<FindResult<T>>
const { results: users, count: usersCount } = await userService.find({
status: "active",
});
Fetches documents that match the filter. Returns an object (FindResult) with the following fields:
| Field | Description |
|---|---|
| results | documents that match the filter |
| count | total number of documents that match the filter |
| pagesCount | total number of documents that match the filter, divided by the number of documents per page |
Pass page and perPage params to get a paginated result. Otherwise, all documents will be returned.
Parameters
- filter: Filter<T>;
- readConfig: ReadConfig & { page?: number; perPage?: number };
- findOptions: FindOptions;

Returns Promise<FindResult<T>>.
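The relationship between count, perPage, and pagesCount can be sketched over an in-memory array. The paginate helper is hypothetical, and three details are assumptions rather than documented behavior: pages are 1-based, pagesCount is rounded up, and an unpaginated call reports a single page.

```typescript
type FindResult<T> = { results: T[]; count: number; pagesCount: number };

// Hypothetical helper that mimics the shape of a paginated find result.
function paginate<T>(docs: T[], page?: number, perPage?: number): FindResult<T> {
  const count = docs.length;

  if (page === undefined || perPage === undefined) {
    // No paging params: every matching document is returned.
    return { results: docs, count, pagesCount: 1 };
  }

  const skip = (page - 1) * perPage; // assumes 1-based page numbers
  return {
    results: docs.slice(skip, skip + perPage),
    count, // total matches, not the size of the current page
    pagesCount: Math.ceil(count / perPage), // assumes rounding up
  };
}

const docs = Array.from({ length: 25 }, (_, i) => ({ n: i + 1 }));
const secondPage = paginate(docs, 2, 10);
const everything = paginate(docs);
```

With 25 matching documents and perPage set to 10, the sketch yields three pages, and the second page starts at the eleventh document.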
findOne
findOne(
filter: Filter<T>,
readConfig: ReadConfig = {},
findOptions: FindOptions = {},
): Promise<T | null>
const user = await userService.findOne({ _id: u._id });
Fetches the first document that matches the filter. Returns null if document was not found.
Parameters
- filter: Filter<T>;
- readConfig: ReadConfig;
- findOptions: FindOptions;

Returns Promise<T | null>.

updateOne
updateOne: (
filter: Filter<T>,
updateFn: (doc: T) => Partial<T>,
updateConfig: UpdateConfig = {},
updateOptions: UpdateOptions = {},
): Promise<T | null>
const updatedUserWithEvent = await userService.updateOne(
{ _id: u._id },
(doc) => ({ fullName: "Updated fullname" })
);
const updatedUser = await userService.updateOne(
{ _id: u._id },
(doc) => ({ fullName: "Updated fullname" }),
{ publishEvents: false }
);
Updates a single document and returns it. Returns null if document was not found.
Parameters
- filter: Filter<T>;
- updateFn: (doc: T) => Partial<T>;
- updateConfig: UpdateConfig;
- updateOptions: UpdateOptions;

Returns Promise<T | null>.
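The updateFn contract, where the callback receives the current document and returns only the fields to change, can be illustrated without a database. The applyUpdate helper below is hypothetical and only shows the merge; the real method also handles validation, timestamps, and events.

```typescript
type User = { _id: string; fullName: string; isEmailVerified?: boolean };

// Hypothetical in-memory version of the updateFn contract:
// the partial returned by the callback is merged over the current document.
function applyUpdate<T extends object>(
  doc: T,
  updateFn: (doc: T) => Partial<T>
): T {
  return { ...doc, ...updateFn(doc) };
}

const before: User = { _id: "1", fullName: "Max" };

// The callback can derive new values from the current document.
const after = applyUpdate(before, (doc) => ({
  fullName: `${doc.fullName} Payne`,
  isEmailVerified: true,
}));
```

Returning a partial rather than a MongoDB update operator keeps the callback easy to type-check; for operator-style updates such as $set, the atomic methods described further down are the closer fit.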
updateMany
updateMany: (
filter: Filter<T>,
updateFn: (doc: T) => Partial<T>,
updateConfig: UpdateConfig = {},
updateOptions: UpdateOptions = {},
): Promise<T[]>
const updatedUsers = await userService.updateMany(
{ status: "active" },
(doc) => ({ isEmailVerified: true })
);
Updates multiple documents that match the query. Returns an array of the updated documents.
Parameters
- filter: Filter<T>;
- updateFn: (doc: T) => Partial<T>;
- updateConfig: UpdateConfig;
- updateOptions: UpdateOptions;

Returns Promise<T[]>.

insertOne
insertOne: (
object: Partial<T>,
createConfig: CreateConfig = {},
insertOneOptions: InsertOneOptions = {},
): Promise<T>
const user = await userService.insertOne({
fullName: "John",
});
Inserts a single document into a collection and returns it.
Parameters
- object: Partial<T>;
- createConfig: CreateConfig;
- insertOneOptions: InsertOneOptions;

Returns Promise<T>.

insertMany
insertMany: (
objects: Partial<T>[],
createConfig: CreateConfig = {},
bulkWriteOptions: BulkWriteOptions = {},
): Promise<T[]>
const users = await userService.insertMany([
{ fullName: "John" },
{ fullName: "Kobe" },
]);
Inserts multiple documents into a collection and returns them.
Parameters
- objects: Partial<T>[];
- createConfig: CreateConfig;
- bulkWriteOptions: BulkWriteOptions;

Returns Promise<T[]>.

deleteSoft
deleteSoft: (
filter: Filter<T>,
deleteConfig: DeleteConfig = {},
deleteOptions: DeleteOptions = {},
): Promise<T[]>
const deletedUsers = await userService.deleteSoft({ status: "deactivated" });
Adds deletedOn field to the documents that match the query and returns them.
Parameters
- filter: Filter<T>;
- deleteConfig: DeleteConfig;
- deleteOptions: DeleteOptions;

Returns Promise<T[]>.
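Soft deletion and the skipDeletedOnDocs read option fit together roughly as in the sketch below. It works on an in-memory array, and the function names only mirror the service methods for readability; the predicate-based signatures are an assumption made to keep the example self-contained.

```typescript
type Doc = { _id: string; status: string; deletedOn?: Date | null };

const collection: Doc[] = [
  { _id: "1", status: "active" },
  { _id: "2", status: "deactivated" },
];

// Soft delete: stamp matching documents with deletedOn instead of removing them.
function deleteSoft(match: (d: Doc) => boolean): Doc[] {
  const now = new Date();
  const affected = collection.filter(match);
  affected.forEach((d) => {
    d.deletedOn = now;
  });
  return affected;
}

// Reads skip soft-deleted documents unless skipDeletedOnDocs is set to false.
function find(match: (d: Doc) => boolean, skipDeletedOnDocs = true): Doc[] {
  return collection.filter(
    (d) => match(d) && (!skipDeletedOnDocs || d.deletedOn == null)
  );
}

const deleted = deleteSoft((d) => d.status === "deactivated");
const visible = find(() => true); // default read hides soft-deleted docs
const all = find(() => true, false); // opt out to see everything
```

The document stays in storage after deleteSoft, so it can still be audited or restored, while default reads behave as if it were gone.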
deleteOne
deleteOne: (
filter: Filter<T>,
deleteConfig: DeleteConfig = {},
deleteOptions: DeleteOptions = {},
): Promise<T | null>
const deletedUser = await userService.deleteOne({ _id: u._id });
Deletes a single document and returns it. Returns null if document was not found.
Parameters
- filter: Filter<T>;
- deleteConfig: DeleteConfig;
- deleteOptions: DeleteOptions;

Returns Promise<T | null>.

deleteMany
deleteMany: (
filter: Filter<T>,
deleteConfig: DeleteConfig = {},
deleteOptions: DeleteOptions = {},
): Promise<T[]>
const deletedUsers = await userService.deleteMany({ status: "deactivated" });
Deletes multiple documents that match the query. Returns an array of the deleted documents.
Parameters
- filter: Filter<T>;
- deleteConfig: DeleteConfig;
- deleteOptions: DeleteOptions;

Returns Promise<T[]>.

replaceOne
replaceOne: (
filter: Filter<T>,
replacement: Partial<T>,
readConfig: ReadConfig = {},
replaceOptions: ReplaceOptions = {},
): Promise<UpdateResult | Document>
await usersService.replaceOne({ _id: u._id }, { fullName: fullNameToUpdate });
Replaces a single document within the collection based on the filter. Doesn't validate schema or publish events.
Parameters
- filter: Filter<T>;
- replacement: Partial<T>;
- readConfig: ReadConfig;
- replaceOptions: ReplaceOptions;

Returns Promise<UpdateResult | Document>.

atomic.updateOne
updateOne: (
filter: Filter<T>,
updateFilter: UpdateFilter<T>,
readConfig: ReadConfig = {},
updateOptions: UpdateOptions = {},
): Promise<UpdateResult>
await userService.atomic.updateOne(
{ _id: u._id },
{ $set: { fullName: `${u.firstName} ${u.lastName}` } }
);
Updates a single document. Doesn't validate schema or publish events.
Parameters
- filter: Filter<T>;
- updateFilter: UpdateFilter<T>;
- readConfig: ReadConfig;
- updateOptions: UpdateOptions;

Returns Promise<UpdateResult>.

atomic.updateMany
updateMany: (
filter: Filter<T>,
updateFilter: UpdateFilter<T>,
readConfig: ReadConfig = {},
updateOptions: UpdateOptions = {},
): Promise<Document | UpdateResult>
await userService.atomic.updateMany(
{ firstName: { $exists: true }, lastName: { $exists: true } },
{ $set: { fullName: `${u.firstName} ${u.lastName}` } }
);
Updates all documents that match the specified filter. Doesn't validate schema or publish events.
Parameters
- filter: Filter<T>;
- updateFilter: UpdateFilter<T>;
- readConfig: ReadConfig;
- updateOptions: UpdateOptions;

Returns Promise<UpdateResult | Document>.

exists
exists(
filter: Filter<T>,
readConfig: ReadConfig = {},
findOptions: FindOptions = {},
): Promise<boolean>
const isUserExists = await userService.exists({ email: "example@gmail.com" });
Returns true if document exists, otherwise false.
Parameters
- filter: Filter<T>;
- readConfig: ReadConfig;
- findOptions: FindOptions;

Returns Promise<boolean>.

countDocuments
countDocuments(
filter: Filter<T>,
readConfig: ReadConfig = {},
countDocumentOptions: CountDocumentsOptions = {},
): Promise<number>
const documentsCount = await userService.countDocuments({ status: "active" });
Returns the number of documents that match the query.
Parameters
- filter: Filter<T>;
- readConfig: ReadConfig;
- countDocumentOptions: CountDocumentsOptions;

Returns Promise<number>.

distinct
distinct(
key: string,
filter: Filter<T>,
readConfig: ReadConfig = {},
distinctOptions: DistinctOptions = {},
): Promise<any[]>
const statesList = await userService.distinct("states");
Finds the distinct values for a specified field across a single collection or view and returns them in an array.
Parameters
- key: string;
- filter: Filter<T>;
- readConfig: ReadConfig;
- distinctOptions: DistinctOptions;

Returns Promise<any[]>.

aggregate
aggregate: (
pipeline: any[],
options: AggregateOptions = {},
): Promise<any[]>
const sortedActiveUsers = await userService.aggregate([
{ $match: { status: "active" } },
{ $sort: { firstName: -1, lastName: -1 } },
]);
Executes an aggregation framework pipeline and returns an array with the aggregation results.
Parameters
- pipeline: any[];
- options: AggregateOptions;

Returns Promise<any[]>.

watch
watch: (
pipeline: Document[] | undefined,
options: ChangeStreamOptions = {},
): Promise<any>
const watchCursor = await userService.watch();
Creates a new Change Stream, watching for new changes and returns a cursor.
Parameters
- pipeline: Document[] | undefined;
- options: ChangeStreamOptions;

Returns Promise<any>.

drop
drop: (
recreate: boolean = false,
): Promise<void>
await userService.drop();
Removes a collection from the database. The method also removes any indexes associated with the dropped collection.
Parameters
- recreate: boolean; Should create collection after deletion.

Returns Promise<void>.

indexExists
indexExists: (
indexes: string | string[],
indexInformationOptions: IndexInformationOptions = {},
): Promise<boolean>
const isIndexExists = await usersService.indexExists(index);
Checks if one or more indexes exist on the collection, fails on first non-existing index.
Parameters
- indexes: string | string[];
- indexInformationOptions: IndexInformationOptions;

Returns Promise<boolean>.

createIndex
createIndex: (
indexSpec: IndexSpecification,
options: CreateIndexesOptions = {},
): Promise<string | void>
await usersService.createIndex({ fullName: 1 });
Creates collection index.
Parameters
- indexSpec: IndexSpecification;
- options: CreateIndexesOptions;

Returns Promise<string | void>.

createIndexes
createIndexes: (
indexSpecs: IndexDescription[],
options: CreateIndexesOptions = {},
): Promise<string[] | void>
await usersService.createIndexes([
{ key: { fullName: 1 } },
{ key: { createdOn: 1 } },
]);
Creates one or more indexes on a collection.
Parameters
- indexSpecs: IndexDescription[];
- options: CreateIndexesOptions;

Returns Promise<string[] | void>.

dropIndex
dropIndex: (
indexName: string,
options: DropIndexesOptions = {},
): Promise<void | Document>
await userService.dropIndex("firstName_1_lastName_-1");
Removes the specified index from a collection.
Parameters
- indexName: string;
- options: DropIndexesOptions;

Returns Promise<void | Document>.

dropIndexes
dropIndexes: (
options: DropIndexesOptions = {},
): Promise<void | Document>
Removes all but the _id index from a collection.
await userService.dropIndexes();
Parameters
- options: DropIndexesOptions;

Returns Promise<void | Document>.

eventBus.on
on: (
eventName: string,
handler: InMemoryEventHandler,
): void
import { eventBus, InMemoryEvent } from "@paralect/node-mongo";
const collectionName = "users";
eventBus.on(`${collectionName}.created`, (data: InMemoryEvent<User>) => {
try {
const user = data.doc;
console.log("user created", user);
} catch (err) {
logger.error(`${collectionName}.created handler error: ${err}`);
}
});
eventBus.on(`${collectionName}.updated`, (data: InMemoryEvent<User>) => {});
eventBus.on(`${collectionName}.deleted`, (data: InMemoryEvent<User>) => {});
In-memory events handler that listens for CUD events.
Parameters
- eventName: string; one of ${collectionName}.created, ${collectionName}.updated, ${collectionName}.deleted.
- handler: InMemoryEventHandler;

Returns void.

eventBus.once
once: (
eventName: string,
handler: InMemoryEventHandler,
): void
eventBus.once(`${USERS}.updated`, (data: InMemoryEvent<User>) => {
try {
const user = data.doc;
console.log("user updated", user);
} catch (err) {
logger.error(`${USERS}.updated handler error: ${err}`);
}
});
In-memory events handler that listens for CUD events. It will be called only once.
Parameters
- eventName: string; one of ${collectionName}.created, ${collectionName}.updated, ${collectionName}.deleted.
- handler: InMemoryEventHandler;

Returns void.

eventBus.onUpdated
onUpdated: (
entity: string,
properties: OnUpdatedProperties,
handler: InMemoryEventHandler,
): void
import { eventBus, InMemoryEvent } from "@paralect/node-mongo";
eventBus.onUpdated(
"users",
["firstName", "lastName"],
async (data: InMemoryEvent<User>) => {
try {
await userService.atomic.updateOne(
{ _id: data.doc._id },
{ $set: { fullName: `${data.doc.firstName} ${data.doc.lastName}` } }
);
} catch (err) {
console.log(
`users onUpdated ['firstName', 'lastName'] handler error: ${err}`
);
}
}
);
eventBus.onUpdated(
"users",
[{ fullName: "John Wake", firstName: "John" }, "lastName"],
() => {}
);
eventBus.onUpdated("users", ["oauth.google"], () => {});
In-memory events handler that listens for updates to specific fields. It will be called when one of the provided properties is updated.
Parameters
- entity: string;
- properties: OnUpdatedProperties;
- handler: InMemoryEventHandler;

Returns void.
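One way to picture how a field-level listener could decide whether to fire is to compare the watched paths in prevDoc and doc. The sketch below is an assumption about the general technique, not the SDK's implementation: getPath and changedProperties are hypothetical helpers, only string paths (including dotted ones such as oauth.google) are handled, and the object form of OnUpdatedProperties is not covered.

```typescript
type AnyDoc = Record<string, unknown>;

// Reads a dotted path such as "oauth.google" from a document.
function getPath(doc: AnyDoc, path: string): unknown {
  return path.split(".").reduce<unknown>(
    (value, key) =>
      value !== null && typeof value === "object"
        ? (value as AnyDoc)[key]
        : undefined,
    doc
  );
}

// Returns the watched paths whose values differ between prevDoc and doc.
// Values are compared by their JSON text, which is enough for plain data.
function changedProperties(
  prevDoc: AnyDoc,
  doc: AnyDoc,
  watched: string[]
): string[] {
  return watched.filter(
    (path) =>
      JSON.stringify(getPath(prevDoc, path)) !==
      JSON.stringify(getPath(doc, path))
  );
}

const prevDoc = { firstName: "John", lastName: "Wake", oauth: { google: false } };
const doc = { firstName: "Mark", lastName: "Wake", oauth: { google: true } };

const changed = changedProperties(prevDoc, doc, [
  "firstName",
  "lastName",
  "oauth.google",
]);
```

A handler registered for firstName and lastName would fire for this update because firstName changed, even though lastName did not.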
withTransaction
withTransaction: <TRes = any>(
transactionFn: (session: ClientSession) => Promise<TRes>,
): Promise<TRes>
Runs the callback and automatically commits or rolls back the transaction.
import db from "db";
const { user, company } = await db.withTransaction(async (session) => {
const createdUser = await usersService.insertOne(
{ fullName: "Bahrimchuk" },
{},
{ session }
);
const createdCompany = await companyService.insertOne(
{ users: [createdUser._id] },
{},
{ session }
);
return { user: createdUser, company: createdCompany };
});
Parameters
- transactionFn: (session: ClientSession) => Promise<TRes>; must return a Promise.

Returns Promise<TRes>.
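The commit-or-roll-back behavior can be sketched with a fake session that only records what happened. FakeSession and this local withTransaction are hypothetical and take the session as an explicit argument for clarity; the real method creates and manages a ClientSession itself.

```typescript
// Hypothetical stand-in for a driver session; it only records the outcome.
class FakeSession {
  log: string[] = [];
  commit(): void {
    this.log.push("commit");
  }
  abort(): void {
    this.log.push("abort");
  }
}

async function withTransaction<TRes>(
  session: FakeSession,
  transactionFn: (session: FakeSession) => Promise<TRes>
): Promise<TRes> {
  try {
    const result = await transactionFn(session);
    session.commit(); // every step succeeded
    return result;
  } catch (err) {
    session.abort(); // any failure rolls back all steps
    throw err; // the caller still sees the original error
  }
}

const okSession = new FakeSession();
const failedSession = new FakeSession();

const okRun = withTransaction(okSession, async () => "done");
const failedRun = withTransaction(failedSession, async () => {
  throw new Error("insert failed");
}).catch((err: Error) => err.message);
```

This is why each write inside the callback in the example above receives { session }: an operation that is not bound to the session would fall outside the transaction and would not be rolled back.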
ServiceOptions
interface ServiceOptions {
skipDeletedOnDocs?: boolean;
schemaValidator?: (obj: any) => Promise<any>;
publishEvents?: boolean;
addCreatedOnField?: boolean;
addUpdatedOnField?: boolean;
outbox?: boolean;
collectionOptions?: CollectionOptions;
collectionCreateOptions?: CreateCollectionOptions;
}
| Option | Description | Default value |
|---|---|---|
| skipDeletedOnDocs | Skip documents with the deletedOn field | true |
| schemaValidator | Validation function that will be called on data save | - |
| publishEvents | Publish CUD events on save. | true |
| addCreatedOnField | Set the createdOn field to the current timestamp on document creation. | true |
| addUpdatedOnField | Set the updatedOn field to the current timestamp on document update. | true |
| outbox | Use transactional events instead of in-memory events | false |
| collectionOptions | MongoDB CollectionOptions | {} |
| collectionCreateOptions | MongoDB CreateCollectionOptions | {} |
CreateConfig
Overrides ServiceOptions parameters for create operations.
type CreateConfig = {
validateSchema?: boolean;
publishEvents?: boolean;
};
ReadConfig
Overrides ServiceOptions parameters for read operations.
type ReadConfig = {
skipDeletedOnDocs?: boolean;
};
UpdateConfig
Overrides ServiceOptions parameters for update operations.
type UpdateConfig = {
skipDeletedOnDocs?: boolean;
validateSchema?: boolean;
publishEvents?: boolean;
};
DeleteConfig
Overrides ServiceOptions parameters for delete operations.
type DeleteConfig = {
skipDeletedOnDocs?: boolean;
publishEvents?: boolean;
};
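The way a per-call config overrides service-level options can be pictured as a shallow merge. This is a sketch under that assumption: resolveConfig is a hypothetical helper, and the ServiceOptions type is trimmed to two flags for brevity.

```typescript
// Trimmed-down local types; the real ones have more fields.
type ServiceOptions = { skipDeletedOnDocs: boolean; publishEvents: boolean };
type UpdateConfig = Partial<ServiceOptions>;

const serviceDefaults: ServiceOptions = {
  skipDeletedOnDocs: true,
  publishEvents: true,
};

// Per-call config wins over service-level options; keys that are not
// passed fall through to the service defaults.
function resolveConfig(
  options: ServiceOptions,
  config: UpdateConfig = {}
): ServiceOptions {
  return { ...options, ...config };
}

// Mirrors the earlier updateOne example that passes { publishEvents: false }.
const silent = resolveConfig(serviceDefaults, { publishEvents: false });
const unchanged = resolveConfig(serviceDefaults);
```

Keeping the defaults on the service and the exceptions at the call site means a one-off silent update does not require a second service instance.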
InMemoryEvent
type InMemoryEvent<T = any> = {
doc: T;
prevDoc?: T;
name: string;
createdOn: Date;
};
InMemoryEventHandler
type InMemoryEventHandler = (evt: InMemoryEvent) => Promise<void> | void;
OnUpdatedProperties
type OnUpdatedProperties = Array<Record<string, unknown> | string>;
Extending API for a single service.
import _ from "lodash";

const service = db.createService<User>("users", {
  schemaValidator: (obj) => schema.parseAsync(obj),
});
const privateFields = ["passwordHash", "signupToken", "resetPasswordToken"];
// Returns a copy of the user without sensitive fields.
const getPublic = (user: User | null) => _.omit(user, privateFields);
export default Object.assign(service, {
  updateLastRequest, // assumed to be defined elsewhere in this module
  getPublic,
});
Extending API for all services.
const database = new Database(config.mongo.connection, config.mongo.dbName);
class CustomService<T extends IDocument> extends Service<T> {
createOrUpdate = async (
query: any,
updateCallback: (item?: T) => Partial<T>
) => {
const docExists = await this.exists(query);
if (!docExists) {
const newDoc = updateCallback();
return this.insertOne(newDoc);
}
return this.updateOne(query, (doc) => updateCallback(doc));
};
}
function createService<T extends IDocument>(
collectionName: string,
options: ServiceOptions = {}
) {
return new CustomService<T>(collectionName, database, options);
}
const userService = createService<UserType>("users", {
schemaValidator: (obj) => schema.parseAsync(obj),
});
await userService.createOrUpdate({ _id: "some-id" }, () => ({
fullName: "Max",
}));
We found that @paralect/node-mongo demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 5 open source maintainers collaborating on the project.