
Security News
The Hidden Blast Radius of the Axios Compromise
The Axios compromise shows how time-dependent dependency resolution makes exposure harder to detect and contain.
FluentWorkflow.Core
Advanced tools
A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
基于消息驱动的分布式异步工作流程处理框架,使用 SourceGenerator 简化开发中的重复工作。
在典型的消息驱动处理流程中,阶段的开始消息与结束消息、各个消息的触发都需要手动定义,这些多数属于重复工作,FluentWorkflow是为了减少这些重复劳动而诞生的
2.0 完全不兼容之前的代码
消息、处理器、相关命名空间 等,都需要按新的命名规则调整FluentWorkflow.RabbitMQ 启用 RabbitMQ.Client 的 7.* 版本支持,6.* 版本支持使用包 FluentWorkflow.RabbitMQ.Legacypartial/继承);Diagnostic支持;net8.0+;尽可能的全链路更新,避免导致的未知问题;WorkflowContext 核心为 字符串字典 其属性在赋值时进行序列化存放,对象后续的修改不会反应到上下文中;2.0 已调整相关逻辑,引用类型对象的修改将会正常反馈到后续流程中Workflow 中重写各个阶段的触发事件方法时,方法内不能往外抛出异常,会导致该阶段消息重新进入队列,再次执行;FluentWorkflow.RabbitMQ 依赖 交换机 和 队列 进行消息分发,当存在多套环境需要隔离时,确保 交换机 和 队列 都不相同,否则将会出现消息重复消费;FluentWorkflow.RabbitMQ 在 绑定信息(交换机、队列)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的交换机绑定和RoutingKey绑定,否则将会出现消息重复消费;消息队列中间件异常的情况下可能会出现流程中断、重复消费等情况;FluentWorkflow.Core 包<ItemGroup>
<PackageReference Include="FluentWorkflow.Core" Version="2.1.1" />
</ItemGroup>
工作流程声明public partial class SampleWorkflowDeclaration : IWorkflowDeclaration
{
}
partial;IWorkflowDeclaration;public partial class SampleWorkflowDeclaration : IWorkflowDeclaration
{
internal override void DeclareContext(IWorkflowContextDeclarator declarator)
{
declarator.Property<int>("Count");
}
internal override void DeclareWorkflow(IWorkflowDeclarator declarator)
{
declarator.Name("Sample") //声明工作流程名称
.Begin() //开始定义流程
.Then("SampleStage1") //声明阶段 SampleStage1
.Then("SampleStage2") //声明阶段 SampleStage2
.Then("SampleStage3") //声明阶段 SampleStage3
.Completion(); //完成声明
}
}
到此一个 工作流程声明 就定义完成了,该工作流程名为Sample,包含三个阶段 SampleStage1 -> SampleStage2 -> SampleStage3,上下文中包含一个名为Count的int类型属性
工作流程声明在DeclareWorkflow方法中使用参数declarator定义,必须链式调用:
Name("{WorkflowName}")定义名称Begin()开始定义流程Then("{StageName}")声明每个阶段,声明顺序即为阶段顺序,阶段名称必须满足C#标识符命名规则和约定Completion()结束定义工作流程上下文声明在DeclareContext方法中使用参数declarator定义,必须链式调用:
Property<T>("{PropertyName}", "{Comment}")定义上下文的属性及其类型与备注partial 类进行属性添加相同,此逻辑更方便分发工作流程 的工作代码GenerateWorkflowCodesAttribute 声明要生成的工作流程[assembly: GenerateWorkflowCodes<SampleWorkflowDeclaration>]
工作流程声明 的相同命名空间下声明 {WorkflowName}Workflow 的 partial 类[assembly: GenerateWorkflowCodes<SampleWorkflowDeclaration>]
namespace SampleWorkflowNamespace; //应当与目标声明相同,即与 SampleWorkflowDeclaration 的命名空间相同
public partial class SampleWorkflow
{
public SampleWorkflow(SampleWorkflowContext context, IServiceProvider serviceProvider) : base(context, serviceProvider)
{
}
}
SampleWorkflowContext (模板:{WorkflowName}Context)Stage{StageName}Message)Stage{StageName}CompletedMessage)StageSampleStage1HandlerBase、StageSampleStage2HandlerBase、StageSampleStage3HandlerBase (模板:Stage{StageName}HandlerBase)SampleWorkflowDeclaration 的命名空间及其子命名空间下阶段处理器基类,并实现各个阶段处理逻辑// SampleStage2 与 SampleStage3 同理
public class StageSampleStage1Handler : StageSampleStage1HandlerBase
{
public StageSampleStage1Handler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected override Task ProcessAsync(ProcessContext processContext, StageSampleStage1Message stageMessage, CancellationToken cancellationToken)
{
//TODO 阶段业务逻辑
return Task.CompletedTask;
}
}
控制服务services.AddFluentWorkflow()
.AddSampleWorkflow(configuration => //添加工作流程
{
configuration.AddScheduler() //添加工作流程调度器
.AddResultObserver(); //添加结果观察器
})
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
阶段处理服务services.AddFluentWorkflow()
.AddSampleWorkflow(configuration => //添加工作流程
{
configuration.StageSampleStage1Handler<StageSampleStage1Handler>(); //添加对应阶段的处理器, SampleStage2 与 SampleStage3 同理
})
.<>()
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
FluentWorkflow正常工作的必要条件:
服务使用同一套消息分发器;WorkflowScheduler;StageHandler,各个阶段的阶段处理器有且仅有一个(单个服务,可多实例);子工作流程时必须配置子工作流程结果观察器 - ResultObserver;子工作流程时,必须使用支持等待多个子工作流程的 IWorkflowAwaitProcessor; (默认实现了基于redis的多流程等待处理器,配置时使用UseRedisWorkflowAwaitProcessor方法以启用)//从DI容器中获取工作流程构建器
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
//创建工作流程上下文
var context = new SampleWorkflowContext();
//构建工作流程
var workflow = workflowBuilder.Build(context);
//启动工作流程,框架会自动触发各个阶段处理器后完成
await workflow.StartAsync(default);
WorkflowScheduler的服务,但需要接入消息分发器并在配置时使用 Add****Workflow() 添加对应的工作流程构造器;partial的,可以声明partial类进行拓展,不可使用partial类拓展的功能基本上都可以继承后重写,在配置服务时替换默认实现即可;Workflow 类会添加生命周期各个阶段的触发事件方法,可以继承后重写其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中。重写后应当捕获并处理所有异常,不要抛出);WorkflowContext 核心为字符串/对象混合字典,对其修改理论上只对后续可见并在整个执行周期可用,可以将执行参数、结果、中间值等存放其中;IWorkflowMessageDispatcher控制(默认提供了基于CAP、Abp以及基础的FluentWorkflow.RabbitMQ可选);StageHandler 出现异常则认为工作流程失败,不会将异常抛给上层 IWorkflowMessageDispatcher(消息分发的重试不会触发),可以重写 StageHandler 的 OnException 方法来将异常向上抛出;修改/删除了既有的阶段定义,会导致还在处理过程中工作流程无法正常运行(但添加不会影响);部分功能为源码接入的方式,默认不生成,在项目中指定需要的功能后自动生成
<PropertyGroup>
<FluentWorkflowGeneratorAdditional>AbpFoundation,CAPFoundation,AbpMessageDispatcher,CAPMessageDispatcher,RedisAwaitProcessor</FluentWorkflowGeneratorAdditional>
</PropertyGroup>
| 名称 | 功能 |
|---|---|
| AbpFoundation | Abp的基础功能支持 |
| CAPFoundation | CAP的基础功能支持 |
| AbpMessageDispatcher | Abp的消息分发器 |
| CAPMessageDispatcher | CAP的消息分发器 |
| RedisAwaitProcessor | 基于StackExchange.Redis的子流程等待处理器 |
FluentWorkflow.GenericExtension.{工作流程命名空间} 下,如配置拓展方法等;FluentWorkflow.RabbitMQFluentWorkflow.RabbitMQ 包<ItemGroup>
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="2.1.1" />
</ItemGroup>
services.AddFluentWorkflow()
.UseRabbitMQMessageDispatcher(options =>
{
//配置RabbitMQ
});
配置单个消息的消费速率,其它消息不受限
services.Configure<RabbitMQOptions>(options =>
{
//配置阶段Stage1的消息 - StageSampleStage1Message 的消费速率,即当前服务实例同时只会有一个阶段Stage1在处理
options.MessageGroup("Group1", builder =>
{
builder.Add<StageSampleStage1Message>()
.WithQosChannelInitialization(1);
});
});
RabbitMQ消息的消费ack超时时间默认为30分钟,进行长时间处理时可能会出现意外情况,可参照 acknowledgement-timeout 进行调整
x-consumer-timeout 为 1 小时(如果RabbitMQ版本支持的话);RabbitMQOptions.QueueArgumentsSetup 对队列的 x-consumer-timeout 参数进行调整;默认情况下,当抛出的异常继承接口 IBusyConsumer 时,流程不会立即失败,而是将消息重新返回消息队列
在阶段处理器中实现子工作流程等待逻辑
internal class StageSampleStage1Handler : StageSampleStage1HandlerBase
{
public StageSampleStage1Handler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected async override Task ProcessAsync(ProcessContext processContext, StageSampleStage1Message stageMessage, CancellationToken cancellationToken)
{
//构建子工作流程
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
var workflow = workflowBuilder.Build(new SampleWorkflowContext());
//Other workflow setting
//将未启动的子工作流程传递给当前阶段处理上下文,并命名为 - 'taskName'
processContext.AwaitChildWorkflow("taskName", workflow);
// Other logic
//当前阶段将等待子工作流程处理完成后再完成
}
protected override async Task OnAwaitFinishedAsync(SampleWorkflowContext context, IReadOnlyDictionary<string, IWorkflowContext?> childWorkflowContexts, CancellationToken cancellationToken)
{
//从等待的子工作流程上下文字典中取出 - 'taskName'
var workflowContext = (SampleWorkflowContext)childWorkflowContexts["taskName"];
//处理子工作流程结果,如将 workflowContext 内的结果赋值给 context,以便在当前工作流程的后续阶段中使用等
await base.OnAwaitFinishedAsync(context, childWorkflowContexts, cancellationToken);
//当前阶段将完成
}
}
Diagnostic支持services.AddFluentWorkFlow().EnableDiagnostic();
IWorkflowDebugRunner进行调试运行services.AddFluentWorkflow().AddDebugRunner();
IWorkflowDebugRunner进行消息的执行var transmissionModelRawData = """
//MessageJson
"""u8.ToArray();
var debugRunner = ServiceProvider.GetRequiredService<IWorkflowDebugRunner>();
debugRunner.RunAsync(transmissionModelRawData);
在 WorkFlow 的 On{StageName}Async 和 On{StageName}CompletedAsync 中不执行参数委托 fireMessage,则后续流程不再执行
在 WorkFlow 的 On{StageName}Async 和 On{StageName}CompletedAsync 中不执行参数委托 fireMessage,中止流程,在此基础上调用 SerializeContext 方法将上下文序列化后存放
// 存放 contextData 以用于流程恢复
var contextData = SerializeContext(message.Context);
调用具体 WorkFlow 的静态方法 ResumeAsync 使用挂起的流程数据进行恢复执行
// contextData 为序列化的上下文数据
await XXXXWorkflow.ResumeAsync(contextData, serviceProvider, cancellationToken)
恢复流程将会再次调用序列化上下文时的方法,需要注意,小心再次被挂起
更多信息参见源码内的测试代码
FAQs
A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
We found that fluentworkflow.core demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
The Axios compromise shows how time-dependent dependency resolution makes exposure harder to detect and contain.

Research
A supply chain attack on Axios introduced a malicious dependency, plain-crypto-js@4.2.1, published minutes earlier and absent from the project’s GitHub releases.

Research
Malicious versions of the Telnyx Python SDK on PyPI delivered credential-stealing malware via a multi-stage supply chain attack.