
@moleculer/workflows
Reliable & scalable workflow feature (like Temporal.io or Restate) for the Moleculer framework.
This project is a work in progress. Be careful when using it in production.
To install the package, use the following command:
npm i @moleculer/workflows
const { ServiceBroker } = require("moleculer");
const WorkflowsMiddleware = require("@moleculer/workflows").Middleware;
// Create a ServiceBroker
const broker = new ServiceBroker({
    logger: true,
    middlewares: [WorkflowsMiddleware({ adapter: "Redis" })]
});
// Define a service with workflows
broker.createService({
    name: "test",
    workflows: {
        simpleWorkflow: {
            async handler(ctx) {
                return `Hello, ${ctx.params.name}`;
            }
        }
    }
});
// Start the broker
broker.start().then(async () => {
    // Run the workflow
    const result = await broker.wf.run("test.simpleWorkflow", { name: "World" });
    console.log(result);
});
broker.createService({
    name: "users",
    workflows: {
        signupWorkflow: {
            // Max execution time
            timeout: "1 day",
            // Retention of finished job history
            retention: "3 days",
            // Concurrent executions
            concurrency: 3,
            // Parameter validation
            params: {
                email: { type: "email" },
                name: { type: "string" }
            },
            // Enable tracing
            tracing: true,
            async handler(ctx) {
                // Create user
                const user = await ctx.call("users.create", ctx.params);
                // Save the state of the job (it can be a string, number, or object)
                await ctx.wf.setState("CREATED");
                // Send verification email
                await ctx.call("mail.send", { type: "welcome", user });
                try {
                    // Wait for verification (max 1 hour)
                    await ctx.wf.waitForSignal("email.verification", user.id, {
                        timeout: "1 hour"
                    });
                    await ctx.wf.setState("VERIFIED");
                } catch (err) {
                    if (err.name == "WorkflowSignalTimeoutError") {
                        // Registration not verified in 1 hour, remove the user
                        await ctx.call("users.remove", { id: user.id });
                        return null;
                    }
                    // Rethrow any other error
                    throw err;
                }
                // Set user verified and save
                user.verified = true;
                await ctx.call("users.update", user);
                // Set the workflow state to done
                await ctx.wf.setState("DONE");
                return user;
            }
        }
    },
    actions: {
        // ... common actions
        // REST API to start the signup workflow job
        register: {
            rest: "POST /register",
            async handler(ctx) {
                const job = await this.broker.wf.run("users.signupWorkflow", ctx.params, {
                    jobId: ctx.requestID // optional
                    /* other options */
                });
                // At this point the workflow is running; `job` describes the started job
                return {
                    // With the jobId, the frontend can call the `checkRegisterState`
                    // REST action to get the state of the execution.
                    jobId: job.id
                };
                // or wait for the execution and return the result
                // return await job.promise();
            }
        },
        // REST API for the verification URL in the sent e-mail
        verify: {
            rest: "POST /verify/:token",
            async handler(ctx) {
                // Check the validity
                const user = await ctx.call("users.find", { verificationToken: ctx.params.token });
                if (user) {
                    // Trigger the signal so the workflow job continues its execution
                    await this.broker.wf.triggerSignal("email.verification", user.id);
                }
            }
        },
        // Check the state of the signup process. Call it from the frontend to show
        // the current state to the customer. The `jobId` is the one sent back
        // by the "register" REST call.
        checkRegisterState: {
            rest: "GET /registerState/:jobId",
            async handler(ctx) {
                const res = await this.broker.wf.getState("users.signupWorkflow", ctx.params.jobId);
                if (res.state == "DONE") {
                    return { state: res.state, user: res.result };
                } else {
                    return { state: res.state, startedAt: res.startedAt };
                }
            }
        }
    }
});
The middleware uses an event log to replay a job after a failure. This means that non-deterministic results inside workflow handlers need to be stored in the event log. To make this easier, the middleware automatically wraps ctx.call and other Moleculer functions and stores their results in the event log. If you call a Moleculer action in your workflow and the worker crashes after the call, the middleware restarts the job and skips the action calls that were already executed. For example, an action that creates a new entity won't be called multiple times after a failure.
The following functions are protected with journaling:
- ctx.call - Call a Moleculer action
- ctx.mcall - Call multiple Moleculer actions
- ctx.broadcast - Broadcast a message
- ctx.emit - Emit an event
If you use other non-deterministic functions (e.g., UUID generation or random numbers), you need to store the result in the event log manually. Use the ctx.wf.task(name, fn) function to execute a task and store its result in the event log. The name parameter is used for logging purposes. The function can be asynchronous.
const rnd = await ctx.wf.task("Generate a random number", () => Math.random());
// Store the result under a different name than the `uuid` module to avoid shadowing it
const id = await ctx.wf.task("Generate a UUID", () => uuid.v4());
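To make the replay idea concrete, here is a minimal in-memory sketch of journaled task execution. It is not the middleware's implementation: createJournaledContext, the journal array, and the mismatch check are hypothetical, and a real adapter would persist the journal (e.g., in Redis) rather than keep it in memory.

```javascript
// Illustrative sketch only; names and behavior here are assumptions.
function createJournaledContext(journal) {
    let cursor = 0; // position in the journal during this run
    return {
        async task(name, fn) {
            if (cursor < journal.length) {
                // Replay: reuse the recorded result instead of calling fn again.
                const entry = journal[cursor++];
                if (entry.name !== name) {
                    throw new Error(`Journal mismatch: expected "${entry.name}", got "${name}"`);
                }
                return entry.result;
            }
            // First execution: run the function and record its result.
            const result = await fn();
            journal.push({ name, result });
            cursor++;
            return result;
        }
    };
}

async function demo() {
    const journal = []; // shared between runs, like a persisted event log
    let calls = 0;
    const handler = ctx =>
        ctx.task("Generate a random number", () => {
            calls++;
            return Math.random();
        });

    const first = await handler(createJournaledContext(journal));
    // Simulate a restart: a fresh context replays the same journal.
    const second = await handler(createJournaledContext(journal));
    return { first, second, calls };
}
```

Calling demo() resolves with first and second holding the same value and calls equal to 1, because the second run reads the random number from the journal instead of generating a new one.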
Signals are a way to communicate with a workflow job. You can send a signal to a job to trigger a specific action or to notify it about an event. They are useful when a workflow has to wait for an external event or user input before continuing. For example, in the user sign-up workflow, a signal notifies the job that the user has verified their email address, and the job then continues its execution.
A signal is not tied to a particular workflow, so you can use the same signal in multiple workflows. The signal is identified by a unique key, which can be any string or number. To send a signal to a specific workflow job, use the job ID as the key.
You can define a timeout for waiting on a signal. If the timeout is reached, the workflow job throws a WorkflowSignalTimeoutError. You can catch this error and handle it accordingly.
try {
    // Wait for verification (max 1 hour)
    await ctx.wf.waitForSignal("email.verification", user.id, {
        timeout: "1 hour"
    });
    await ctx.wf.setState("VERIFIED");
} catch (err) {
    if (err.name == "WorkflowSignalTimeoutError") {
        // Registration not verified in 1 hour, remove the user
        await ctx.call("users.remove", { id: user.id });
        return null;
    }
    // Rethrow any other error
    throw err;
}
The signal timeout is journaled, so if the workflow job is restarted, the waiting is continued, not started from the beginning.
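One way to picture a journaled timeout: if the time at which the wait began is recorded, a restarted job only needs to wait for the remainder. The helper below is a hypothetical sketch of that arithmetic (remainingWait is not part of the library's API, and how the middleware actually stores the deadline is not specified here).

```javascript
// Hypothetical helper: how long to keep waiting after a restart, given the
// journaled time at which the wait first began. All values are milliseconds.
function remainingWait(startedAt, timeoutMs, now = Date.now()) {
    const elapsed = now - startedAt;
    return Math.max(0, timeoutMs - elapsed);
}

const HOUR = 60 * 60 * 1000;
const startedAt = Date.UTC(2025, 0, 1, 12, 0, 0); // wait began at 12:00
const restartAt = Date.UTC(2025, 0, 1, 12, 40, 0); // worker restarted at 12:40

console.log(remainingWait(startedAt, HOUR, restartAt)); // 1200000 (20 minutes left)
```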
You can trigger a signal by calling the broker.wf.triggerSignal(signal, key, payload) method. The signal parameter is the name of the signal, the key parameter is the unique identifier for the signal (e.g., user ID), and the payload parameter is the data you want to send with the signal. Signal triggering uses Redis Pub/Sub, so you can trigger a signal from any node.
await broker.wf.triggerSignal("email.verification", user.id);
You can also remove a signal by calling the broker.wf.removeSignal(signal, key) method. The signal parameter is the name of the signal, and the key parameter is the unique identifier for the signal (e.g., user ID).
await broker.wf.removeSignal("email.verification", user.id);
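The wait, trigger, and remove operations described above can be modeled in memory as follows. This is only a sketch under stated assumptions: SignalHub and SignalTimeoutError are hypothetical names, the timeout is given in milliseconds rather than a human-readable string, and a triggered signal is assumed to stay available until it is removed. The real middleware stores signals in Redis and delivers them over Pub/Sub.

```javascript
// In-memory model of signal semantics; not the library's implementation.
class SignalTimeoutError extends Error {
    constructor(signal, key) {
        super(`Signal "${signal}" (key: ${key}) timed out`);
        this.name = "WorkflowSignalTimeoutError"; // error name used in the example above
    }
}

class SignalHub {
    constructor() {
        this.signals = new Map(); // "signal:key" -> payload of triggered signals
        this.waiters = new Map(); // "signal:key" -> callbacks of pending waits
    }

    waitForSignal(signal, key, { timeout } = {}) {
        const id = `${signal}:${key}`;
        // Assumption: an already triggered signal resolves the wait immediately.
        if (this.signals.has(id)) return Promise.resolve(this.signals.get(id));
        return new Promise((resolve, reject) => {
            let timer;
            const waiter = payload => {
                clearTimeout(timer);
                resolve(payload);
            };
            this.waiters.set(id, [...(this.waiters.get(id) || []), waiter]);
            if (timeout != null) {
                timer = setTimeout(() => {
                    // Drop this waiter and fail the wait.
                    const rest = (this.waiters.get(id) || []).filter(w => w !== waiter);
                    this.waiters.set(id, rest);
                    reject(new SignalTimeoutError(signal, key));
                }, timeout);
            }
        });
    }

    triggerSignal(signal, key, payload) {
        const id = `${signal}:${key}`;
        this.signals.set(id, payload);
        for (const waiter of this.waiters.get(id) || []) waiter(payload);
        this.waiters.delete(id);
    }

    removeSignal(signal, key) {
        this.signals.delete(`${signal}:${key}`);
    }
}
```

In this model, a wait started before triggerSignal resolves with the payload, while a wait started after removeSignal has nothing to read and rejects with WorkflowSignalTimeoutError once its timeout elapses.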
If a workflow job fails (or the executor node crashes), the job is retried according to the retry policy defined in the workflow options. The retry policy can use a fixed or exponential backoff strategy. You can also define the maximum number of retries and the delay between retries.
If the job fails with an unhandled error, the workflow checks the retryable property of the error (MoleculerRetryableError, WorkflowRetryableError). If it's true, the job will be retried. If it's false, the job will be marked as failed and removed from the queue without a retry.
On retry, the workflow job is restarted from the beginning, skipping the actions that were already executed. The job is retried until the maximum number of retries is reached or the job completes.
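As a rough illustration of fixed versus exponential backoff, the sketch below computes a retry delay from the retryPolicy options (delay, maxDelay, factor, retries). The formula delay * factor^(attempt - 1), capped at maxDelay, is an assumption made for this example, and retryDelay and shouldRetry are hypothetical helpers; the middleware's exact calculation may differ.

```javascript
// Assumed backoff formula: delay * factor^(attempt - 1), capped at maxDelay.
function retryDelay(policy, attempt) {
    const { delay = 100, maxDelay = 1000, factor = 1 } = policy;
    return Math.min(maxDelay, delay * Math.pow(factor, attempt - 1));
}

// Retry only errors flagged as retryable, and only while retries remain.
function shouldRetry(err, retriesDone, policy) {
    return err != null && err.retryable === true && retriesDone < (policy.retries ?? 0);
}

const attempts = [1, 2, 3, 4, 5];
const fixed = { retries: 5, delay: 100, maxDelay: 1000, factor: 1 };
const exponential = { retries: 5, delay: 100, maxDelay: 1000, factor: 2 };

console.log(attempts.map(n => retryDelay(fixed, n)).join(", ")); // 100, 100, 100, 100, 100
console.log(attempts.map(n => retryDelay(exponential, n)).join(", ")); // 100, 200, 400, 800, 1000
```

With factor set to 1 the delay stays constant. With a factor above 1 it grows on each attempt until maxDelay caps it.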
Middleware options:
Name | Type | Description |
---|---|---|
adapter | string, BaseAdapter, or RedisAdapterOptions | Adapter instance, name, or options for workflow storage. Default: "Redis" |
schemaProperty | string | Service schema property name for workflows. Default: "workflows" |
workflowHandlerTrigger | string | Name of the method to trigger workflow handler. Default: emitLocalWorkflowHandler |
jobEventType | string | How job events are emitted (e.g., broadcast, emit). |
signalExpiration | string | Signal expiration time. Default: 1h |
maintenanceTime | number | Maintenance process time (sec). Default: 10 |
lockExpiration | number | Job lock expiration time (sec). Default: 30 |
jobIdCollision | string | Job ID collision policy. Available values: reject, skip, rerun. Default: reject |
tracing | boolean | Enable tracing feature for workflow jobs. Default: false |
Redis adapter options:
Name | Type | Description |
---|---|---|
redis | RedisOptions, { url: string }, or { cluster: { nodes: string[]; clusterOptions?: any } } | Redis connection options, URL, or cluster configuration. Required. |
prefix | string | Prefix for Redis keys. Default: "wf" |
serializer | string | Serializer to use for job data. Default: JSON |
drainDelay | number | Blocking delay time (sec). Default: 5 |
Workflow options:
Name | Type | Description |
---|---|---|
name | string | Name of the workflow. Default: service name + workflow name |
fullName | string | Full name of the workflow. Use it if you don't want the service name prepended to the workflow name. |
params | object | Job parameter validation schema. |
removeOnCompleted | boolean | Remove the job when it's completed. Default: false |
removeOnFailed | boolean | Remove the job when it has failed. Default: false |
concurrency | number | Number of concurrently running jobs. Default: 1 |
retention | string or number | Retention time of job history. Default: null |
retryPolicy | object | Retry policy. |
retryPolicy.retries | number | Number of retries. Default: 0 |
retryPolicy.delay | number | Delay between retries (ms). Default: 100 |
retryPolicy.maxDelay | number | Maximum delay between retries (ms). Default: 1000 |
retryPolicy.factor | number | Exponential backoff factor. Default: 1 (fixed) |
maxStalledCount | number | Maximum number of times a stalled job is put back. A value of 0 or null disables it. Default: null |
tracing | boolean | Enable tracing feature for workflow jobs. Default: false |
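To show how several of the workflow options above fit together, here is a sketch of a service schema written as a plain object. The service name orders, the workflow name processOrder, and all option values are hypothetical. In a real application the object would be passed to broker.createService().

```javascript
// Hypothetical service schema combining options from the workflow options table.
const ordersService = {
    name: "orders",
    workflows: {
        processOrder: {
            // Run up to 5 jobs of this workflow concurrently
            concurrency: 5,
            // Keep finished job history for 7 days
            retention: "7 days",
            // Keep failed jobs so they can be inspected
            removeOnFailed: false,
            // Put a stalled job back at most 3 times
            maxStalledCount: 3,
            // Exponential backoff: up to 5 retries, delays capped at 5 seconds
            retryPolicy: { retries: 5, delay: 200, maxDelay: 5000, factor: 2 },
            // Parameter validation schema
            params: { orderId: { type: "string" } },
            async handler(ctx) {
                return { orderId: ctx.params.orderId, status: "processed" };
            }
        }
    }
};
```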
async broker.wf.run(workflowName, payload, options?)
- workflowName (string): Full workflow name (e.g., service.workflowName).
- payload (object): Payload for the workflow.
- options (object, optional): Additional job options (e.g., jobId).
Returns: a job object with .id, and .promise() for the result.
const job = await broker.wf.run("users.signupWorkflow", { email: "a@b.com", name: "Alice" });
const result = await job.promise();
async broker.wf.getState(workflowName, jobId)
- workflowName (string): Full workflow name (e.g., service.workflowName).
- jobId (string): The job ID to query.
const state = await broker.wf.getState("users.signupWorkflow", "abc123");
async broker.wf.get(workflowName, jobId)
- workflowName (string): Full workflow name (e.g., service.workflowName).
- jobId (string): The job ID to query.
const job = await broker.wf.get("users.signupWorkflow", "abc123");
async broker.wf.getEvents(workflowName, jobId)
- workflowName (string): Full workflow name (e.g., service.workflowName).
- jobId (string): The job ID to query.
const events = await broker.wf.getEvents("users.signupWorkflow", "abc123");
async broker.wf.triggerSignal(signal, key, payload?)
- signal (string): Signal name.
- key (string|number): Signal key (e.g., user ID).
- payload (any, optional): Data to send with the signal.
Returns: void
await broker.wf.triggerSignal("email.verification", user.id);
async broker.wf.removeSignal(signal, key?)
- signal (string): Signal name.
- key (string|number): Signal key (e.g., user ID).
Returns: void
await broker.wf.removeSignal("email.verification", user.id);
async broker.wf.listCompletedJobs(workflowName)
- workflowName (string): Full workflow name (e.g., service.workflowName).
const jobIds = await broker.wf.listCompletedJobs("users.signupWorkflow");
async broker.wf.listFailedJobs(workflowName)
- workflowName (string): Full workflow name (e.g., service.workflowName).
const jobIds = await broker.wf.listFailedJobs("users.signupWorkflow");
async broker.wf.listDelayedJobs(workflowName)
- workflowName (string): Full workflow name (e.g., service.workflowName).
const jobIds = await broker.wf.listDelayedJobs("users.signupWorkflow");
async broker.wf.listActiveJobs(workflowName)
- workflowName (string): Full workflow name (e.g., service.workflowName).
{ "jobId": "" }
const jobIds = await broker.wf.listActiveJobs("users.signupWorkflow");
async broker.wf.listWaitingJobs(workflowName)
- workflowName (string): Full workflow name (e.g., service.workflowName).
{ "jobId": "" }
const jobIds = await broker.wf.listWaitingJobs("users.signupWorkflow");
async broker.wf.cleanUp(workflowName)
- workflowName (string): Full workflow name (e.g., service.workflowName).
Returns: void
await broker.wf.cleanUp("users.signupWorkflow");
async broker.wf.remove(workflowName, jobId)
- workflowName (string): Full workflow name (e.g., service.workflowName).
- jobId (string): The job ID to remove.
Returns: void
await broker.wf.remove("users.signupWorkflow", "abc123");
async broker.wf.getAdapter()
Returns: Adapter
await (await broker.wf.getAdapter()).getJob("users.signupWorkflow", "abc123");
These methods and properties are available only on the ctx instance of a workflow handler.
ctx.wf.setState(state)
- state (any): The new state.
Returns: void
await ctx.wf.setState("VERIFIED");
await ctx.wf.setState(90);
await ctx.wf.setState({ progress: 50, status: "Waiting for verification..." });
ctx.wf.waitForSignal(signal, key, options?)
- signal (string): Signal name to wait for.
- key (string|number): Signal key (e.g., user ID).
- options (object, optional): Options like timeout (e.g., "1 hour").
await ctx.wf.waitForSignal("email.verification", user.id, { timeout: "1 hour" });
ctx.wf.task(name, fn)
- name (string): Name of the task.
- fn (Function): The function to execute. It can be asynchronous.
Returns: the result of fn.
const users = await ctx.wf.task("Fetch a URL", async () => (await fetch("http://example.org/users.json")).json());
ctx.wf.sleep(time)
- time (number|string): Use a number for a millisecond value, or a string for a human-readable format.
Returns: void
await ctx.wf.sleep(500); // Wait for 500ms
await ctx.wf.sleep("5m"); // Wait for 5 minutes
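The human-readable time strings used in this document ("5m", "1 hour", "3 days") can be converted to milliseconds along the following lines. This parser is a hypothetical sketch: toMilliseconds and its unit table are assumptions for illustration, and the formats the library actually accepts may differ.

```javascript
// Hypothetical duration parser; not the library's own parsing rules.
const UNITS = new Map([
    ["ms", 1],
    ["s", 1000], ["sec", 1000], ["second", 1000],
    ["m", 60 * 1000], ["min", 60 * 1000], ["minute", 60 * 1000],
    ["h", 60 * 60 * 1000], ["hour", 60 * 60 * 1000],
    ["d", 24 * 60 * 60 * 1000], ["day", 24 * 60 * 60 * 1000]
]);

function toMilliseconds(time) {
    // Numbers are already milliseconds.
    if (typeof time === "number") return time;
    const match = /^\s*(\d+(?:\.\d+)?)\s*([a-z]+)\s*$/i.exec(String(time));
    if (!match) throw new TypeError(`Invalid duration: ${time}`);
    const unit = match[2].toLowerCase();
    // Try the unit as written, then without a plural "s" (e.g., "days" -> "day").
    const factor = UNITS.get(unit) ?? UNITS.get(unit.replace(/s$/, ""));
    if (factor === undefined) throw new TypeError(`Unknown time unit: ${match[2]}`);
    return Number(match[1]) * factor;
}

console.log(toMilliseconds(500)); // 500
console.log(toMilliseconds("5m")); // 300000
console.log(toMilliseconds("1 hour")); // 3600000
console.log(toMilliseconds("3 days")); // 259200000
```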
ctx.wf.name (string)
ctx.wf.jobId (string)
ctx.wf.retries (number)
ctx.wf.retryAttempts (number): If 0, it's the first attempt. If greater than zero, it's a retried job execution.
ctx.wf.timeout (number)
The project is available under the MIT license.
Copyright (c) 2025 MoleculerJS