本课概览Microsoft Agent Framework (MAF) 提供了一套强大的 Workflow工作流 框架用于编排和协调多个智能体Agent或处理组件的执行流程。本课将以通俗易懂的方式帮助你理解 MAF Workflow 的核心概念概念 说明 类比Workflow 工作流定义 ?? 流程图Executor 执行器/处理节点 ?? 工人Edge 连接边 ?? 传送带SuperStep 超步/批量处理 ?? 处理周期WorkflowContext 工作流上下文 ?? 工作台WorkflowEvent 工作流事件 ?? 通知Run 运行实例 ?? 一次执行Checkpoint 检查点 ?? 存档点让我们逐一深入了解这些概念?? 为什么需要 Workflow在构建 AI 智能体应用时我们经常需要多步骤处理用户请求需要经过多个处理环节多智能体协作不同的 Agent 各司其职协同完成任务条件分支根据处理结果决定下一步走向并行处理某些任务可以同时进行以提高效率状态管理需要在处理过程中保存和恢复状态传统方式的问题// ? 硬编码的流程难以维护和扩展var result1 await agent1.ProcessAsync(input);if (result1.Success){var result2 await agent2.ProcessAsync(result1.Data);// ... 越来越复杂}使用 Workflow 的优势// ? 声明式定义清晰可维护var workflow new WorkflowBuilder(startExecutor).AddEdge(executor1, executor2).AddEdge(executor2, executor3, condition: x x.Success).Build();一、核心概念Executor执行器?? 什么是 ExecutorExecutor执行器 是 Workflow 中的最小工作单元类似于类比 说明????? 工厂里的工人 每个工人负责一道工序?? 乐高积木块 每个积木有特定功能组合成整体?? 电路中的元件 接收输入信号输出处理结果?? Executor 的核心特征唯一标识Id每个 Executor 有一个唯一的 ID用于在 Workflow 中引用消息处理接收特定类型的输入消息处理后产生输出消息路由配置通过 ConfigureRoutes 方法定义能处理哪些类型的消息状态感知可以通过 IWorkflowContext 访问和修改工作流状态??? Executor 的类型层次MAF 提供了多种 Executor 类型满足不同场景需求类型 用途 示例场景Executor 处理消息无返回值 日志记录、通知发送Executor 处理消息有返回值 数据转换、AI 调用FunctionExecutor 用委托函数快速创建 简单处理逻辑StatefulExecutor 需要维护状态的执行器 会话管理、计数器?? Executor 源码解析从源码中可以看到 Executor 的核心设计// 来自 Executor.cspublic abstract class Executor : IIdentified{// 唯一标识符public string Id { get; }// 配置消息路由子类必须实现protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);// 执行消息处理public async ValueTask ExecuteAsync(object message,TypeId messageType,IWorkflowContext context,CancellationToken cancellationToken default){// 记录调用事件await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));// 路由消息到正确的处理器CallResult? result await this.Router.RouteMessageAsync(message, context, ...);// 记录完成或失败事件ExecutorEvent executionResult result?.IsSuccess is not false? new ExecutorCompletedEvent(this.Id, result?.Result): new ExecutorFailedEvent(this.Id, result.Exception);await context.AddEventAsync(executionResult);return result.Result;}}关键点? 每个 Executor 有唯一 ID? 通过 ConfigureRoutes 声明能处理的消息类型? 执行过程会产生事件ExecutorInvokedEvent、ExecutorCompletedEvent 等二、核心概念Edge边?? 什么是 EdgeEdge边 是连接两个 Executor 的消息通道类似于类比 说明?? 工厂传送带 把上一道工序的产品传送到下一道工序?? 水管 把水从一个容器引导到另一个容器?? 邮路 把信件从发件人送到收件人?? Edge 的三种类型MAF 支持三种类型的 Edge适用于不同的流程模式类型 说明 使用场景Direct直连 一对一连接 顺序处理流程FanOut扇出 一对多连接 并行分发任务FanIn扇入 多对一连接 汇聚多个结果?? Edge 源码解析从源码可以看到 Edge 的核心结构// 来自 Edge.cspublic enum EdgeKind{Direct, // 直连一对一FanOut, // 扇出一对多FanIn // 扇入多对一}public sealed class Edge{public EdgeKind Kind { get; init; } // 边的类型public EdgeData Data { get; init; } // 边的具体数据}// 来自 DirectEdgeData.cs - 直连边的数据public sealed class DirectEdgeData : EdgeData{public string SourceId { get; } // 源 Executor IDpublic string SinkId { get; } // 目标 Executor IDpublic Func? Condition; // 可选的条件判断}Direct Edge 示意图关键点? Edge 定义了消息从哪里来、到哪里去? Direct Edge 支持条件路由只有满足条件的消息才传递? FanOut Edge 可以实现广播或分区逻辑三、核心概念Workflow工作流?? 什么是 WorkflowWorkflow工作流 是将多个 Executor 通过 Edge 连接起来的完整流程定义类似于类比 说明?? 流程图 定义了从开始到结束的完整流程?? 生产线 多个工位通过传送带连接成完整生产线?? 乐谱 规定了演奏的顺序和节奏?? Workflow 的核心属性从源码中可以看到 Workflow 的核心结构// 来自 Workflow.cspublic class Workflow{// 起始 Executor 的 IDpublic string StartExecutorId { get; }// 工作流名称可选public string? Name { get; internal init; }// 工作流描述可选public string? Description { get; internal init; }// Executor 绑定字典internal Dictionary ExecutorBindings { get; init; }// 边的集合按源节点分组internal Dictionary Edges { get; init; }// 输出 Executor 集合internal HashSet OutputExecutors { get; init; }}?? 使用 WorkflowBuilder 构建工作流MAF 采用 建造者模式Builder Pattern 来构建 Workflow这使得工作流的定义更加直观// 创建工作流示例var workflow new WorkflowBuilder(startExecutor) // 指定起点.WithName(订单处理工作流) // 设置名称.WithDescription(处理电商订单的完整流程) // 设置描述.AddEdge(receiveOrder, validateOrder) // 添加边.AddEdge(validateOrder, processPayment,condition: x x.IsValid) // 条件边.AddEdge(processPayment, sendNotification).WithOutputFrom(sendNotification) // 指定输出节点.Build(); // 构建工作流关键方法方法 说明AddEdge(source, sink) 添加直连边AddEdge(..., condition) 添加条件边AddFanOut(source, sinks) 添加扇出边AddFanIn(sources, sink) 添加扇入边WithOutputFrom(executor) 指定输出节点BindExecutor(executor) 绑定占位符执行器Build() 构建最终的 Workflow四、核心概念SuperStep超步?? 什么是 SuperStepSuperStep超步 是 Workflow 执行的基本处理周期。可以类比为类比 说明?? 游戏中的回合 每个回合内所有玩家同时行动? 工厂的班次 每个班次内完成一批任务?? 海浪的一波 一波消息被处理然后产生下一波?? SuperStep 的执行流程每个 SuperStep 内部执行的步骤关键事件SuperStepStartedEvent超步开始SuperStepCompletedEvent超步完成// SuperStep 事件定义public class SuperStepEvent(int stepNumber, object? data null) : WorkflowEvent(data){// 超步的序号从 0 开始public int StepNumber stepNumber;}五、核心概念WorkflowContext工作流上下文?? 什么是 WorkflowContextWorkflowContext工作流上下文 是 Executor 执行时的运行环境类似于类比 说明??? 工人的工作台 提供工具、材料和通信渠道?? 通信枢纽 允许各个工位之间传递信息?? 共享内存 存储和读取状态数据?? IWorkflowContext 核心接口// 来自 IWorkflowContext.cspublic interface IWorkflowContext{// 添加工作流事件在当前 SuperStep 结束时触发ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken default);// 发送消息给下游 Executor在下一个 SuperStep 处理ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken default);// 输出工作流结果ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken default);// 请求在当前 SuperStep 结束时停止工作流ValueTask RequestHaltAsync();// 读取状态ValueTask ReadStateAsync(string key, string? scopeName null, CancellationToken cancellationToken default);// 读取或初始化状态ValueTask ReadOrInitStateAsync(string key, Func initialStateFactory, string? scopeName null, CancellationToken cancellationToken default);// 更新状态排队更新在 SuperStep 结束时应用ValueTask QueueStateUpdateAsync(string key, T value, string? scopeName null, CancellationToken cancellationToken default);}关键点? 消息传递通过 SendMessageAsync 在 Executor 之间传递消息? 状态管理支持读取、初始化和更新状态? 事件通知通过 AddEventAsync 发出事件? 流程控制通过 RequestHaltAsync 停止工作流六、核心概念WorkflowEvent工作流事件?? 什么是 WorkflowEventWorkflowEvent工作流事件 是工作流执行过程中产生的通知消息类似于类比 说明?? 广播通知 向所有人广播系统状态变化?? 日志记录 记录系统执行过程中的关键节点?? 事件订阅 允许外部监听并响应特定事件?? 事件分类事件层级 事件类型 说明工作流级别 WorkflowStartedEvent 工作流开始执行WorkflowOutputEvent 工作流产生输出WorkflowErrorEvent 工作流发生错误WorkflowWarningEvent 工作流产生警告超步级别 SuperStepStartedEvent 超步开始SuperStepCompletedEvent 超步完成执行器级别 ExecutorInvokedEvent Executor 被调用ExecutorCompletedEvent Executor 完成处理ExecutorFailedEvent Executor 处理失败七、核心概念Run运行实例?? 什么是 RunRun运行实例 是 Workflow 的一次具体执行类似于类比 说明?? 电影的一场放映 同一部电影可以放映多场?? 生产线的一个批次 同一条生产线可以生产多个批次?? 游戏的一局 同一个游戏可以玩多局?? Run 的核心特性// 来自 Run.cspublic sealed class Run : IAsyncDisposable{// 运行实例的唯一标识符public string RunId this._runHandle.RunId;// 获取当前执行状态public ValueTask GetStatusAsync(CancellationToken cancellationToken default);// 获取所有产生的事件public IEnumerable OutgoingEvents this._eventSink;// 获取自上次访问后的新事件public IEnumerable NewEvents { get; }// 恢复执行带外部响应public async ValueTask ResumeAsync(IEnumerable responses, CancellationToken cancellationToken default);}?? RunStatus运行状态public enum RunStatus{NotStarted, // 尚未开始Idle, // 空闲已暂停无待处理请求PendingRequests, // 等待外部响应Ended, // 已结束Running // 正在运行}八、核心概念Checkpoint检查点?? 什么是 CheckpointCheckpoint检查点 是工作流在某个时刻的完整状态快照类似于类比 说明?? 游戏存档 保存游戏进度随时可以读档继续?? 照片 记录某一时刻的完整状态?? 书签 标记阅读进度下次从这里继续?? Checkpoint 的核心信息// 来自 CheckpointInfo.cspublic sealed class CheckpointInfo{// 运行实例的唯一标识符public string RunId { get; }// 检查点的唯一标识符public string CheckpointId { get; }}// 检查点的完整数据来自 Checkpoint.csinternal sealed class Checkpoint{public int StepNumber { get; } // 超步编号public WorkflowInfo Workflow { get; } // 工作流信息public RunnerStateData RunnerData { get; } // 运行器状态public Dictionary StateData { get; } // 状态数据public Dictionary EdgeStateData { get; } // 边状态数据public CheckpointInfo? Parent { get; } // 父检查点}?? Checkpoint 的使用场景场景 说明故障恢复 系统崩溃后从最近的检查点恢复长时间运行 分段执行每段结束保存进度人机交互 等待用户输入时保存状态调试回放 从任意检查点重新执行版本分支 从同一个检查点创建多个分支执行九、核心概念关系图?? 概念之间的关系让我们把所有核心概念联系起来看看它们是如何协作的?? 生命周期视角从工作流的完整生命周期来看?? 消息流视角从消息在工作流中的流动来看关键理解消息驱动Executor 之间通过消息传递数据异步批处理同一 SuperStep 内的 Executor 可以并行执行边控制流向Edge 决定消息从哪里到哪里状态隔离每个 SuperStep 结束时应用状态更新十、实际应用示例?? 场景电商订单处理工作流让我们通过一个实际场景来理解这些概念的应用?? 概念对应关系概念 在此场景中的体现Workflow 整个订单处理流程Executor 每个处理步骤接收、验证、支付等Edge 步骤之间的连接含条件判断SuperStep 每一轮处理如超步1处理接收超步2处理验证...WorkflowContext 提供订单状态读写、消息发送能力WorkflowEvent 每个步骤的开始/完成/失败事件Run 一个具体订单的处理过程Checkpoint 处理中途保存的状态如支付完成后保存?? 场景多智能体协作工作流在 AI Agent 场景中Workflow 可以用来编排多个 Agent 的协作Workflow 的优势优势 说明?? 灵活编排 可以轻松调整 Agent 之间的协作关系?? 状态管理 自动管理各 Agent 的状态和上下文?? 可中断/恢复 支持人机交互随时暂停和恢复?? 可观测性 通过事件追踪整个执行过程??? 容错能力 通过检查点支持故障恢复十一、概念总结?? 核心概念速查表概念 定义 关键类 核心职责Executor 执行器/处理节点 Executor, Executor, FunctionExecutor 处理消息产生输出Edge 连接边 Edge, EdgeData, DirectEdgeData 定义消息流向和条件Workflow 工作流定义 Workflow, WorkflowBuilder 组织 Executor 和 EdgeSuperStep 超步/批量处理周期 SuperStepEvent, SuperStepStartedEvent 批量处理消息WorkflowContext 工作流上下文 IWorkflowContext 提供运行时服务WorkflowEvent 工作流事件 WorkflowEvent, ExecutorEvent 通知执行状态Run 运行实例 Run, RunStatus 管理一次执行Checkpoint 检查点 CheckpointInfo, ICheckpointStore 保存和恢复状态脚谀抗钨