Spiga

Net+AI智能体8:Workflow概念篇

2025-11-22 18:21:43

一、核心概念

1. Executor(执行器)

Executor(执行器) 是 Workflow 中的最小工作单元,类似于:

类比 说明
工厂里的工人 每个工人负责一道工序
乐高积木块 每个积木有特定功能,组合成整体
电路中的元件 接收输入信号,输出处理结果
flowchart LR
    Input[输入消息] --> Executor[Executor\n执行器]
    Executor --> Output[输出消息]
    
    style Executor fill:#4CAF50,color:white
  • Executor 的核心特征

    • 唯一标识(Id):每个 Executor 有一个唯一的 ID,用于在 Workflow 中引用
    • 消息处理:接收特定类型的输入消息,处理后产生输出消息
    • 路由配置:通过 ConfigureRoutes 方法定义能处理哪些类型的消息
    • 状态感知:可以通过 IWorkflowContext 访问和修改工作流状态
  • Executor 的类型层次:MAF 提供了多种 Executor 类型,满足不同场景需求

classDiagram
    class Executor {
        +string Id
        +ExecuteAsync()
        #ConfigureRoutes()
    }
    
    class Executor~TInput~ {
        +HandleAsync(TInput)
    }
    
    class Executor~TInput,TOutput~ {
        +HandleAsync(TInput) TOutput
    }
    
    class FunctionExecutor~TInput~ {
        +委托函数处理
    }
    
    class FunctionExecutor~TInput,TOutput~ {
        +委托函数处理
    }
    
    class StatefulExecutor~TState~ {
        +TState State
        +ReadStateAsync()
        +QueueStateUpdateAsync()
    }
    
    Executor <|-- Executor~TInput~
    Executor <|-- Executor~TInput,TOutput~
    Executor~TInput~ <|-- FunctionExecutor~TInput~
    Executor~TInput,TOutput~ <|-- FunctionExecutor~TInput,TOutput~
    Executor <|-- StatefulExecutor~TState~
类型 用途 示例场景
Executor 处理消息,无返回值 日志记录、通知发送
Executor<TInput, TOutput> 处理消息,有返回值 数据转换、AI 调用
FunctionExecutor 用委托函数快速创建 简单处理逻辑
StatefulExecutor 需要维护状态的执行器 会话管理、计数器
  • Executor 源码解析,关键点:
    • 每个 Executor 有唯一 ID
    • 通过 ConfigureRoutes 声明能处理的消息类型
    • 执行过程会产生事件(ExecutorInvokedEvent、ExecutorCompletedEvent 等)
// 来自 Executor.cs
public abstract class Executor : IIdentified
{
    // 唯一标识符
    public string Id { get; }
    
    // 配置消息路由(子类必须实现)
    protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);
    
    // 执行消息处理
    public async ValueTask<object?> ExecuteAsync(
        object message, 
        TypeId messageType, 
        IWorkflowContext context, 
        CancellationToken cancellationToken = default)
    {
        // 记录调用事件
        await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));
        
        // 路由消息到正确的处理器
        CallResult? result = await this.Router.RouteMessageAsync(message, context, ...);
        
        // 记录完成或失败事件
        ExecutorEvent executionResult = result?.IsSuccess is not false
            ? new ExecutorCompletedEvent(this.Id, result?.Result)
            : new ExecutorFailedEvent(this.Id, result.Exception);
            
        await context.AddEventAsync(executionResult);
        return result.Result;
    }
}

2.Edge(边)

Edge(边) 是连接两个 Executor 的消息通道,类似于:

类比 说明
工厂传送带 把上一道工序的产品传送到下一道工序
水管 把水从一个容器引导到另一个容器
邮路 把信件从发件人送到收件人
flowchart LR
    A[Executor A] -->|Edge 边| B[Executor B]
    
    subgraph "Edge 的作用"
        direction TB
        E1["1. 定义消息流向"]
        E2["2. 可设置条件"]
        E3["3. 支持扇入/扇出"]
    end
  • Edge 的三种类型
flowchart TB
    subgraph "Direct 直连边"
        A1[A] --> B1[B]
    end
    
    subgraph "FanOut 扇出边"
        A2[A] --> B2[B]
        A2 --> C2[C]
        A2 --> D2[D]
    end
    
    subgraph "FanIn 扇入边"
        B3[B] --> E3[E]
        C3[C] --> E3
        D3[D] --> E3
    end
类型 说明 使用场景
Direct(直连) 一对一连接 顺序处理流程
FanOut(扇出) 一对多连接 并行分发任务
FanIn(扇入) 多对一连接 汇聚多个结果
  • Edge 源码解析,关键点:
    • Edge 定义了消息从哪里来、到哪里去
    • Direct Edge 支持条件路由(只有满足条件的消息才传递)
    • FanOut Edge 可以实现广播或分区逻辑
// 来自 Edge.cs
public enum EdgeKind
{
    Direct,   // 直连:一对一
    FanOut,   // 扇出:一对多
    FanIn     // 扇入:多对一
}

public sealed class Edge
{
    public EdgeKind Kind { get; init; }    // 边的类型
    public EdgeData Data { get; init; }    // 边的具体数据
}

// 来自 DirectEdgeData.cs - 直连边的数据
public sealed class DirectEdgeData : EdgeData
{
    public string SourceId { get; }        // 源 Executor ID
    public string SinkId { get; }          // 目标 Executor ID
    public Func<object?, bool>? Condition; // 可选的条件判断
}

3. Workflow(工作流)

Workflow(工作流) 是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:

类比 说明
流程图 定义了从开始到结束的完整流程
生产线 多个工位通过传送带连接成完整生产线
乐谱 规定了演奏的顺序和节奏
flowchart TB
    subgraph "Workflow 的组成"
        direction LR
        S[StartExecutor<br>起点] --> E1[Executor 1]
        E1 --> E2[Executor 2]
        E2 --> E3[Output<br>终点]
    end
    
    subgraph "Workflow 元数据"
        N["Name: 工作流名称"]
        D["Description: 工作流描述"]
        O["OutputExecutors: 输出节点集合"]
    end
  • Workflow 的核心属性
// 来自 Workflow.cs
public class Workflow
{
    // 起始 Executor 的 ID
    public string StartExecutorId { get; }
    
    // 工作流名称(可选)
    public string? Name { get; internal init; }
    
    // 工作流描述(可选)
    public string? Description { get; internal init; }
    
    // Executor 绑定字典
    internal Dictionary<string, ExecutorBinding> ExecutorBindings { get; init; }
    
    // 边的集合(按源节点分组)
    internal Dictionary<string, HashSet<Edge>> Edges { get; init; }
    
    // 输出 Executor 集合
    internal HashSet<string> OutputExecutors { get; init; }
}
  • 使用 WorkflowBuilder 构建工作流:MAF 采用建造者模式(Builder Pattern)来构建 Workflow,这使得工作流的定义更加直观
// 创建工作流示例
var workflow = new WorkflowBuilder(startExecutor)  // 指定起点
    .WithName("订单处理工作流")                      // 设置名称
    .WithDescription("处理电商订单的完整流程")       // 设置描述
    .AddEdge(receiveOrder, validateOrder)           // 添加边
    .AddEdge(validateOrder, processPayment, 
             condition: x => x.IsValid)             // 条件边
    .AddEdge(processPayment, sendNotification)
    .WithOutputFrom(sendNotification)               // 指定输出节点
    .Build();                                       // 构建工作流

4. SuperStep(超步)

SuperStep(超步) 是 Workflow 执行的基本处理周期。可以类比为:

类比 说明
游戏中的"回合" 每个回合内所有玩家同时行动
工厂的"班次" 每个班次内完成一批任务
海浪的"一波" 一波消息被处理,然后产生下一波
sequenceDiagram
    participant W as Workflow
    participant S1 as SuperStep 1
    participant S2 as SuperStep 2
    participant S3 as SuperStep 3
    
    W->>S1: 开始执行
    Note over S1: 处理初始消息
    S1->>S2: 传递输出消息
    Note over S2: 处理第二批消息
    S2->>S3: 传递输出消息
    Note over S3: 处理完成
    S3->>W: 返回结果
  • SuperStep 的执行流程,关键事件:
    • SuperStepStartedEvent:超步开始
    • SuperStepCompletedEvent:超步完成
flowchart TB
    subgraph "SuperStep N"
        A[1.开始] --> B[2.收集待处理消息]
        B --> C[3.执行所有 Executor]
        C --> D[4.收集输出消息]
        D --> E[5.应用状态更新]
        E --> F[6.触发事件]
        F --> G[7.完成]
    end
    
    G --> H{还有消息?}
    H -->|是| I[SuperStep N+1]
    H -->|否| J[工作流结束]
// SuperStep 事件定义
public class SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data)
{
    // 超步的序号(从 0 开始)
    public int StepNumber => stepNumber;
}

5. WorkflowContext(工作流上下文)

WorkflowContext(工作流上下文) 是 Executor 执行时的运行环境,类似于:

类比 说明
工人的工作台 提供工具、材料和通信渠道
通信枢纽 允许各个工位之间传递信息
共享内存 存储和读取状态数据
flowchart TB
    subgraph "IWorkflowContext 提供的能力"
        A[发送消息<br>SendMessageAsync]
        B[添加事件<br>AddEventAsync]
        C[输出结果<br>YieldOutputAsync]
        D[读取状态<br>ReadStateAsync]
        E[更新状态<br>QueueStateUpdateAsync]
        F[请求停止<br>RequestHaltAsync]
    end
  • IWorkflowContext 核心接口,关键点:
    • 消息传递:通过 SendMessageAsync 在 Executor 之间传递消息
    • 状态管理:支持读取、初始化和更新状态
    • 事件通知:通过 AddEventAsync 发出事件
    • 流程控制:通过 RequestHaltAsync 停止工作流
// 来自 IWorkflowContext.cs
public interface IWorkflowContext
{
    // 添加工作流事件(在当前 SuperStep 结束时触发)
    ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
    
    // 发送消息给下游 Executor(在下一个 SuperStep 处理)
    ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);
    
    // 输出工作流结果
    ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default);
    
    // 请求在当前 SuperStep 结束时停止工作流
    ValueTask RequestHaltAsync();
    
    // 读取状态
    ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null, CancellationToken cancellationToken = default);
    
    // 读取或初始化状态
    ValueTask<T> ReadOrInitStateAsync<T>(string key, Func<T> initialStateFactory, string? scopeName = null, CancellationToken cancellationToken = default);
    
    // 更新状态(排队更新,在 SuperStep 结束时应用)
    ValueTask QueueStateUpdateAsync<T>(string key, T value, string? scopeName = null, CancellationToken cancellationToken = default);
}

6. WorkflowEvent(工作流事件)

WorkflowEvent(工作流事件) 是工作流执行过程中产生的通知消息,类似于:

类比 说明
广播通知 向所有人广播系统状态变化
日志记录 记录系统执行过程中的关键节点
事件订阅 允许外部监听并响应特定事件
classDiagram
    class WorkflowEvent {
        +object? Data
        +ToString()
    }
    
    class ExecutorEvent {
        +string ExecutorId
    }
    
    class SuperStepEvent {
        +int StepNumber
    }
    
    class WorkflowStartedEvent
    class WorkflowOutputEvent
    class WorkflowErrorEvent
    class WorkflowWarningEvent
    
    class ExecutorInvokedEvent
    class ExecutorCompletedEvent
    class ExecutorFailedEvent
    
    class SuperStepStartedEvent
    class SuperStepCompletedEvent
    
    WorkflowEvent <|-- ExecutorEvent
    WorkflowEvent <|-- SuperStepEvent
    WorkflowEvent <|-- WorkflowStartedEvent
    WorkflowEvent <|-- WorkflowOutputEvent
    WorkflowEvent <|-- WorkflowErrorEvent
    WorkflowEvent <|-- WorkflowWarningEvent
    
    ExecutorEvent <|-- ExecutorInvokedEvent
    ExecutorEvent <|-- ExecutorCompletedEvent
    ExecutorEvent <|-- ExecutorFailedEvent
    
    SuperStepEvent <|-- SuperStepStartedEvent
    SuperStepEvent <|-- SuperStepCompletedEvent
  • 事件分类
事件层级 事件类型 说明
工作流级别 WorkflowStartedEvent 工作流开始执行
WorkflowOutputEvent 工作流产生输出
WorkflowErrorEvent 工作流发生错误
WorkflowWarningEvent 工作流产生警告
超步级别 SuperStepStartedEvent 超步开始
SuperStepCompletedEvent 超步完成
执行器级别 ExecutorInvokedEvent Executor 被调用
ExecutorCompletedEvent Executor 完成处理
ExecutorFailedEvent Executor 处理失败

7. 核心概念:Run(运行实例)

Run(运行实例)是 Workflow 的一次具体执行,类似于:

类比 说明
电影的一场放映 同一部电影可以放映多场
生产线的一个批次 同一条生产线可以生产多个批次
游戏的一局 同一个游戏可以玩多局
flowchart LR
    subgraph "Workflow(工作流定义)"
        W[流程定义]
    end
    
    subgraph "Runs(运行实例)"
        R1[Run 1]
        R2[Run 2]
        R3[Run 3]
    end
    
    W --> R1
    W --> R2
    W --> R3
  • Run 的核心特性
// 来自 Run.cs
public sealed class Run : IAsyncDisposable
{
    // 运行实例的唯一标识符
    public string RunId => this._runHandle.RunId;
    
    // 获取当前执行状态
    public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default);
    
    // 获取所有产生的事件
    public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink;
    
    // 获取自上次访问后的新事件
    public IEnumerable<WorkflowEvent> NewEvents { get; }
    
    // 恢复执行(带外部响应)
    public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default);
}
  • RunStatus(运行状态)
public enum RunStatus
{
    NotStarted,      // 尚未开始
    Idle,            // 空闲(已暂停,无待处理请求)
    PendingRequests, // 等待外部响应
    Ended,           // 已结束
    Running          // 正在运行
}
stateDiagram-v2
    [*] --> NotStarted
    NotStarted --> Running : 开始执行
    Running --> Idle : 暂停
    Running --> PendingRequests : 等待外部响应
    Running --> Ended : 完成/终止
    Idle --> Running : 恢复
    PendingRequests --> Running : 收到响应
    Ended --> [*]

8. Checkpoint(检查点)

Checkpoint(检查点) 是工作流在某个时刻的完整状态快照,类似于:

类比 说明
游戏存档 保存游戏进度,随时可以读档继续
照片 记录某一时刻的完整状态
书签 标记阅读进度,下次从这里继续
flowchart LR
    subgraph "工作流执行"
        S1[SuperStep 1] --> C1[Checkpoint 1]
        C1 --> S2[SuperStep 2]
        S2 --> C2[Checkpoint 2]
        C2 --> S3[SuperStep 3]
        S3 --> C3[Checkpoint 3]
    end
    
    subgraph "恢复执行"
        C2 -.->|从此处恢复| S3'[SuperStep 3]
    end
  • Checkpoint 的核心信息
// 来自 CheckpointInfo.cs
public sealed class CheckpointInfo
{
    // 运行实例的唯一标识符
    public string RunId { get; }
    
    // 检查点的唯一标识符
    public string CheckpointId { get; }
}

// 检查点的完整数据(来自 Checkpoint.cs)
internal sealed class Checkpoint
{
    public int StepNumber { get; }                                    // 超步编号
    public WorkflowInfo Workflow { get; }                             // 工作流信息
    public RunnerStateData RunnerData { get; }                        // 运行器状态
    public Dictionary<ScopeKey, PortableValue> StateData { get; }     // 状态数据
    public Dictionary<EdgeId, PortableValue> EdgeStateData { get; }   // 边状态数据
    public CheckpointInfo? Parent { get; }                            // 父检查点
}
  • Checkpoint 的使用场景
场景 说明
故障恢复 系统崩溃后从最近的检查点恢复
长时间运行 分段执行,每段结束保存进度
人机交互 等待用户输入时保存状态
调试回放 从任意检查点重新执行
版本分支 从同一个检查点创建多个分支执行
flowchart TB
    CP[Checkpoint] --> R1[恢复为 Run A]
    CP --> R2[恢复为 Run B]
    CP --> R3[恢复为 Run C]
    
    R1 -->|不同输入| O1[结果 A]
    R2 -->|不同输入| O2[结果 B]
    R3 -->|不同输入| O3[结果 C]

9. 核心概念关系图

让我们把所有核心概念联系起来,看看它们是如何协作的:

flowchart TB
    subgraph "定义层"
        WB[WorkflowBuilder<br>工作流建造器] -->|构建| W[Workflow<br>工作流]
        E1[Executor 1] --> WB
        E2[Executor 2] --> WB
        Edge[Edge 边] --> WB
    end
    
    subgraph "执行层"
        W -->|创建| R[Run<br>运行实例]
        R -->|包含多个| SS[SuperStep<br>超步]
        SS -->|调用| Ex[Executor 执行]
    end
    
    subgraph "运行时"
        Ex -->|使用| WC[WorkflowContext<br>工作流上下文]
        WC -->|产生| WE[WorkflowEvent<br>工作流事件]
        WC -->|管理| State[状态数据]
    end
    
    subgraph "持久化层"
        SS -->|保存| CP[Checkpoint<br>检查点]
        CP -->|包含| State
        CP -->|恢复| R
    end
  • 生命周期视角
sequenceDiagram
    participant Dev as 开发者
    participant WB as WorkflowBuilder
    participant W as Workflow
    participant R as Run
    participant SS as SuperStep
    participant E as Executor
    participant WC as WorkflowContext
    participant CP as Checkpoint
    
    Note over Dev,CP: 1.定义阶段
    Dev->>WB: 创建 WorkflowBuilder
    Dev->>WB: 添加 Executors
    Dev->>WB: 添加 Edges
    WB->>W: Build() 构建
    
    Note over Dev,CP: 2.执行阶段
    Dev->>W: 启动 Run
    W->>R: 创建运行实例
    
    loop 每个 SuperStep
        R->>SS: 开始超步
        SS->>E: 调用 Executor
        E->>WC: 使用上下文
        WC-->>E: 发送消息/读写状态
        E-->>SS: 完成处理
        SS->>CP: 保存检查点
    end
    
    Note over Dev,CP: 3.恢复阶段(可选)
    Dev->>CP: 加载检查点
    CP->>R: 恢复运行
  • 消息流视角,关键理解:
    • 消息驱动:Executor 之间通过消息传递数据
    • 异步批处理:同一 SuperStep 内的 Executor 可以并行执行
    • 边控制流向:Edge 决定消息从哪里到哪里
    • 状态隔离:每个 SuperStep 结束时应用状态更新
flowchart LR
    subgraph "SuperStep 1"
        I[输入消息] --> E1[Executor A]
        E1 -->|通过 Edge| M1[消息队列]
    end
    
    subgraph "SuperStep 2"
        M1 --> E2[Executor B]
        M1 --> E3[Executor C]
        E2 -->|通过 Edge| M2[消息队列]
        E3 -->|通过 Edge| M2
    end
    
    subgraph "SuperStep 3"
        M2 --> E4[Executor D]
        E4 --> O[输出结果]
    end

10. 总结

  • 记忆口诀
    • 工作流程边连接,执行器来做处理
    • 超步批量往前走,上下文中读写态
    • 事件通知全过程,运行实例跑一次
    • 检查点来存进度,随时恢复不怕挂
  • 核心概念速查表
概念 定义 关键类 核心职责
Executor 执行器/处理节点 Executor, Executor, FunctionExecutor 处理消息,产生输出
Edge 连接边 Edge, EdgeData, DirectEdgeData 定义消息流向和条件
Workflow 工作流定义 Workflow, WorkflowBuilder 组织 Executor 和 Edge
SuperStep 超步/批量处理周期 SuperStepEvent, SuperStepStartedEvent 批量处理消息
WorkflowContext 工作流上下文 IWorkflowContext 提供运行时服务
WorkflowEvent 工作流事件 WorkflowEvent, ExecutorEvent 通知执行状态
Run 运行实例 Run, RunStatus 管理一次执行
Checkpoint 检查点 CheckpointInfo, ICheckpointStore 保存和恢复状态

二、条件工作流

1. 业务场景:企业邮箱智能卫士

  • 角色:客服专员、内容安全团队、邮件助手 Agent
  • 输入:每天成百上千封顾客邮件,内容既包含正常咨询也夹杂垃圾信息
  • 流程:
    1. Spam Detection Agent 判定邮件是否为垃圾
    2. Conditional Edge 决策下一步:
      • Legit → Email Assistant Agent → SendEmail 执行器
      • Spam → HandleSpam 执行器,记录原因并触发人工跟进
  • 价值:极大降低人工分拣成本,同时保留风控可观测性,可扩展多级路由(Switch/Multi-Selection)。

2. 核心构建块速览

构建块 中文描述 关注点
Step / Executor 执行单元,输入类型 → 输出类型 SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
Edge 步骤之间的连接 AddEdge(A, B) 表示顺序执行
Conditional Edge 带决策函数的 Edge AddEdge(A, B, condition: Func<object?, bool>)
Shared State 跨步骤共享数据 context.QueueStateUpdateAsync() / ReadStateAsync()
WorkflowEvent 运行事件流 WorkflowOutputEvent, ExecutorCompletedEvent 等
3. 实践流程
flowchart LR
    A[客户邮件] --> B[SpamDetectionExecutor]
    B -->|IsSpam == false| C[EmailAssistantExecutor]
    C --> D[SendEmailExecutor]
    B -->|IsSpam == true| E[HandleSpamExecutor]
    D --> F[WorkflowOutputEvent]
    E --> F
  • 步骤 1:定义领域模型与共享状态键
using System.Text.Json.Serialization;
using Microsoft.Agents.AI.Workflows;

// 统一的共享状态 scope
internal static class EmailStateConstants
{
    public const string EmailStateScope = "EmailState";
}

/// <summary>
/// 垃圾邮件检测结果
/// Spam 判断结果(is_spam + reason)
/// </summary>
public sealed class DetectionResult
{
    [JsonPropertyName("is_spam")]
    public bool IsSpam { get; set; }

    /// <summary>
    /// 判定理由(用于审计和调试)
    /// </summary>
    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;
    
    /// <summary>
    /// 邮件ID(用于关联 Shared State 中的原始内容)
    /// </summary>
    [JsonIgnore]  // 不参与 JSON 序列化
    public string EmailId { get; set; } = string.Empty;
}

/// <summary>
/// 邮件内容(存储在 Shared State 中)
/// 在 Shared State 中保存原始正文,便于下游执行器读取
/// </summary>
internal sealed class Email
{
    [JsonPropertyName("email_id")]
    public string EmailId { get; set; } = string.Empty;

    [JsonPropertyName("email_content")]
    public string EmailContent { get; set; } = string.Empty;
}

/// <summary>
/// 邮件助手生成的回复内容
/// </summary>
public sealed class EmailResponse
{
    [JsonPropertyName("response")]
    public string Response { get; set; } = string.Empty;
}
  • 步骤 2:实现 SpamDetectionExecutor(入口节点)

    职责:

    1. 生成唯一 EmailId,把邮件正文写入 Shared State
    2. 调用 SpamDetectionAgent,拿到结构化 DetectionResult
    3. 把 EmailId 写回结果,供下游执行器继续查找 Shared State
var spamDetectionAgent = new ChatClientAgent(
    ChatClient,
    new ChatClientAgentOptions(instructions: "You are a spam detection assistant that labels spam emails with reasons.")
    {
        ChatOptions = new ChatOptions
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    });

internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        _spamDetectionAgent = spamDetectionAgent;
    }
    
    public override async ValueTask<DetectionResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var trackedEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
    
        await context.QueueStateUpdateAsync(trackedEmail.EmailId, trackedEmail, scopeName: EmailStateConstants.EmailStateScope, cancellationToken);
    
        var agentResponse = await _spamDetectionAgent.RunAsync(message, cancellationToken: cancellationToken);
        var detection = JsonSerializer.Deserialize<DetectionResult>(agentResponse.Text)
            ?? throw new InvalidOperationException("无法解析 Spam Detection 响应。");
    
        detection.EmailId = trackedEmail.EmailId;
        return detection;
    }
}
  • 步骤 3:实现下游执行器(EmailAssistant + SendEmail + HandleSpam)

    • EmailAssistantExecutor:读取 Shared State 中的正文,再调用第二个 Agent 输出 JSON 回复

    • SendEmailExecutor:模拟发送,使用 context.YieldOutputAsync 输出成功消息

    • HandleSpamExecutor:命中垃圾时输出风险提示,若误入则抛出异常,帮助我们在调试阶段及时发现边误判

var emailAssistantAgent = new ChatClientAgent(
  ChatClient,
  new ChatClientAgentOptions(instructions: "You are an enterprise email assistant. Provide professional Chinese responses.")
  {
      ChatOptions = new ChatOptions
      {
          ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
      }
  });

internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        _emailAssistantAgent = emailAssistantAgent;
    }
    
    public override async ValueTask<EmailResponse> HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (message.IsSpam)
        {
            throw new InvalidOperationException("Spam 邮件不应进入 EmailAssistantExecutor。");
        }
    
        var email = await context.ReadStateAsync<Email>(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken)
            ?? throw new InvalidOperationException("找不到对应 Email 内容。");
    
        var agentResponse = await _emailAssistantAgent.RunAsync(email.EmailContent, cancellationToken: cancellationToken);
        return JsonSerializer.Deserialize<EmailResponse>(agentResponse.Text)
            ?? throw new InvalidOperationException("无法解析 Email Assistant 响应。");
    }
}

internal sealed class SendEmailExecutor() : Executor<EmailResponse>("SendEmailExecutor")
{
    public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        await context.YieldOutputAsync($"Email sent: {message.Response}", cancellationToken);
    }
}

internal sealed class HandleSpamExecutor() : Executor<DetectionResult>("HandleSpamExecutor")
{
    public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (!message.IsSpam)
        {
            throw new InvalidOperationException("非垃圾邮件不应进入 HandleSpamExecutor。");
        }

        await context.YieldOutputAsync($"Spam captured: {message.Reason}", cancellationToken);
    }
}
  • 步骤 4:使用 Conditional Edge 构建工作流

    核心代码:

    • GetCondition(bool expected):返回 Func<object?, bool>,转型后比对 DetectionResult.IsSpam

    • AddEdge(spamDetection, emailAssistant, condition: GetCondition(false))

    • AddEdge(spamDetection, handleSpam, condition: GetCondition(true))

    • WithOutputFrom(handleSpam, sendEmail):串联事件输出

运行结果将随输入邮件内容决定走向,实现真正的 if-else 路由。

Func<object?, bool> BuildCondition(bool expectedSpamFlag) =>
detection => detection is DetectionResult dr && dr.IsSpam == expectedSpamFlag;

var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();

var conditionalWorkflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: BuildCondition(false))
    .AddEdge(emailAssistantExecutor, sendEmailExecutor)
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: BuildCondition(true))
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor)
    .Build();
  • 步骤 5:运行工作流并观察事件

    • 使用 InProcessExecution.StreamAsync 获取 StreamingRun

    • 通过 run.TrySendMessageAsync(new TurnToken(emitEvents: true)) 打开事件推送

    • 订阅 run.WatchStreamAsync(),把 WorkflowEvent 统一输出

static async Task RunWorkflowAsync(Workflow conditionalWorkflow, string scenarioName, string emailBody)
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"场景:{scenarioName}");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");

    await using StreamingRun run = await InProcessExecution.StreamAsync(conditionalWorkflow, new ChatMessage(ChatRole.User, emailBody));
    await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
    
    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        switch (evt)
        {
            case ExecutorCompletedEvent completed:
                completed.Display();
                break;
            case WorkflowOutputEvent outputEvent:
                outputEvent.Display();
                break;
        }
    }
}

var legitimateEmail = @"客服团队你好,我想确认上周提交的采购订单是否已经发货,如果还有缺少信息请告知。";
await RunWorkflowAsync(conditionalWorkflow, "正常咨询 → EmailAssistant 分支", legitimateEmail);
  • 步骤 6:触发垃圾邮件分支以验证条件

    • 使用 Resources/spam.txt(与源码一致)或自定义明显的垃圾内容

    • 预期路径:SpamDetectionExecutor → HandleSpamExecutor

    • 输出信息应包含 Spam captured,表明 Conditional Edge 跳转正确

var spamEmail = @"令人惊喜的投资机会!只需支付保证金即可在 24 小时内获得 10 倍收益,点击可疑链接领取奖励。";
await RunWorkflowAsync(conditionalWorkflow, "垃圾邮件 → HandleSpam 分支", spamEmail);

4. 高级场景与最佳实践

  • 条件函数设计技巧

    • 始终进行类型检查:result is DetectionResult detection

    • 将条件函数提取为独立方法,避免在 AddEdge 中写长匿名函数

    • 尽量保持纯函数(无副作用),保证工作流可预测

  • 调试建议

    • 打印 ExecutorCompletedEvent 的输入输出摘要,若条件无法命中可快速定位

    • 可将 WorkflowEvent 推送到前端(SignalR/AGUI)实时可视化

  • 常见错误与解决方案

    • 条件未命中:确认 WithOutputFrom 是否把输出连接到正确节点
    • 状态缺失:运行前先确认入口执行器已把数据写入 Shared State
    • 类型不匹配:确保执行器泛型签名与实际输入输出一致

5. 实际应用示例:多级风控 + 工单系统

在企业环境中,可将工作流作为“一级过滤器”,再将输出接入更多节点:

  1. HandleSpamExecutor 输出后追加 RiskAuditExecutor,把 spam 详情写入 Cosmos DB 或内部审计系统
  2. 将 EmailAssistantExecutor 的结果包装成 WorkflowAgent,供客服 Copilot 直接调用,实现自动回信
  3. 若检测为高风险,可通过 AddEdge 连到 ManualReviewExecutor,在其中调用 Teams / AGUI 通知人工审批
var extendedWorkflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: BuildCondition(false))
    .AddEdge(emailAssistantExecutor, sendEmailExecutor)
    .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: BuildCondition(true))
    .AddEdge(handleSpamExecutor, riskAuditExecutor)
    .AddEdge(riskAuditExecutor, manualReviewExecutor, condition: BuildCondition(true))
    .Build();

通过“Conditional Edge + 多级 Executor”,可以轻松演进到复杂的内容安全与客服协同系统。

三、分支工作流

1. 业务场景:企业智能邮件安全中心

前面我们实现了简单的二分法邮件过滤(垃圾/正常)。但在企业实际场景中,邮件安全判定往往需要更精细的分类:

  • 角色:企业安全团队、客服专员、AI 安全助手
  • 挑战:每天处理数千封邮件,其中包含:
    • 正常邮件(NotSpam):客户咨询、业务往来
    • 垃圾邮件(Spam):明显的诈骗、广告
    • 不确定邮件(Uncertain):可能是钓鱼邮件,需要人工审核

2. 业务流程

flowchart LR
    A[客户邮件] --> B[SpamDetectionExecutor]
    B -->|NotSpam| C[EmailAssistantExecutor]
    C --> D[SendEmailExecutor]
    B -->|Spam| E[HandleSpamExecutor]
    B -->|Uncertain| F[HandleUncertainExecutor]
    D --> G[WorkflowOutput]
    E --> G
    F --> G

3. 核心构建块

构建块 中文描述 本课关注点
AddSwitch Switch-Case 路由构建器 builder.AddSwitch(sourceStep, switchBuilder => ...)
AddCase 单个 Case 分支 switchBuilder.AddCase(condition, targetStep)
WithDefault Default 兜底分支 switchBuilder.WithDefault(defaultStep)
Enum 条件 基于枚举的精确匹配 SpamDecision.NotSpam / Spam / Uncertain
Shared State 跨步骤共享数据 存储邮件内容供下游读取

API 对比:

// 多个 Conditional Edge
builder
    .AddEdge(source, target1, condition: c => c.IsSpam == false)
    .AddEdge(source, target2, condition: c => c.IsSpam == true);

// Switch-Case
builder.AddSwitch(source, sb => sb
    .AddCase(c => c.Decision == NotSpam, target1)
    .AddCase(c => c.Decision == Spam, target2)
    .WithDefault(target3)
);

3. 实践流程

核心代码结构:

var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddSwitch(spamDetectionExecutor, switchBuilder =>
        switchBuilder
            .AddCase(GetCondition(SpamDecision.NotSpam), emailAssistantExecutor)
            .AddCase(GetCondition(SpamDecision.Spam), handleSpamExecutor)
            .WithDefault(handleUncertainExecutor)  // 兜底机制
    )
    .AddEdge(emailAssistantExecutor, sendEmailExecutor)
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor)
    .Build();
  • 步骤 1:定义领域模型与三分类枚举
public enum SpamDecision
{
    NotSpam,     // 正常邮件,可以自动回复
    Spam,        // 明显的垃圾邮件,需要拦截
    Uncertain    // 不确定的邮件,需要人工审核
}


public sealed class DetectionResult
{
    public bool IsSpam { get; set; }  // 只有两种状态
    
    /// <summary>
    /// 检测决策(NotSpam / Spam / Uncertain)
    /// </summary>
    [JsonPropertyName("spam_decision")]
    [JsonConverter(typeof(JsonStringEnumConverter))]  // JSON 序列化为字符串
    public SpamDecision spamDecision { get; set; }
}
  • 步骤 2:实现 Spam Detection Executor(支持三分类)

    核心职责

    1. 生成唯一 EmailId:为每封邮件分配唯一标识
    2. 保存到 Shared State:将邮件内容存储,供下游 Executor 读取
    3. 调用 AI Agent 检测:使用结构化输出获取 DetectionResult
    4. 返回检测结果:包含 spamDecision 枚举值和判定理由

    关键技术点

    • 结构化输出:ChatResponseFormat.ForJsonSchema()

    • 枚举序列化:JsonStringEnumConverter 确保 JSON 中使用字符串而非数字

    • 状态管理:context.QueueStateUpdateAsync() 写入邮件内容

    • 三分类提示词:指示 AI 在不确定时选择 Uncertain

var spamDetectionAgent = new ChatClientAgent(
    ChatClient,
    new ChatClientAgentOptions(
        instructions: @"你是一个垃圾邮件检测助手。判定规则:
        

- NotSpam: 明显的正常业务邮件(订单查询、售后咨询等)
- Spam: 明显的垃圾邮件(诈骗、广告、钓鱼)
- Uncertain: 无法明确判断,包含可疑元素但不确定(如含可疑链接但内容模糊)

对于模棱两可的情况,倾向于标记为 Uncertain 以保证安全。"
    )
    {
        ChatOptions = new ChatOptions
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
        }
    }
);

/// <summary>
/// 垃圾邮件检测执行器
/// 输入: ChatMessage(邮件内容)
/// 输出: DetectionResult(三分类结果)
/// </summary>
internal sealed class SpamDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _spamDetectionAgent;

    public SpamDetectionExecutor(AIAgent spamDetectionAgent) : base("SpamDetectionExecutor")
    {
        _spamDetectionAgent = spamDetectionAgent;
    }
    
    public override async ValueTask<DetectionResult> HandleAsync(
        ChatMessage message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 1.生成唯一邮件ID并保存内容到 Shared State
        var trackedEmail = new Email
        {
            EmailId = Guid.NewGuid().ToString("N"),
            EmailContent = message.Text
        };
    
        await context.QueueStateUpdateAsync(
            trackedEmail.EmailId,
            trackedEmail,
            scopeName: EmailStateConstants.EmailStateScope,
            cancellationToken
        );
    
        // 2.调用 AI Agent 进行三分类检测
        var agentResponse = await _spamDetectionAgent.RunAsync(
            message,
            cancellationToken: cancellationToken
        );
    
        // 3.解析结构化输出
        var detection = JsonSerializer.Deserialize<DetectionResult>(agentResponse.Text)
            ?? throw new InvalidOperationException("无法解析 Spam Detection 响应");
    
        // 4.关联 EmailId(供下游 Executor 查找原始内容)
        detection.EmailId = trackedEmail.EmailId;
    
        return detection;
    }
}
  • 步骤 3:实现三个下游处理器,根据 Switch-Case 的三个分支,需要实现三个下游 Executor:
flowchart LR
    A[SpamDetectionExecutor] -->|NotSpam| B[EmailAssistantExecutor]
    B --> C[SendEmailExecutor]
    A -->|Spam| D[HandleSpamExecutor]
    A -->|Uncertain| E[HandleUncertainExecutor]

处理器职责

Executor 触发条件 主要职责
EmailAssistantExecutor NotSpam 调用 AI 生成专业回复
SendEmailExecutor EmailAssistant 之后 模拟发送邮件
HandleSpamExecutor Spam 记录拦截信息
HandleUncertainExecutor Uncertain (Default) 标记为待审核
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 1. 正常邮件分支:EmailAssistant + SendEmail
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var emailAssistantAgent = new ChatClientAgent(
    chatClient,
    new ChatClientAgentOptions(
        instructions: "你是一个企业邮件助手,为客户邮件生成专业、友好的中文回复。"
    )
    {
        ChatOptions = new ChatOptions
        {
            ResponseFormat = ChatResponseFormat.ForJsonSchema<EmailResponse>()
        }
    }
);

/// <summary>
/// 邮件助手执行器(仅处理正常邮件)
/// 输入: DetectionResult(必须是 NotSpam)
/// 输出: EmailResponse(AI 生成的回复)
/// </summary>
internal sealed class EmailAssistantExecutor : Executor<DetectionResult, EmailResponse>
{
    private readonly AIAgent _emailAssistantAgent;

    public EmailAssistantExecutor(AIAgent emailAssistantAgent) : base("EmailAssistantExecutor")
    {
        _emailAssistantAgent = emailAssistantAgent;
    }
    
    public override async ValueTask<EmailResponse> HandleAsync(
        DetectionResult message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 防御性检查:确保只处理正常邮件
        if (message.spamDecision == SpamDecision.Spam)
        {
            throw new InvalidOperationException(
                "EmailAssistantExecutor 不应处理垃圾邮件,请检查路由配置。"
            );
        }
    
        // 1.从 Shared State 读取原始邮件内容
        var email = await context.ReadStateAsync<Email>(
            message.EmailId,
            scopeName: EmailStateConstants.EmailStateScope,
            cancellationToken
        ) ?? throw new InvalidOperationException($"找不到 EmailId={message.EmailId} 的邮件内容");
    
        // 2.调用 AI Agent 生成回复
        var agentResponse = await _emailAssistantAgent.RunAsync(
            email.EmailContent,
            cancellationToken: cancellationToken
        );
    
        // 3.解析结构化输出
        return JsonSerializer.Deserialize<EmailResponse>(agentResponse.Text)
            ?? throw new InvalidOperationException("无法解析 Email Assistant 响应");
    }

}

/// <summary>
/// 邮件发送执行器(模拟发送)
/// 输入: EmailResponse
/// 输出: 工作流事件
/// </summary>
internal sealed class SendEmailExecutor() : Executor<EmailResponse>("SendEmailExecutor")
{
    public override async ValueTask HandleAsync(
        EmailResponse message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 模拟邮件发送(实际项目中可调用 SMTP、SendGrid 等服务)
        await context.YieldOutputAsync(
            $"邮件已发送: {message.Response}",
            cancellationToken
        );
    }
}

/// <summary>
/// 2. 垃圾邮件分支 + 不确定邮件分支
/// 输入: DetectionResult(必须是 Spam)
/// 输出: 工作流事件
/// </summary>
internal sealed class HandleSpamExecutor() : Executor<DetectionResult>("HandleSpamExecutor")
{
    public override async ValueTask HandleAsync(
        DetectionResult message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 防御性检查:确保只处理垃圾邮件
        if (message.spamDecision != SpamDecision.Spam)
        {
            throw new InvalidOperationException(
                "HandleSpamExecutor 只应处理 Spam 类型的邮件,请检查路由配置。"
            );
        }

        // 记录垃圾邮件(实际项目中可写入数据库或日志系统)
        await context.YieldOutputAsync(
            $"垃圾邮件已拦截: {message.Reason}",
            cancellationToken
        );
    }

}

/// <summary>
/// 不确定邮件处理执行器(Default Case)
/// 输入: DetectionResult(应该是 Uncertain,但也处理其他未匹配情况)
/// 输出: 工作流事件
/// </summary>
internal sealed class HandleUncertainExecutor() : Executor<DetectionResult>("HandleUncertainExecutor")
{
    public override async ValueTask HandleAsync(
        DetectionResult message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 防御性检查:确保只处理不确定邮件
        if (message.spamDecision != SpamDecision.Uncertain)
        {
            throw new InvalidOperationException(
                "HandleUncertainExecutor 只应处理 Uncertain 类型的邮件(或作为 Default Case)。"
            );
        }

        // 1.从 Shared State 读取原始邮件内容(用于人工审核)
        var email = await context.ReadStateAsync<Email>(
            message.EmailId,
            scopeName: EmailStateConstants.EmailStateScope,
            cancellationToken
        );
    
        // 2.输出待审核信息
        await context.YieldOutputAsync(
            $"不确定邮件需人工审核:\n" +
            $"原因: {message.Reason}\n" +
            $"内容预览: {email?.EmailContent?.Substring(0, Math.Min(100, email.EmailContent.Length))}...",
            cancellationToken
        );
    }
}
  • 步骤 4:使用 AddSwitch 构建 Switch-Case 工作流

    为了保持代码简洁,我们定义一个工厂方法来生成条件函数,为什么这样设计?

    • 类型安全:使用 is 模式匹配确保类型正确
    • 可复用:通过参数化生成不同的条件函数
    • 可读性强:条件逻辑清晰明确
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 条件函数工厂方法
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
/// <summary>
/// 创建基于 SpamDecision 枚举的条件函数
/// </summary>
/// <param name="expectedDecision">期望的垃圾邮件判定结果</param>
/// <returns>条件函数,用于 Switch-Case 路由</returns>
Func<object?, bool> GetCondition(SpamDecision expectedDecision) =>
    detectionResult =>
        detectionResult is DetectionResult result &&
        result.spamDecision == expectedDecision;

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 创建执行器实例
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var spamDetectionExecutor = new SpamDetectionExecutor(spamDetectionAgent);
var emailAssistantExecutor = new EmailAssistantExecutor(emailAssistantAgent);
var sendEmailExecutor = new SendEmailExecutor();
var handleSpamExecutor = new HandleSpamExecutor();
var handleUncertainExecutor = new HandleUncertainExecutor();

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 使用 AddSwitch 构建 Switch-Case 工作流
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var switchCaseWorkflow = new WorkflowBuilder(spamDetectionExecutor)
    // AddSwitch: 定义 Switch-Case 路由
    .AddSwitch(spamDetectionExecutor, switchBuilder =>
        switchBuilder
            // Case 1: NotSpam → EmailAssistant
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.NotSpam),
                emailAssistantExecutor
            )
            // Case 2: Spam → HandleSpam
            .AddCase(
                GetCondition(expectedDecision: SpamDecision.Spam),
                handleSpamExecutor
            )
            // Default: Uncertain (或任何未匹配的情况) → HandleUncertain
            .WithDefault(
                handleUncertainExecutor
            )
    )
    // EmailAssistant 之后自动发送邮件
    .AddEdge(emailAssistantExecutor, sendEmailExecutor)
    // 配置输出节点(三个终点执行器都会产生输出)
    .WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor)
    .Build();

5. 测试

  • 测试策略
邮件类型 预期路径 验证目标
正常邮件 NotSpam → EmailAssistant → SendEmail 验证 Case 1
垃圾邮件 Spam → HandleSpam 验证 Case 2
不确定邮件 Uncertain → HandleUncertain 验证 Default
  • 为了避免重复代码,我们定义一个统一的运行函数
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 通用工作流运行函数
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
/// <summary>
/// 运行工作流并输出事件流
/// </summary>
static async Task RunWorkflowAsync(
    Workflow workflow,
    string scenarioName,
    string emailContent)
{
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"测试场景:{scenarioName}");
    Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    Console.WriteLine($"邮件内容:{emailContent.Substring(0, Math.Min(80, emailContent.Length))}...\n");

    await using StreamingRun run = await InProcessExecution.StreamAsync(
        workflow,
        new ChatMessage(ChatRole.User, emailContent)
    );
    
    // 发送 Turn Token,启用事件推送
    await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
    
    // 订阅事件流
    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        switch (evt)
        {
            case ExecutorCompletedEvent completed:
                completed.Display();
                break;
    
            case WorkflowOutputEvent outputEvent:
                outputEvent.Display();
                break;
        }
    }    
    Console.WriteLine();
}
  • 测试正常邮件路径(Case 1: NotSpam)

    预期行为:

    1. SpamDetectionExecutor 判定为 NotSpam
    2. Switch 路由到 EmailAssistantExecutor
    3. EmailAssistantExecutor 生成回复
    4. SendEmailExecutor 发送邮件
    5. 输出:“邮件已发送: ...“
// 测试场景 1:正常客户咨询邮件
var legitimateEmail = @"
尊敬的客服团队:

您好!我是贵公司的长期客户,订单号为 #2025-001。

我想确认一下上周提交的采购订单是否已经安排发货。
如果需要补充任何信息,请随时告知。

期待您的回复,谢谢!

客户:张先生
";

await RunWorkflowAsync(
    switchCaseWorkflow,
    "正常邮件 → EmailAssistant → SendEmail",
    legitimateEmail
);
  • 测试垃圾邮件路径(Case 2: Spam)

    预期行为:

    1. SpamDetectionExecutor 判定为 Spam
    2. Switch 路由到 HandleSpamExecutor
    3. 输出:"垃圾邮件已拦截: ..."
    4. 不会进入 EmailAssistant 分支
// 测试场景 2:明显的垃圾邮件
var spamEmail = @"
恭喜您中奖啦!

您已被选中获得 100 万现金大奖!

立即点击以下链接领取:
http://suspicious-site.com/claim-prize

仅限今日有效,过期作废!
不需要任何手续费,完全免费!

快速行动,机不可失!
";

await RunWorkflowAsync(
    switchCaseWorkflow,
    "垃圾邮件 → HandleSpam",
    spamEmail
);
  • 测试不确定邮件路径(Default Case: Uncertain)

    预期行为:

    1. SpamDetectionExecutor 判定为 Uncertain
    2. Switch 路由到 HandleUncertainExecutor(Default Case)
    3. 输出:“不确定邮件需人工审核: ...“
    4. 包含邮件内容预览,便于人工判断

    为什么需要 Default Case?

    • 兜底机制:即使 AI 无法明确判断,也有对应处理流程

    • 安全优先:可疑邮件不会被错误归类为正常或垃圾

    • 人工介入:为需要深度审核的情况预留通道

// 测试场景 3:不确定的可疑邮件(来自源码的 ambiguous_email.txt)
var uncertainEmail = @"
主题:需要验证您的账户

尊敬的客户:

我们检测到您的账户存在异常活动,需要验证您的身份以确保账户安全。

请登录您的账户并完成验证流程,以继续使用服务。

账户详情:

- 用户名:johndoe@contoso.com
- 最后登录:08/15/2025
- 登录地点:西雅图,华盛顿州
- 登录设备:移动设备

这是一项自动安全措施。如果您认为此邮件是错误发送的,请立即联系我们的支持团队。

此致
安全团队
客户服务部门
";

await RunWorkflowAsync(
    switchCaseWorkflow,
    "不确定邮件 → HandleUncertain (Default)",
    uncertainEmail
);

6. Default Case 的最佳实践

推荐使用的场景:

  • 枚举值可能扩展:未来可能添加新的分类类型
  • 防御性编程:确保所有情况都有处理路径
  • 不确定判断:AI 模型输出可能包含"不确定"状态
  • 兜底机制:为意外情况提供安全降级

不需要 Default 的场景:

  • 枚举值完全穷尽且不会变化
  • 每个 Case 都已明确定义
  • 希望未匹配时抛出异常(快速失败)

Default Case 的三种处理策略

  • 策略 1:人工审核(推荐)

    优势:安全优先,可疑情况由人工判断

.WithDefault(humanReviewExecutor)
  • 策略 2:保守处理

    优势:适用于安全敏感场景

.WithDefault(treatAsSpamExecutor)  // 宁可误拦,不可放过
  • 策略 3:日志记录

    优势:适用于非关键路径

.WithDefault(logAndIgnoreExecutor)  // 记录后忽略

Default Case 的防御性检查,即使配置了 Default,执行器内部仍应进行验证:

为什么需要双重检查?

  • 捕获配置错误:及早发现路由配置问题
  • 明确职责边界:执行器清楚知道自己应该处理什么
  • 便于调试:输出警告信息,快速定位问题
public override async ValueTask HandleAsync(DetectionResult message, ...)
{
    // 推荐:检查是否是预期的类型
    if (message.spamDecision != SpamDecision.Uncertain)
    {
        // 记录警告或抛出异常
        Console.WriteLine($"警告:HandleUncertainExecutor 收到非预期类型: {message.spamDecision}");
    }
    
    // 继续处理...
}

7. 高级场景与最佳实践

  • 多维度分类路由,在实际企业场景中,可能需要更细粒度的分类:

    优势:

    • 更精细的安全分级
    • 不同风险级别有不同的响应策略
    • 易于根据业务需求调整分类标准
public enum EmailRiskLevel
{
    Safe,           // 完全安全
    LowRisk,        // 低风险(可能是营销邮件)
    MediumRisk,     // 中风险(可疑但不确定)
    HighRisk,       // 高风险(疑似钓鱼)
    Malicious       // 恶意(确认的威胁)
}

// 对应的 Switch-Case 工作流
.AddSwitch(riskAssessmentExecutor, sb => sb
    .AddCase(GetRiskCondition(Safe), autoReplyExecutor)
    .AddCase(GetRiskCondition(LowRisk), marketingFilterExecutor)
    .AddCase(GetRiskCondition(MediumRisk), humanReviewExecutor)
    .AddCase(GetRiskCondition(HighRisk), securityTeamAlertExecutor)
    .AddCase(GetRiskCondition(Malicious), blockAndReportExecutor)
    .WithDefault(escalateExecutor)
)
  • 动态 Case 构建,某些场景下,Case 分支可能需要根据配置动态生成:

    适用场景:

    • 需要热更新分类规则
    • 多租户系统,每个租户有不同的分类策略
    • A/B 测试不同的路由策略
// 从配置加载分类策略
var classificationRules = LoadClassificationRules();

var switchBuilder = new WorkflowBuilder(classifierExecutor);
var innerSwitchBuilder = switchBuilder.AddSwitch(classifierExecutor, sb => {
    foreach (var rule in classificationRules)
    {
        sb.AddCase(
            result => result is ClassificationResult r && r.Category == rule.Category,
            rule.TargetExecutor
        );
    }
    return sb.WithDefault(defaultExecutor);
});
  • 常见错误与解决方案

    • 错误 1:条件函数重叠

      解决方案:确保条件互斥或使用优先级顺序

    • 错误 2:忘记配置输出

      解决方案:确保所有终点执行器都在 WithOutputFrom 中

    • 错误 3:Default Case 中的逻辑错误

      解决方案:始终进行防御性检查

// 1.条件函数重叠
// 错误:两个条件可能同时为 true
.AddCase(r => r is DetectionResult dr && dr.Reason.Contains("spam"), executor1)
.AddCase(r => r is DetectionResult dr && dr.Reason.Contains("phishing"), executor2)
// 如果 Reason 同时包含 "spam" 和 "phishing",会路由到哪里?
    
// 正确:使用枚举确保互斥
.AddCase(GetCondition(SpamDecision.Spam), executor1)
.AddCase(GetCondition(SpamDecision.Phishing), executor2)
    
// 2:忘记配置输出
// 错误:HandleSpamExecutor 的输出没有配置
var workflow = new WorkflowBuilder(spamDetectionExecutor)
    .AddSwitch(...)
    .WithOutputFrom(sendEmailExecutor)  // 只配置了一个输出
    .Build();

// 正确:配置所有输出节点
.WithOutputFrom(handleSpamExecutor, sendEmailExecutor, handleUncertainExecutor)
   
// 3. Default Case 中的逻辑错误
// 错误:Default 执行器假设只会收到 Uncertain
public override async ValueTask HandleAsync(DetectionResult message, ...)
{
    // 直接使用,没有检查
    await ProcessUncertainEmail(message);  // 如果 message 是其他类型呢?
}

// 正确:验证输入类型
if (message.spamDecision != SpamDecision.Uncertain)
{
    throw new InvalidOperationException($"意外的决策类型: {message.spamDecision}");
}

8. 实际应用示例:企业级内容审核系统

在基础上,我们可以将 Switch-Case 模式扩展到更广泛的内容审核场景:

flowchart TD
    A[用户提交内容] --> B[内容审核Executor]
    B -->|安全| C[自动发布]
    B -->|轻微违规| D[自动过滤敏感词]
    B -->|中度违规| E[人工审核]
    B -->|严重违规| F[拒绝并记录]
    B -->|不确定| G[AI二次审核]

以下代码展示如何构建一个企业级内容审核工作流:

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 内容审核分类枚举
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
public enum ContentModerationType
{
    Safe,           // 安全内容,可直接发布
    MinorViolation, // 轻微违规,自动过滤后发布
    ModerateRisk,   // 中度风险,需要人工审核
    SevereViolation,// 严重违规,拒绝发布
    Uncertain       // 不确定,需要二次审核
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 构建多级内容审核工作流
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
var contentModerationWorkflow = new WorkflowBuilder(contentAnalysisExecutor)
    .AddSwitch(contentAnalysisExecutor, sb => sb
        // Case 1: 安全内容 → 直接发布
        .AddCase(
            GetModerationCondition(ContentModerationType.Safe),
            autoPublishExecutor
        )
        // Case 2: 轻微违规 → 过滤后发布
        .AddCase(
            GetModerationCondition(ContentModerationType.MinorViolation),
            filterAndPublishExecutor
        )
        // Case 3: 中度风险 → 人工审核
        .AddCase(
            GetModerationCondition(ContentModerationType.ModerateRisk),
            humanModerationExecutor
        )
        // Case 4: 严重违规 → 拒绝并记录
        .AddCase(
            GetModerationCondition(ContentModerationType.SevereViolation),
            rejectAndLogExecutor
        )
        // Default: 不确定 → AI 二次审核
        .WithDefault(
            aiSecondaryReviewExecutor
        )
    )
    // 配置所有可能的输出节点
    .WithOutputFrom(
        autoPublishExecutor,
        filterAndPublishExecutor,
        humanModerationExecutor,
        rejectAndLogExecutor,
        aiSecondaryReviewExecutor
    )
    .Build();

扩展点,基于这个框架,可以进一步扩展:

  • 多级审核流程
// AI 二次审核后可能再次路由到人工审核
.AddEdge(aiSecondaryReviewExecutor, humanModerationExecutor, 
         condition: result => result.ConfidenceScore < 0.7)
  • 审核结果统计
// 每个执行器都可以记录统计数据到 Shared State
await context.QueueStateUpdateAsync("stats", new ModerationStats
{
    TotalProcessed = total,
    AutoApproved = autoApproved,
    HumanReviewed = humanReviewed
});
  • 实时监控仪表盘
// 通过 WorkflowEvent 推送到前端实时显示
await context.AddEventAsync(new ModerationDashboardEvent
{
    ModerationType = result.Type,
    ProcessingTime = elapsed,
    ExecutorName = this.Name
});

实际部署建议,在生产环境中使用时:

  • 日志记录:所有分支都记录详细日志
  • 性能监控:跟踪每个 Case 的执行时间
  • A/B 测试:对比不同分类策略的效果
  • 人工反馈:将人工审核结果反馈给 AI 模型训练

四、循环工作流

1. 业务场景:智能客服工单质检与自动改进

  • 背景:某电商平台每天产生大量客服工单回复。AI 助手先生成回复草稿,然后经过多维度质检(礼貌度、准确性、合规性),不合格则自动改进,直到达标或转人工处理。
  • 角色:回复生成 Agent、质检 Agent、智能改进 Executor、人工客服(兜底)
  • 痛点:
    • 初稿常见问题:语气生硬、信息不全、包含敏感词
    • 需要多维度评分:礼貌度 ≥ 85、准确性 ≥ 90、合规性 100%
    • 需要可控的重试次数,避免无限循环
    • 需要可观测的状态,便于人工介入和审计
  • 目标:构建可配置的 Loop,自动迭代"生成 → 质检 → 改进",只要满足门槛即可退出,否则转人工处理

示例输入数据:

var tickets = new []
{
    new { Id = "TKT-2025-001", Query = "我买的手机充不进电了", Category = "电子产品", Priority = "High" },
    new { Id = "TKT-2025-002", Query = "订单3天没发货,能退款吗", Category = "物流问题", Priority = "Medium" }
};

2. 核心概念

概念 中文释义 在 Loop 中的作用
Loop Edge 循环边 将质检结果重新路由到生成/改进节点,实现多次尝试
Self-Correction Pipeline 自我修正管道 组合生成、质检、改进三个步骤,自动闭环
Multi-Dimension Quality 多维度质检 礼貌度、准确性、合规性三重评分,全部达标才通过
Max Attempts 最大尝试次数 防止无限循环,触发转人工策略
WorkflowContext State 工作流上下文状态 记录 AttemptCount、各维度得分、问题列表
Streaming Events 流式事件 实时观测每轮执行情况,生成审计日志

3. 实践流程

  • 步骤 1:准备业务数据与质检基线

    我们使用三份工单请求,模拟不同优先级和问题类型,并设置多维度质检阈值。

using System.Text.Json.Serialization;

internal record TicketRequest(string Id, string Query, string Category, string Priority);

// 质检结果的结构化输出模型
internal class QualityReportDto
{
    [JsonPropertyName("politenessScore")]
    public int PolitenessScore { get; set; }
    

    [JsonPropertyName("accuracyScore")]
    public int AccuracyScore { get; set; }
    
    [JsonPropertyName("compliancePassed")]
    public bool CompliancePassed { get; set; }
    
    [JsonPropertyName("issues")]
    public List<QualityIssueDto> Issues { get; set; } = new();

}

internal class QualityIssueDto
{
    [JsonPropertyName("type")]
    public string Type { get; set; } = "";
    

    [JsonPropertyName("description")]
    public string Description { get; set; } = "";
    
    [JsonPropertyName("scoreImpact")]
    public int ScoreImpact { get; set; }

}

internal record QualityIssue(string Type, string Description, int ScoreImpact);

var ticketRequests = new []
{
    new TicketRequest("TKT-2025-001", "我买的手机充不进电了,什么情况?", "电子产品", "High"),
    new TicketRequest("TKT-2025-002", "订单已经3天没发货,能退款吗?", "物流问题", "Medium"),
    new TicketRequest("TKT-2025-003", "会员积分为什么突然清零了?", "账户问题", "Low")
};

// 质检标准:礼貌度 ≥ 85,准确性 ≥ 90,合规性必须 100%
// 注意:阈值设置较高,配合第一次生成简化版本,确保能体现循环改进过程
const int politenessThreshold = 85;
const int accuracyThreshold = 90;

new
{
    工单总数 = ticketRequests.Length,
    礼貌度阈值 = politenessThreshold,
    准确性阈值 = accuracyThreshold,
    合规性要求 = "100% 通过",
    示例工单 = ticketRequests.First()
}.Display();
  • 步骤 2:实现基础循环(生成 → 质检)

    先定义两个 Executor:

    1. ReplyDraftExecutor:根据客户问题生成回复草稿(模拟 AI 产出)。
    2. QualityCheckExecutor:执行多维度质检,根据得分决定是否继续循环。

    Loop 逻辑:当任一维度不达标时,发送 QCSignal.Revise,将结果回流到生成节点。

    AI 集成说明,AI 的三大作用:

    • 生成回复 (ReplyDraftExecutor)
      • 使用 AI 根据客户问题生成专业的客服回复
      • Prompt 包含:客户问题、产品类别、优先级、回复要求
    • 质检评分 (QualityCheckExecutor)
      • AI 担任质检专家角色,对回复进行多维度评分
      • 返回结构化 JSON:礼貌度、准确性、合规性 + 具体问题列表
      • 自动解析评分结果,判断是否通过质量门槛
    • 智能改进 (IntelligentImproveExecutor)
      • 根据质检报告中的问题列表,AI 针对性优化回复内容
      • 礼貌度低 → 增加感谢语和尊称
      • 准确性低 → 补充具体解决方案和时间
      • 合规性问题 → 移除敏感词,规范表述

    技术亮点:

    • 使用结构化输出 (GetResponseAsync) 确保类型安全和自动反序列化

    • 渐进式生成策略:第一次故意生成简化版本(简短、缺少礼貌用语),确保触发循环改进

    • 高阈值质检:礼貌度≥85、准确性≥90,配合初始低质量输出,体现多次迭代价值

    • 上下文传递:通过 WorkflowContext 保存改进后的内容

    • 迭代优化:每次改进都基于上一次的质检反馈

internal enum QCSignal
{
    Init,
    Revise
}

internal record ReplyDraft(string TicketId, string Content, int Attempt);
internal record QualityReport(string TicketId, int PolitenessScore, int AccuracyScore, bool CompliancePassed, IReadOnlyList<QualityIssue> Issues);
internal record TicketOutcome(string TicketId, string Status, int Attempts, QualityReport FinalReport);

internal sealed class BaselineReplyDraftExecutor : Executor<QCSignal>
{
    private readonly TicketRequest _ticket;
    private readonly IChatClient _chatClient;

    public BaselineReplyDraftExecutor(TicketRequest ticket, IChatClient chatClient) : base("BaselineReplyDraft")
    {
        _ticket = ticket;
        _chatClient = chatClient;
    }
    
    public override async ValueTask HandleAsync(QCSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int attempt = await context.ReadOrInitStateAsync("attempt", () => 0, cancellationToken);
        attempt++;
        await context.QueueStateUpdateAsync("attempt", attempt, cancellationToken);
    
        var prompt = $"""
            你是一位电商客服。请针对以下客户问题生成一条简短回复:
    
            客户问题:{_ticket.Query}
            产品类别:{_ticket.Category}
    
            直接返回回复内容,不要添加任何前缀或说明。
            """;


        var response = await _chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);       
        Console.WriteLine($"第 {attempt} 次生成回复草稿完成");
        var content = response.Text ?? "抱歉,我们会尽快处理您的问题。";     
        
        Console.WriteLine($"回复内容:{content}");   
    
        var draft = new ReplyDraft(_ticket.Id, content, attempt);
    
        await context.SendMessageAsync(draft, targetId: "BaselineQualityCheck", cancellationToken);
    }

}

internal sealed class BaselineQualityCheckExecutor : Executor<ReplyDraft>
{
    private readonly int _politenessThreshold;
    private readonly int _accuracyThreshold;
    private readonly IChatClient _chatClient;

    public BaselineQualityCheckExecutor(int politenessThreshold, int accuracyThreshold, IChatClient chatClient) 
        : base("BaselineQualityCheck")
    {
        _politenessThreshold = politenessThreshold;
        _accuracyThreshold = accuracyThreshold;
        _chatClient = chatClient;
    }
    
    public override async ValueTask HandleAsync(ReplyDraft draft, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int attempt = await context.ReadOrInitStateAsync("attempt", () => 1, cancellationToken);
        
        // 使用 AI 进行多维度质检评分(结构化输出,严格标准)
        var prompt = $"""
        你是一位严格的客服质检专家。请对以下客服回复进行多维度评分(0-100分):
    
        回复内容:{draft.Content}
    
        评分维度和严格标准:
        1. 礼貌度(0-100):必须包含称呼语(您、亲)、感谢语(感谢、谢谢)、结束语(祝、期待)等,语气亲和温暖
           - 缺少任何一项扣20分
           - 语气生硬、机械扣10-30分
        
        2. 准确性(0-100):必须提供具体的解决方案、明确的处理时间或有效的后续步骤
           - 只说"会处理"但无具体方案扣30分
           - 无明确时间承诺扣20分
           - 信息过于笼统扣10-20分
        
        3. 合规性(通过/不通过):不得包含敏感词、不当表述、推诿责任的话语
           - 发现任何敏感词或不当表述直接判定为"不通过"
    
        请对每个维度进行严格评分,并在issues字段中列出所有发现的问题。
        """;
    
        // 使用结构化输出:GetResponseAsync<T> 自动生成 JSON Schema 并反序列化
        var response = await _chatClient.GetResponseAsync<QualityReportDto>(prompt, cancellationToken: cancellationToken);
        var reportDto = response.Result;
    
        // 转换为业务模型
        var issues = reportDto.Issues.Select(i => new QualityIssue(i.Type, i.Description, i.ScoreImpact)).ToList();
        var report = new QualityReport(draft.TicketId, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed, issues);
    
        await context.AddEventAsync(new BaselineQualityScoreEvent(draft.TicketId, attempt, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed), cancellationToken);
    
        await context.QueueStateUpdateAsync("attempt", ++attempt, cancellationToken);
    
        if (reportDto.PolitenessScore >= _politenessThreshold && reportDto.AccuracyScore >= _accuracyThreshold && reportDto.CompliancePassed)
        {
            await context.YieldOutputAsync(new TicketOutcome(draft.TicketId, "Approved", attempt, report), cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(QCSignal.Revise, targetId: "BaselineReplyDraft", cancellationToken);
        }
    }

}

internal sealed class BaselineQualityScoreEvent : WorkflowEvent
{
    public BaselineQualityScoreEvent(string ticketId, int attempt, int politenessScore, int accuracyScore, bool compliancePassed)
        : base(new { TicketId = ticketId, Attempt = attempt, PolitenessScore = politenessScore, AccuracyScore = accuracyScore, CompliancePassed = compliancePassed })
    {
    }
}

Console.WriteLine("基础循环执行器定义完成(已集成 AI 结构化输出 + 严格质检标准)");
  • 步骤 3:构建 Workflow 并验证基础 Loop,使用 WorkflowBuilder 将两个步骤连接成闭环,并运行单次请求,观察得分随循环变化。
// 获取 AI 客户端
var targetTicket = ticketRequests.First();
var draftExecutor = new BaselineReplyDraftExecutor(targetTicket, chatClient);
var qcExecutor = new BaselineQualityCheckExecutor(politenessThreshold, accuracyThreshold, chatClient);

var baselineWorkflow = new WorkflowBuilder(draftExecutor)
    .AddEdge(draftExecutor, qcExecutor)
    .AddEdge(qcExecutor, draftExecutor)
    .WithOutputFrom(qcExecutor)
    .Build();

await using (var baselineRun = await InProcessExecution.StreamAsync(baselineWorkflow, QCSignal.Init)){
var scoreTimeline = new List<object>();

await foreach (var evt in baselineRun.WatchStreamAsync())
{
    // 强制中断(最多5次尝试)
    if (scoreTimeline.Count == 5)
    {
        Console.WriteLine("强制中断工作流执行(已完成5次评估)");
        break;
    }
    switch (evt)
    {
        case BaselineQualityScoreEvent scoreEvent:
            dynamic payload = scoreEvent.Data!;
            scoreTimeline.Add(new { 
                尝试次数 = payload.Attempt, 
                礼貌度 = payload.PolitenessScore,
                准确性 = payload.AccuracyScore,
                合规性 = payload.CompliancePassed ? "✅" : "❌"
            });
            Console.WriteLine($"AI 质检结果 => 礼貌度:{payload.PolitenessScore} 准确性:{payload.AccuracyScore} 合规性:{(payload.CompliancePassed ? "通过" : "不通过")}");
            break;
        case WorkflowOutputEvent outputEvent:
            Console.WriteLine("工作流完成");
            outputEvent.Data.Display();
            break;
    }
}
  • 步骤 4:扩展自我修正节点

    为了解决"质检发现问题但无修复动作"的缺陷,我们新增 IntelligentImproveExecutor:

    • 根据质检报告的 Issues 列表,针对性改进回复内容
    • 礼貌度问题 → 优化语气,添加称呼和感谢语
    • 准确性问题 → 补充具体信息和解决方案
    • 合规性问题 → 移除敏感词,规范表述
    • 将改进后的草稿反馈给质检节点,形成更智能的 Loop
internal sealed class AdaptiveReplyDraftExecutor : Executor<QCSignal>
{
  private readonly TicketRequest _ticket;
  private readonly IChatClient _chatClient;

-   public AdaptiveReplyDraftExecutor(TicketRequest ticket, IChatClient chatClient) : base("AdaptiveReplyDraft")
    {
        _ticket = ticket;
        _chatClient = chatClient;
    }

    public override async ValueTask HandleAsync(QCSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int attempt = await context.ReadOrInitStateAsync("attempt", () => 0, cancellationToken);
        attempt++;
        await context.QueueStateUpdateAsync("attempt", attempt, cancellationToken);

        // 使用 AI 生成客服回复(渐进式生成策略)
        var prompt = attempt == 1 
            ? $"""
            你是一位电商客服。请针对以下客户问题生成一条简短回复(刻意保持简短、缺少礼貌用语):
      
            客户问题:{_ticket.Query}
            产品类别:{_ticket.Category}
      
            要求:
            1. 只用1-2句话回答,不要称呼语和感谢语
            2. 只说结论,不提供具体处理时间
            3. 字数控制在30字以内
      
            直接返回回复内容,不要添加任何前缀或说明。
            """
            : $"""
            你是一位专业的电商客服。请针对以下客户问题生成一条改进后的回复:
      
            客户问题:{_ticket.Query}
            产品类别:{_ticket.Category}
            优先级:{_ticket.Priority}
      
            要求:
            1. 语气亲和、专业,使用恰当的称呼和感谢语
            2. 提供具体的解决方案或处理时间
            3. 符合客服规范,不包含敏感词
            4. 字数控制在80-100字
      
            直接返回回复内容,不要添加任何前缀或说明。
            """;
      
        var response = await _chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
        var content = response.Text ?? "抱歉,我们会尽快处理您的问题。";
      
        Console.WriteLine($"第 {attempt} 次生成回复草稿 (策略: {(attempt == 1 ? "简化版" : "完整版")})");
        Console.WriteLine($"回复内容:{content}");
      
        var draft = new ReplyDraft(_ticket.Id, content, attempt);
        await context.SendMessageAsync(draft, targetId: "AdaptiveQualityCheck", cancellationToken);

    }
  }

internal sealed class IntelligentImproveExecutor : Executor<QualityReport>
{
    private readonly TicketRequest _ticket;
    private readonly IChatClient _chatClient;

    public IntelligentImproveExecutor(TicketRequest ticket, IChatClient chatClient) : base("IntelligentImprove")
    {
        _ticket = ticket;
        _chatClient = chatClient;
    }
    
    public override async ValueTask HandleAsync(QualityReport report, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int attempt = await context.ReadOrInitStateAsync("attempt", () => 1, cancellationToken);
    
        // 构建改进提示词,基于质检反馈
        var issuesSummary = string.Join("\n", report.Issues.Select(i => $"- {i.Type}: {i.Description}"));
        
        var prompt = $"""
        你是一位客服优化专家。请根据以下质检反馈,改进客服回复内容:
    
        原始问题:{_ticket.Query}
        产品类别:{_ticket.Category}
        优先级:{_ticket.Priority}
    
        当前评分:
        - 礼貌度:{report.PolitenessScore}/100 (要求≥{politenessThreshold})
        - 准确性:{report.AccuracyScore}/100 (要求≥{accuracyThreshold})
        - 合规性:{(report.CompliancePassed ? "通过" : "不通过")}
    
        发现的问题:
        {issuesSummary}
    
        请生成一条改进后的客服回复,针对性解决上述问题:
        1. 如果礼貌度不足,增加称呼语、感谢语,使用更亲和的表述
        2. 如果准确性不足,补充具体的解决方案、处理时间、后续步骤
        3. 如果合规性不通过,移除敏感词,规范表述
        4. 字数控制在80-100字
    
        直接返回改进后的回复内容,不要添加任何前缀或说明。
        """;
    
        var response = await _chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
        var improvedContent = response.Text ?? "抱歉,我们会尽快处理您的问题。";
    
        await context.AddEventAsync(new LoopProgressEvent(_ticket.Id, attempt, report.PolitenessScore, report.AccuracyScore, report.CompliancePassed, "Improve"), cancellationToken);
        Console.WriteLine($"第 {attempt} 次智能改进完成");
        Console.WriteLine($"改进后内容:{improvedContent}");
    
        // 触发下一次生成(使用改进后的内容作为上下文)
        await context.SendMessageAsync(QCSignal.Revise, targetId: "AdaptiveReplyDraft", cancellationToken);
    }

}

internal sealed class LoopProgressEvent : WorkflowEvent
{
    public LoopProgressEvent(string ticketId, int attempt, int politenessScore, int accuracyScore, bool compliancePassed, string stage)
        : base(new { ticketId, attempt, politenessScore, accuracyScore, compliancePassed, stage })
    {
    }
}

Console.WriteLine("自适应生成器和智能改进器定义完成");

internal sealed class AdaptiveQualityCheckExecutor : Executor<ReplyDraft>
{
    private readonly int _politenessThreshold;
    private readonly int _accuracyThreshold;
    private readonly int _maxAttempts;
    private readonly IChatClient _chatClient;

    // 默认最大尝试次数为5次
    public AdaptiveQualityCheckExecutor(int politenessThreshold, int accuracyThreshold, IChatClient chatClient) 
        : this(politenessThreshold, accuracyThreshold, 5, chatClient)
    {
    }
    
    public AdaptiveQualityCheckExecutor(int politenessThreshold, int accuracyThreshold, int maxAttempts, IChatClient chatClient) 
        : base("AdaptiveQualityCheck")
    {
        _politenessThreshold = politenessThreshold;
        _accuracyThreshold = accuracyThreshold;
        _maxAttempts = maxAttempts;
        _chatClient = chatClient;
    }
    
    public override async ValueTask HandleAsync(ReplyDraft draft, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int attempt = await context.ReadOrInitStateAsync("attempt", () => 1, cancellationToken);
        
        // 使用 AI 进行多维度质检评分(结构化输出,严格标准)
        var prompt = $"""
        你是一位严格的客服质检专家。请对以下客服回复进行多维度评分(0-100分):
    
        回复内容:{draft.Content}
    
        评分维度和严格标准:
        1. 礼貌度(0-100):必须包含称呼语(您、亲)、感谢语(感谢、谢谢)、结束语(祝、期待)等,语气亲和温暖
           - 缺少任何一项扣20分
           - 语气生硬、机械扣10-30分
        
        2. 准确性(0-100):必须提供具体的解决方案、明确的处理时间或有效的后续步骤
           - 只说"会处理"但无具体方案扣30分
           - 无明确时间承诺扣20分
           - 信息过于笼统扣10-20分
        
        3. 合规性(通过/不通过):不得包含敏感词、不当表述、推诿责任的话语
           - 发现任何敏感词或不当表述直接判定为"不通过"
    
        请对每个维度进行严格评分,并在issues字段中列出所有发现的问题。
        """;
    
        // 使用结构化输出:GetResponseAsync<T> 自动生成 JSON Schema 并反序列化
        var response = await _chatClient.GetResponseAsync<QualityReportDto>(prompt, cancellationToken: cancellationToken);
        var reportDto = response.Result;
    
        // 转换为业务模型
        var issues = reportDto.Issues.Select(i => new QualityIssue(i.Type, i.Description, i.ScoreImpact)).ToList();
        var report = new QualityReport(draft.TicketId, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed, issues);
    
        await context.AddEventAsync(new AdaptiveQualityScoreEvent(draft.TicketId, attempt, reportDto.PolitenessScore, reportDto.AccuracyScore, reportDto.CompliancePassed), cancellationToken);
    
        if (reportDto.PolitenessScore >= _politenessThreshold && reportDto.AccuracyScore >= _accuracyThreshold && reportDto.CompliancePassed)
        {
            await context.YieldOutputAsync(new TicketOutcome(draft.TicketId, "Approved", attempt, report), cancellationToken);
        }
        else if (attempt >= _maxAttempts)
        {
            await context.AddEventAsync(new AdaptiveMaxAttemptsReachedEvent(draft.TicketId, _maxAttempts), cancellationToken);
            await context.RequestHaltAsync();//质检失败:已达到最大尝试次数
        }
        else
        {
            await context.QueueStateUpdateAsync("attempt", attempt + 1, cancellationToken);
            
            // 发送质检报告到改进环节
            await context.SendMessageAsync(report, targetId: "IntelligentImprove", cancellationToken);
        }
    }

}

internal sealed class AdaptiveQualityScoreEvent : WorkflowEvent
{
    public AdaptiveQualityScoreEvent(string ticketId, int attempt, int politenessScore, int accuracyScore, bool compliancePassed)
        : base(new { TicketId = ticketId, Attempt = attempt, PolitenessScore = politenessScore, AccuracyScore = accuracyScore, CompliancePassed = compliancePassed })
    {
    }
}

internal sealed class AdaptiveMaxAttemptsReachedEvent : WorkflowEvent
{
    public AdaptiveMaxAttemptsReachedEvent(string ticketId, int maxAttempts)
        : base(new { TicketId = ticketId, MaxAttempts = maxAttempts })
    {
    }
}

Console.WriteLine("自适应质检执行器定义完成(已集成 AI 结构化输出 + 严格质检标准)");
  • 步骤 5:运行自我修正 Loop(流式监控)

    使用 RunStreamingAsync 获取实时事件,区分 Review 阶段与 AutoFix 阶段,输出更丰富的轨迹信息。

var adaptiveDraft = new AdaptiveReplyDraftExecutor(targetTicket, chatClient);
var adaptiveQC = new AdaptiveQualityCheckExecutor(politenessThreshold, accuracyThreshold, chatClient);
var intelligentImprove = new IntelligentImproveExecutor(targetTicket, chatClient);

var adaptiveWorkflow = new WorkflowBuilder(adaptiveDraft)
    .AddEdge(adaptiveDraft, adaptiveQC)
    .AddEdge(adaptiveQC, intelligentImprove)
    .AddEdge(intelligentImprove, adaptiveDraft)
    .WithOutputFrom(adaptiveQC)
    .Build();

await using (var adaptiveRun = await InProcessExecution.StreamAsync(adaptiveWorkflow, QCSignal.Init)){
var adaptiveTimeline = new List<object>();

await foreach (var evt in adaptiveRun.WatchStreamAsync())
{
    switch (evt)
    {
        case AdaptiveQualityScoreEvent scoreEvent:
            dynamic scoreData = scoreEvent.Data!;
            adaptiveTimeline.Add(new { 
                工单 = scoreData.TicketId, 
                尝试 = scoreData.Attempt, 
                礼貌度 = scoreData.PolitenessScore,
                准确性 = scoreData.AccuracyScore,
                合规 = scoreData.CompliancePassed ? "✅" : "❌",
                阶段 = "质检"
            });
            Console.WriteLine($"AI 质检结果 => 礼貌度:{scoreData.PolitenessScore} 准确性:{scoreData.AccuracyScore} 合规性:{(scoreData.CompliancePassed ? "通过" : "不通过")}");
            break;
        case WorkflowOutputEvent outputEvent:
            Console.WriteLine("AI 自我修正流程结束");
            outputEvent.Data.Display();
            break;
    }
}

4. 高级场景与最佳实践

  • Loop 设计守则

    • 明确退出条件:结合最大尝试次数、多维度阈值、人工信号,避免无限循环。

    • 多维度质检:礼貌度、准确性、合规性等多个指标独立评估,全部达标才通过

    • 状态最小可见:使用 ReadOrInitStateAsync + QueueStateUpdateAsync 将指标限定在必要范围,防止脏读。

    • 阶段事件:自定义 LoopProgressEvent,方便前端 UI 或监控系统订阅。

    • 转人工兜底:超过最大尝试次数后,通过 RequestHaltAsync 触发人工介入。

  • 常见错误与解决

    • 忘记清理状态:循环结束后可调用 QueueClearScopeAsync 清理历史,避免下一次 run 复用旧数据。

    • 类型不匹配:Edge 之间的消息类型必须与 Executor 泛型对应,必要时添加 Adapter。

    • 重复触发 Halt:RequestHaltAsync 只在关键节点调用一次,防止产生过多暂停事件。

    • 质检标准不合理:阈值设置过高导致永远无法通过,过低则失去质量保障意义。

  • 性能优化建议

    • 并行质检:可在 Loop 内拆分礼貌度/准确性/合规性多条并行链路,然后使用 Fan-In 汇总。

    • 缓存改进策略:若相同问题类型多次出现,可在状态中缓存改进模板以加速循环。

    • 指标上报:结合 OpenTelemetry 记录 Loop 时长、成功率、转人工率。

    • 自适应阈值:根据历史数据动态调整质检标准,平衡效率与质量。

  • AI 评分最佳实践

    • 结构化输出:使用 JSON 格式确保 AI 返回可解析的结果,避免自由文本。

    • Prompt 工程:明确评分标准和范围(0-100),提供评分示例。

    • 异常处理:JSON 解析失败时使用默认评分,记录日志便于排查。

    • 成本控制:评估 Token 消耗,考虑使用更小的模型(如 gpt-4o-mini)进行质检。

    • 一致性保障:在 Prompt 中强调评分标准的一致性,避免同样内容得分波动过大。

    • 上下文传递:将改进历史传递给 AI,让后续改进更有针对性。

五、并行工作流

1. 业务场景:电商多平台价格监控与决策系统

  • 背景:跨境电商公司需要实时监控同一商品在亚马逊、eBay、Shopee等多个平台的定价策略,在检测到竞争对手降价时快速做出响应决策。
  • 角色:Amazon价格Agent、eBay价格Agent、Shopee价格Agent、定价策略聚合Executor。
  • 挑战:
    • 多平台API查询需要在3秒内全部返回(串行执行需10+秒);
    • 每个平台的数据格式不同,需要标准化处理;
    • 聚合后需要给出智能调价建议,而非简单罗列数据。
  • 目标:构建 Fan-out(并发查询)+ Fan-in(策略汇总)工作流,实现一次查询、并行抓取、智能决策的企业级模式。

2. 核心概念速览

概念 中文释义 在并发模式中的作用
Fan-out Edge 并发分支 将商品查询请求广播给多个平台Agent
Fan-in Edge 汇聚边 收集所有平台价格数据,交由策略Executor分析
ChatClientAgent LLM 执行器 模拟平台API返回,生成价格与库存信息
Aggregation Executor 聚合执行器 组合多平台数据,计算最优定价策略
Streaming Events 流式事件 实时监控各平台查询状态,快速定位超时

3. 实践流程

  • 步骤 1:建模业务问题与输入数据

    先把商品查询请求抽象成 PriceQuery,确保 Fan-out 阶段广播的是结构化数据,包含商品ID、区域等关键参数。

internal record PriceQuery(string ProductId, string ProductName, string TargetRegion);

var priceQuery = new PriceQuery(
    ProductId: "IPHONE15-PRO-256",
    ProductName: "iPhone 15 Pro 256GB",
    TargetRegion: "US"
);
  • 步骤 2:定义 Agent 阵列与自定义 Executor

    • PlatformPriceExecutor 封装平台查询逻辑,模拟各平台API响应;

    • PriceQueryStartExecutor 负责广播查询请求并发放 TurnToken;

    • PricingStrategyExecutor 做 Fan-in 汇总,生成智能定价建议报告。

// 定义自定义 Executor 包装 Agent 功能
internal sealed class PlatformPriceExecutor(string id, IChatClient chatClient, string platformInstructions) : Executor<ChatMessage>(id)
{
    private readonly IChatClient _chatClient = chatClient;
    private readonly string _instructions = platformInstructions;

    public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var messages = new List<ChatMessage>
        {
            new(ChatRole.System, _instructions),
            message
        };
    
        var response = await _chatClient.GetResponseAsync(messages, cancellationToken: cancellationToken);
        var replyMessage = new ChatMessage(ChatRole.Assistant, response.Text ?? string.Empty)
        {
            AuthorName = this.Id
        };
        
        await context.SendMessageAsync(replyMessage, cancellationToken: cancellationToken);
        Console.WriteLine($"{this.Id} 完成查询");
    }

}

var amazonExecutor = new PlatformPriceExecutor(
    "AmazonPriceAgent",
    chatClient,
    "你是Amazon平台价格查询Agent。返回格式:价格=$XXX,库存状态=充足/紧张,配送说明=Prime会员免运费/标准配送。"
);

var ebayExecutor = new PlatformPriceExecutor(
    "eBayPriceAgent",
    chatClient,
    "你是eBay平台价格查询Agent。返回格式:价格=$XXX,商品状态=全新/二手XX新,运费说明=包邮/买家承担。"
);

var shopeeExecutor = new PlatformPriceExecutor(
    "ShopeePriceAgent",
    chatClient,
    "你是Shopee平台价格查询Agent。返回格式:价格=$XXX(含税),区域=东南亚/台湾,促销信息=满减活动/无。"
);

internal sealed class PriceQueryStartExecutor() : Executor<PriceQuery>(nameof(PriceQueryStartExecutor))
{
    public override async ValueTask HandleAsync(PriceQuery query, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var userPrompt = $@"商品ID: {query.ProductId}
商品名称: {query.ProductName}
目标区域: {query.TargetRegion}

请查询该商品在你的平台上的当前价格、库存状态和配送信息。";
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, userPrompt), cancellationToken: cancellationToken);
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);
        Console.WriteLine("Fan-out 价格查询广播已发送");
    }
}

// 修改为接收单个 ChatMessage,而不是 List<ChatMessage>
internal sealed class PricingStrategyExecutor(int targetCount) : Executor<ChatMessage>(nameof(PricingStrategyExecutor))
{
    private readonly List<ChatMessage> _messages = [];
    private readonly int _targetCount = targetCount;

    public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.Add(message);
        Console.WriteLine($"已收集 {_messages.Count}/{_targetCount} 个平台数据 - 来自 {message.AuthorName}");
    
        if (this._messages.Count == this._targetCount)
        {
            var platformData = string.Join("\n", this._messages.Select(m => $"• {m.AuthorName}: {m.Text}"));
            var strategyReport = $@"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

多平台价格汇总(共 {this._messages.Count} 个平台)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

{platformData}

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
智能定价建议
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
基于以上数据,建议分析竞争对手价格区间,制定差异化定价策略。
考虑因素:库存压力、配送成本、平台佣金率、目标利润率。";
            

            await context.YieldOutputAsync(strategyReport, cancellationToken);
            Console.WriteLine("Fan-in 定价策略生成完成");
        }
    }
}
  • 步骤 3:构建 Fan-out / Fan-in 工作流

    使用 WorkflowBuilder 将广播、并发 Agent、聚合三个阶段串联成完整拓扑,并指定输出来自聚合 Executor。

public Workflow GetWorkflow()
{
    var startExecutor = new PriceQueryStartExecutor();
    var strategyExecutor = new PricingStrategyExecutor(3);

    var priceMonitorWorkflow = new WorkflowBuilder(startExecutor)
        .AddFanOutEdge(startExecutor, [amazonExecutor, ebayExecutor, shopeeExecutor])
        .AddFanInEdge([amazonExecutor, ebayExecutor, shopeeExecutor], strategyExecutor)
        .WithOutputFrom(strategyExecutor)
        .Build();
    
    Console.WriteLine("价格监控 Workflow 构建完成");
    return priceMonitorWorkflow;
}
  • 步骤 4:Execute vs Stream,观察并发执行细节
  • 先执行一次非流式任务获取最终报告,再切换到流式模式及时感知每个 Executor 的生命周期事件。
flowchart TD
  PriceQueryStartExecutor["PriceQueryStartExecutor (Start)"];
  AmazonPriceAgent["AmazonPriceAgent"];
  eBayPriceAgent["eBayPriceAgent"];
  ShopeePriceAgent["ShopeePriceAgent"];
  PricingStrategyExecutor["PricingStrategyExecutor"];

  fan_in_PricingStrategyExecutor_719E34D9((fan-in))
  AmazonPriceAgent --> fan_in_PricingStrategyExecutor_719E34D9;
  ShopeePriceAgent --> fan_in_PricingStrategyExecutor_719E34D9;
  eBayPriceAgent --> fan_in_PricingStrategyExecutor_719E34D9;
  fan_in_PricingStrategyExecutor_719E34D9 --> PricingStrategyExecutor;
  PriceQueryStartExecutor --> AmazonPriceAgent;
  PriceQueryStartExecutor --> eBayPriceAgent;
  PriceQueryStartExecutor --> ShopeePriceAgent;
var pricingReport = await InProcessExecution.RunAsync(priceMonitorWorkflow, priceQuery);
new
{
    priceQuery.ProductName,
    priceQuery.TargetRegion,
    定价策略报告 = pricingReport
}.Display();

var status =  await pricingReport.GetStatusAsync();

await pricingReport.DisposeAsync();

Console.WriteLine("StreamAsync:实时监控平台查询进度");
var priceMonitorWorkflow = GetWorkflow();
await using (var streamingRun = await InProcessExecution.StreamAsync(priceMonitorWorkflow, priceQuery))
{
    await foreach (WorkflowEvent evt in streamingRun.WatchStreamAsync())
    {
        switch (evt)
        {
            case ExecutorInvokedEvent started:
                Console.WriteLine($"{started.ExecutorId} 启动");
                break;
            case ExecutorCompletedEvent completed:
                Console.WriteLine($"{completed.ExecutorId} 完成");
                break;
            case WorkflowOutputEvent output:
                Console.WriteLine("Fan-in 汇总输出:");
                Console.WriteLine(output.Data);
                break;
        }
    }
    await streamingRun.DisposeAsync();
}
Console.WriteLine("并发执行演示完成");

4. 高级实践与最佳实践

  • 性能监控:在聚合 Executor 中追加 Stopwatch,记录最慢平台API响应时间,作为SLA监控依据。
  • 动态平台:agentRoster 可根据商品类目动态调整,例如3C产品加入京东Agent,服装类加入Zalando Agent。
  • 降级策略:搭配自我修正模式,当某个平台API超时时可使用缓存数据或标记为"暂无数据"。
  • 实时告警:streamingRun 输出可接入监控系统,当检测到竞对降价超过5%时触发Slack/钉钉通知。
  • 缓存优化:为高频查询商品设置Redis缓存(TTL=30秒),减少API调用成本。

六、多选工作流

1. 业务场景:内容多渠道分发引擎

  • 背景:营销团队需要根据内容长度、风险级别、语言类型,将一份稿件同时推送到不同媒介。
  • 角色:分类器 Executor、长文渠道、短文渠道、合规审核。
  • 挑战:
    • 一个决策可能需要同时推送多个渠道;
    • 合规风险需要额外介入,但不能阻塞其他渠道;
    • 必须记录每次路由的去向,方便 BI 分析。
  • 目标:构建 Multi-Selection 工作流,让分类器一次决策即可触发多个下游执行器。

2. 核心概念

概念 中文释义 在 Multi-Selection 中的作用
Target Assigner 目标分配器 返回需要触发的下游索引集合
Partition Fan-out 分区式并发 按照 IEnumerable 结果动态决定分支
Shared Output 多路输出 可以一次返回多个渠道的执行结果
Conditional Logging 条件日志 记录每次路由路径,支持审计

3. 实践流程

  • 步骤 1:建模内容请求与样本数据
internal record ContentSubmission(string Id, string Title, string Category, int Length, bool ContainsRisk, string Language);
internal sealed class DistributionPlan
{
    public required string SubmissionId { get; init; }
    public bool PublishToLongForm { get; init; }
    public bool PublishToShortForm { get; init; }
    public bool EscalateToModerator { get; init; }
    public string Reason { get; init; } = string.Empty;
}

ContentSubmission[] submissions =
{
    new("CNT-1001", "6月能耗报告", "Operations", 1200, false, "zh-CN"),
    new("CNT-1002", "618 预热短文案", "Marketing", 260, false, "zh-CN"),
    new("CNT-1003", "跨境补贴政策说明", "Compliance", 620, true, "en-US")
};
  • 步骤 2:定义分类器与渠道 Executor

    为演示重点放在路由逻辑,此处分类器使用简单规则(长度/风险),生产环境可替换为多 Agent 推理或参考源码中的 JSON Schema LLM。

using System.Threading;

internal sealed class ContentClassifierExecutor() : Executor<ContentSubmission, DistributionPlan>(nameof(ContentClassifierExecutor))
{
    public override ValueTask<DistributionPlan> HandleAsync(ContentSubmission submission, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        bool publishLong = submission.Length > 600;
        bool publishShort = submission.Length <= 600 || submission.Category is "Marketing";
        bool needModerator = submission.ContainsRisk || submission.Category is "Compliance";

        return ValueTask.FromResult(new DistributionPlan
        {
            SubmissionId = submission.Id,
            PublishToLongForm = publishLong,
            PublishToShortForm = publishShort,
            EscalateToModerator = needModerator,
            Reason = needModerator ? "命中风险或合规模块" : "常规稿件"
        });
    }

}

internal sealed class LongFormChannelExecutor() : Executor<DistributionPlan>(nameof(LongFormChannelExecutor))
{
    public override async ValueTask HandleAsync(DistributionPlan plan, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        await context.YieldOutputAsync($"" + plan.SubmissionId + " 发布到长文渠道", cancellationToken);
    }
}

internal sealed class ShortFormChannelExecutor() : Executor<DistributionPlan>(nameof(ShortFormChannelExecutor))
{
    public override async ValueTask HandleAsync(DistributionPlan plan, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        await context.YieldOutputAsync($"" + plan.SubmissionId + " 发布到短文渠道", cancellationToken);
    }
}

internal sealed class ModeratorSignalExecutor() : Executor<DistributionPlan>(nameof(ModeratorSignalExecutor))
{
    public override async ValueTask HandleAsync(DistributionPlan plan, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        await context.YieldOutputAsync($"{plan.SubmissionId} 触发人工审核:{plan.Reason}", cancellationToken);
    }
}
  • 步骤 3:构建 Multi-Selection 工作流

    关键在于 Target Assigner,根据 DistributionPlan 返回需要触发的目标索引集合。

var classifier = new ContentClassifierExecutor();
var longFormExecutor = new LongFormChannelExecutor();
var shortFormExecutor = new ShortFormChannelExecutor();
var moderatorExecutor = new ModeratorSignalExecutor();

static Func<DistributionPlan?, int, IEnumerable<int>> BuildChannelRouter() => (plan, targetCount) =>
{
    if (plan is null)
    {
        return Array.Empty<int>();
    }

    List<int> targets = [];
    if (plan.PublishToLongForm) targets.Add(0);
    if (plan.PublishToShortForm) targets.Add(1);
    if (plan.EscalateToModerator) targets.Add(2);
    return targets;

};

var multiSelectionWorkflow = new WorkflowBuilder(classifier)
    .AddFanOutEdge(
        classifier,
        [longFormExecutor, shortFormExecutor, moderatorExecutor],
        BuildChannelRouter())
    .WithOutputFrom(longFormExecutor, shortFormExecutor, moderatorExecutor)
    .Build();

Console.WriteLine("Multi-Selection Workflow 构建完成");
flowchart TD
  ContentClassifierExecutor["ContentClassifierExecutor (Start)"];
  LongFormChannelExecutor["LongFormChannelExecutor"];
  ShortFormChannelExecutor["ShortFormChannelExecutor"];
  ModeratorSignalExecutor["ModeratorSignalExecutor"];
  ContentClassifierExecutor --> LongFormChannelExecutor;
  ContentClassifierExecutor --> ShortFormChannelExecutor;
  ContentClassifierExecutor --> ModeratorSignalExecutor;
  • 步骤 4:运行工作流并观察路由结果

    分别演示批量执行与流式监控,确认一条稿件可以同时触发多条通路。

Console.WriteLine("批量执行 3 条稿件");
foreach (var submission in submissions)
{
    await using var run = await InProcessExecution.StreamAsync(multiSelectionWorkflow, submission);
    List<string> outputs = [];

    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        if (evt is WorkflowOutputEvent outputEvent)
        {
            outputs.Add(outputEvent.Data?.ToString() ?? string.Empty);
        }
    }
    
    new
    {
        submission.Id,
        submission.Category,
        submission.Length,
        发布路径 = outputs
    }.Display();
}

Console.WriteLine("单条稿件的实时事件");
var spotlightSubmission = submissions.Last();
await using (var spotlightRun = await InProcessExecution.StreamAsync(multiSelectionWorkflow, spotlightSubmission)){
    await foreach (WorkflowEvent evt in spotlightRun.WatchStreamAsync())
    {
        switch (evt)
        {
            case ExecutorInvokedEvent started:
                Console.WriteLine($"{started.ExecutorId} 启动");
                break;
            case ExecutorCompletedEvent completed:
                Console.WriteLine($" {completed.ExecutorId} 完成");
                break;
            case WorkflowOutputEvent outputEvent:
                Console.WriteLine($"输出: {outputEvent.Data}");
                break;
        }
    }
}
Console.WriteLine("Multi-Selection 演示结束");

4. 最佳实践

  • 使用 JSON Schema LLM:将 ContentClassifierExecutor 替换为 ChatClientAgent + ChatResponseFormat.ForJsonSchema,提升弹性。
  • 记录路由链路:利用 WorkflowContext.QueueStateUpdateAsync 保留每一步所选通路,方便追溯。
  • 组合其他模式:与并发执行结合,实现“多路选择 + 每路继续并行”复杂场景。
  • 可观测性:将 outputs 写入 Application Insights,构建渠道 KPI 看板。

七、Map Reduce 工作流

1. 业务场景:企业安全白皮书汇总

  • 背景:安全团队每周生成数万字白皮书,需要自动拆分章节、并行摘要并最终生成执行摘要。
  • 角色:拆分 Executor、多个摘要 Agent、共识归并、发布器。
  • 挑战:
    • 文本极长,单次推理成本高;
    • 需要保持段落顺序,避免信息错位;
    • 汇总阶段要补充 KPI 与推荐动作。
  • 目标:用 MapReduce 将巨文拆为可控块,Mapper 并行摘要,Reducer 组合并打上标签。

2. 核心概念

概念 中文释义 在本课的作用
Chunk Splitter 文本拆分器 Map 阶段入口,写入共享状态并广播事件
DocumentSummarizer 摘要执行器 Map 阶段,处理分配到的段落
Consensus Reducer 归并器 Reduce 阶段,收敛所有摘要并排序
Publisher 发布器 将 Reduce 结果格式化输出
Shared State 共享状态 存放段落文本、总段数,用于协调并发

3. 实践流程

  • 步骤 1:准备长文档样本
internal static class DocumentState
{
    public const string Scope = "DocMapReduce";
    public const string TotalChunksKey = "TOTAL_CHUNKS";
}

internal record ChunkEnvelope(string ChunkStateKey, string Text, int Order);
internal sealed class ChunkReadyEvent(string ChunkStateKey, int Order) : WorkflowEvent
{
    public string ChunkStateKey { get; } = ChunkStateKey;
    public int Order { get; } = Order;
}
internal sealed class ChunkSummaryCompletedEvent(int Order, string Summary) : WorkflowEvent
{
    public int Order { get; } = Order;
    public string Summary { get; } = Summary;
}
internal sealed class ReduceCompletedEvent(IReadOnlyList<string> Summaries) : WorkflowEvent
{
    public IReadOnlyList<string> Summaries { get; } = Summaries;
}
  • 步骤 2:实现 Map 阶段(拆分 + 摘要)
internal sealed class ChunkSplitterExecutor(string[] summarizerIds) : Executor<string>(nameof(ChunkSplitterExecutor))
{
    private readonly string[] _summarizerIds = summarizerIds;

    public override async ValueTask HandleAsync(string manuscript, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var paragraphs = manuscript
            .Split(["\r\n\r\n", "\n\n"], StringSplitOptions.RemoveEmptyEntries)
            .Select(p => p.Trim())
            .Where(p => !string.IsNullOrWhiteSpace(p))
            .ToArray();
    
        await context.QueueStateUpdateAsync(DocumentState.TotalChunksKey, paragraphs.Length, scopeName: DocumentState.Scope, cancellationToken);
        Console.WriteLine($"Map 阶段:段落数 = {paragraphs.Length}");
    
        for (int i = 0; i < paragraphs.Length; i++)
        {
            var targetId = this._summarizerIds[i % this._summarizerIds.Length];
            var chunkStateKey = $"chunk_{i}"; // 简化键,避免ID耦合
            var envelope = new ChunkEnvelope(chunkStateKey, paragraphs[i], i);
    
            await context.QueueStateUpdateAsync(chunkStateKey, envelope, scopeName: DocumentState.Scope, cancellationToken);
            Console.WriteLine($"发送 ChunkReadyEvent(order={i}) 到 {targetId}");
            await context.SendMessageAsync(new ChunkReadyEvent(chunkStateKey, i), targetId: targetId, cancellationToken: cancellationToken);
        }
    
        Console.WriteLine($"Map 阶段:已拆分 {paragraphs.Length} 个段落");
    }
}

internal sealed class DocumentSummarizerExecutor(string id, string reducerId) : Executor<ChunkReadyEvent>(id)
{
    private readonly string _reducerId = reducerId;

    public override async ValueTask HandleAsync(ChunkReadyEvent message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"{this.Id} 收到 ChunkReadyEvent(order={message.Order})");
        var envelope = await context.ReadStateAsync<ChunkEnvelope>(message.ChunkStateKey, scopeName: DocumentState.Scope, cancellationToken);
        var summary = Summarize(envelope!.Text);
        await context.SendMessageAsync(new ChunkSummaryCompletedEvent(envelope.Order, summary), targetId: this._reducerId, cancellationToken: cancellationToken);
        Console.WriteLine($"{this.Id} 完成段落 {envelope.Order}");
    }
    
    private static string Summarize(string text)
    {
        var sentences = text.Split(['。', '!', '?'], StringSplitOptions.RemoveEmptyEntries);
        var focus = sentences.Length > 0 ? sentences[0] : text;
        var trimmed = focus.Length > 80 ? focus[..80] + "..." : focus;
        var normalized = trimmed.Replace("\r", string.Empty).Replace("\n", " ").Trim();
        return $"• {normalized}";
    }
}
  • 步骤 3:实现 Reduce & Publish 阶段
internal sealed class ConsensusReducerExecutor(string id, string publisherId) : Executor<ChunkSummaryCompletedEvent>(id)
{
    private readonly SortedDictionary<int, string> _summaries = new();
    private readonly string _publisherId = publisherId;
    private int? _expectedChunks;

    public override async ValueTask HandleAsync(ChunkSummaryCompletedEvent message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _expectedChunks ??= await context.ReadStateAsync<int>(DocumentState.TotalChunksKey, scopeName: DocumentState.Scope, cancellationToken);
        _summaries[message.Order] = message.Summary;
        Console.WriteLine($"Reduce 进度: {_summaries.Count}/{_expectedChunks}");
    
        if (_expectedChunks.HasValue && _summaries.Count >= _expectedChunks.Value)
        {
            var ordered = _summaries.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList();
            await context.SendMessageAsync(new ReduceCompletedEvent(ordered), targetId: this._publisherId, cancellationToken: cancellationToken);
            Console.WriteLine("Reduce 阶段完成");
        }
    }
}

internal sealed class DocumentPublisherExecutor(string id) : Executor<ReduceCompletedEvent>(id)
{
    public override async ValueTask HandleAsync(ReduceCompletedEvent message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var builder = new StringBuilder();
        builder.AppendLine("《本周安全白皮书摘要》");
        builder.AppendLine(new string('━', 30));
        int order = 1;
        foreach (var summary in message.Summaries)
        {
            builder.AppendLine($"{order++}. {summary}");
        }
        builder.AppendLine(new string('━', 30));
        builder.AppendLine("推荐动作:请检查 OTA 签名、同步 SOC 新规则、与供应链共享告警。");

        await context.YieldOutputAsync(builder.ToString(), cancellationToken);
    }
}
  • 步骤 4:构建 Workflow 并运行

    三组 Map 执行器并行处理段落,Reduce 负责聚合,Publish 统一输出。

string[] summarizerIds = ["mapper_secops", "mapper_rnd", "mapper_compliance"];
var reducerId = "reducer_consensus";
var publisherId = "publisher_whitepaper";

var splitter = new ChunkSplitterExecutor(summarizerIds);
var summarizers = summarizerIds.Select(id => new DocumentSummarizerExecutor(id, reducerId)).ToArray();
var reducer = new ConsensusReducerExecutor(reducerId, publisherId);
var publisher = new DocumentPublisherExecutor(publisherId);

var mapReduceWorkflow = new WorkflowBuilder(splitter)
    .AddFanOutEdge(splitter, [..summarizers])
    .AddFanInEdge([..summarizers], reducer)
    .AddEdge(reducer, publisher)
    .WithOutputFrom(publisher)
    .Build();
flowchart TD
  ChunkSplitterExecutor["ChunkSplitterExecutor (Start)"];
  mapper_secops["mapper_secops"];
  mapper_rnd["mapper_rnd"];
  mapper_compliance["mapper_compliance"];
  reducer_consensus["reducer_consensus"];
  publisher_whitepaper["publisher_whitepaper"];

  fan_in_reducer_consensus_26615B5E((fan-in))
  mapper_compliance --> fan_in_reducer_consensus_26615B5E;
  mapper_rnd --> fan_in_reducer_consensus_26615B5E;
  mapper_secops --> fan_in_reducer_consensus_26615B5E;
  fan_in_reducer_consensus_26615B5E --> reducer_consensus;
  ChunkSplitterExecutor --> mapper_secops;
  ChunkSplitterExecutor --> mapper_rnd;
  ChunkSplitterExecutor --> mapper_compliance;
  reducer_consensus --> publisher_whitepaper;
string manuscript = """
第一段:本周我们在生产环境中检测到三起高危漏洞利用尝试,分别针对身份验证与API速率限制。我们已临时封禁相关IP,并更新WAF规则以缓解风险。

第二段:研发团队完成了对零信任网络访问策略的回顾,新增设备基线检查与会话时长限制,预计下周进入灰度发布阶段。

第三段:合规方面,依据最新的行业标准,我们优化了日志保留周期与隐私数据访问审批流程,减少人工审批时间。

第四段:对外联动方面,已与供应链伙伴同步威胁情报,建议对固件OTA签名策略进行交叉验证,并提升告警共享频率。
""";
Console.WriteLine("长文档样本文本已准备");

await using (var run = await InProcessExecution.StreamAsync(mapReduceWorkflow, manuscript)){
    await foreach (WorkflowEvent evt in run.WatchStreamAsync())
    {
        switch (evt)
        {
            case ExecutorInvokedEvent started:
                Console.WriteLine($"{started.ExecutorId} started");
                break;
            case ExecutorCompletedEvent completed:
                Console.WriteLine($"{completed.ExecutorId} completed");
                break;
            case WorkflowOutputEvent outputEvent:
                Console.WriteLine("最终摘要:");
                Console.WriteLine(outputEvent.Data);
                break;
            default:
                Console.WriteLine($"事件: {evt.GetType().Name}");
                break;
        }
    }
}

Console.WriteLine("MapReduce 演示完成");

4. 最佳实践

  • 共享状态命名空间:使用 Scope 隔离任务,避免与其他 Workflow 相互影响。
  • 持久化中间结果:长文档/大数据场景可参考源码将 Map/Reduce 中间件落地磁盘。
  • LLM 评估:Map 阶段可接入 AIClientHelper.GetDefaultChatClient(),通过 ChatResponseFormat 生成结构化摘要。
  • 可视化:利用 mapReduceWorkflow.ToMermaidString() 输出拓扑,供架构评审使用。

八、子工作流

1. 业务场景:客户投诉智能处理流水线

  • 背景:企业客服中心每天收到大量投诉,需要根据类型(产品质量/物流问题)分发到不同的标准化处理子流程。
  • 角色:投诉分类器 → 产品质量子流程 / 物流问题子流程 → 合规审核 → 情绪评估。
  • 目标:将处理子流程封装为可复用的 Workflow,主流程统一管理路由和审核,所有处理记录写入共享状态。
  • AI 能力:使用真实 AI 生成回复模板、分析投诉情绪、提供处理建议。

2. 核心概念

概念 中文释义 场景价值
Sub-Workflow 子工作流 封装标准化处理流程,多渠道复用
ExecutorBinding 执行器绑定 将子工作流作为单个 Executor 暴露
Conditional Routing 条件路由 根据投诉类型分发到不同子流程
Shared State 共享状态 投诉记录在各执行器间传递和更新
AI Integration AI 集成 真实 AI 生成回复模板和分析建议

3. 实践路径

  • 步骤 1:建模投诉数据与共享状态
// 投诉数据模型
internal record CustomerComplaint(
    string OrderId,
    string CustomerName,
    string ComplaintText,
    DateTime SubmittedAt
);

// 共享状态:投诉处理记录(各执行器会更新此对象)
internal class ComplaintProcessingRecord
{
    public CustomerComplaint Original { get; set; }
    public string Category { get; set; } = "未分类";
    public string Handler { get; set; } = "待分配";
    public List<string> ProcessingSteps { get; set; } = new();
    public string AIGeneratedResponse { get; set; } = "";
    public string ComplianceStatus { get; set; } = "待审核";
    public string SentimentScore { get; set; } = "未评估";
}

var complaint = new CustomerComplaint(
    OrderId: "ORD-2025-8821",
    CustomerName: "张先生",
    ComplaintText: "收到的手机屏幕有明显划痕,要求退货退款",
    SubmittedAt: DateTime.Now
);

var processingRecord = new ComplaintProcessingRecord { Original = complaint };
  • 步骤 2:实现产品质量处理子工作流(评估 → 退换货判定 → AI生成回复)
// === 产品质量子工作流 ===
// 1. 问题评估执行器
internal sealed class ProductEvaluationExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ProductEvaluationExecutor))
{
    public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("正在评估产品质量问题...");
        record.ProcessingSteps.Add("[产品评估] 检测到屏幕外观缺陷,符合质量问题定义");
        record.Handler = "产品质量团队";
        return ValueTask.FromResult(record);
    }
}

// 2. 退换货判定执行器
internal sealed class ReturnPolicyExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ReturnPolicyExecutor))
{
    public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("判定退换货政策...");
        var daysFromOrder = (DateTime.Now - record.Original.SubmittedAt).Days;
        if (daysFromOrder <= 7)
        {
            record.ProcessingSteps.Add("[退换货判定] 符合7天无理由退货政策,批准全额退款");
        }
        else
        {
            record.ProcessingSteps.Add("[退换货判定] 超过退货期限,建议换货或部分补偿");
        }
        return ValueTask.FromResult(record);
    }
}

// 3. AI生成回复执行器
internal sealed class AIResponseGeneratorExecutor(IChatClient chatClient) : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(AIResponseGeneratorExecutor))
{
    public override async ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("AI 正在生成客户回复...");
        var prompt = $@"你是专业的客服主管。根据以下投诉处理信息,生成一封正式、有同理心的客户回复邮件(150字内):

客户:{record.Original.CustomerName}
订单号:{record.Original.OrderId}
投诉内容:{record.Original.ComplaintText}
处理步骤:
{string.Join("\n", record.ProcessingSteps)}

要求:

1. 表达歉意和理解

2. 说明处理方案

3. 提供后续联系方式

4. 语气真诚、专业";

       var response = await chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
       record.AIGeneratedResponse = response.Text ?? "AI 生成失败";
       record.ProcessingSteps.Add($"[AI 回复] 已生成客户回复模板({record.AIGeneratedResponse.Length}字)");
       return record;

   }
}

var chatClient = AIClientHelper.GetDefaultChatClient();

var productEvalExecutor = new ProductEvaluationExecutor();
var returnPolicyExecutor = new ReturnPolicyExecutor();
var aiResponseExecutor = new AIResponseGeneratorExecutor(chatClient);

var productQualitySubWorkflow = new WorkflowBuilder(productEvalExecutor)
    .AddEdge(productEvalExecutor, returnPolicyExecutor)
    .AddEdge(returnPolicyExecutor, aiResponseExecutor)
    .WithOutputFrom(aiResponseExecutor)
    .Build();

var productQualitySubExecutor = productQualitySubWorkflow.BindAsExecutor("ProductQualitySubWorkflow");
  • 步骤 3:实现物流问题处理子工作流(追踪 → 延迟分析 → AI生成回复)
// === 物流问题子工作流 ===
// 1. 物流追踪执行器
internal sealed class LogisticsTrackingExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(LogisticsTrackingExecutor))
{
    public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("正在查询物流信息...");
        record.ProcessingSteps.Add("[物流追踪] 包裹在中转站滞留3天,当前状态:运输中");
        record.Handler = "物流运营团队";
        return ValueTask.FromResult(record);
    }
}

// 2. 延迟分析执行器
internal sealed class DelayAnalysisExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(DelayAnalysisExecutor))
{
    public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("分析延迟原因...");
        record.ProcessingSteps.Add("[延迟分析] 延迟原因:暴雨导致道路封闭,预计2天内恢复配送");
        record.ProcessingSteps.Add("[补偿方案] 提供50元优惠券 + 免运费");
        return ValueTask.FromResult(record);
    }
}

var logisticsTrackExecutor = new LogisticsTrackingExecutor();
var delayAnalysisExecutor = new DelayAnalysisExecutor();
var logisticsAIResponseExecutor = new AIResponseGeneratorExecutor(chatClient);

var logisticsSubWorkflow = new WorkflowBuilder(logisticsTrackExecutor)
    .AddEdge(logisticsTrackExecutor, delayAnalysisExecutor)
    .AddEdge(delayAnalysisExecutor, logisticsAIResponseExecutor)
    .WithOutputFrom(logisticsAIResponseExecutor)
    .Build();

var logisticsSubExecutor = logisticsSubWorkflow.BindAsExecutor("LogisticsSubWorkflow");
  • 步骤 4:构建主工作流(分类器 → 条件路由 → 合规审核 → 情绪评估)
// 1. 投诉分类执行器
internal sealed class ComplaintClassifierExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ComplaintClassifierExecutor))
{
    public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("正在分类投诉类型...");
        // 简单规则分类(实际可用 AI 分类)
        if (record.Original.ComplaintText.Contains("划痕") || record.Original.ComplaintText.Contains("质量") || record.Original.ComplaintText.Contains("退货"))
        {
            record.Category = "产品质量";
        }
        else if (record.Original.ComplaintText.Contains("延迟") || record.Original.ComplaintText.Contains("物流") || record.Original.ComplaintText.Contains("配送"))
        {
            record.Category = "物流问题";
        }
        else
        {
            record.Category = "其他";
        }
        record.ProcessingSteps.Add($"[分类器] 投诉类型识别为:{record.Category}");
        return ValueTask.FromResult(record);
    }
}

// 2. 合规审核执行器
internal sealed class ComplianceCheckExecutor() : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(ComplianceCheckExecutor))
{
    public override ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("合规审核中...");
        var checks = new List<string>();
        if (record.AIGeneratedResponse.Contains("歉意")) checks.Add("包含道歉");
        if (record.AIGeneratedResponse.Length <= 300) checks.Add("字数合规");
        if (!record.AIGeneratedResponse.Contains("法律") && !record.AIGeneratedResponse.Contains("诉讼"))
            checks.Add("无敏感法律词汇");
        

        record.ComplianceStatus = checks.Count >= 2 ? "审核通过" : "需人工复审";
        record.ProcessingSteps.Add($"[合规审核] {record.ComplianceStatus} - {string.Join(", ", checks)}");
        return ValueTask.FromResult(record);
    }

}

// 3. 情绪评估执行器(使用 AI)
internal sealed class SentimentAnalysisExecutor(IChatClient chatClient) : Executor<ComplaintProcessingRecord, ComplaintProcessingRecord>(nameof(SentimentAnalysisExecutor))
{
    public override async ValueTask<ComplaintProcessingRecord> HandleAsync(ComplaintProcessingRecord record, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("AI 情绪评估中...");
        var prompt = $@"分析以下客服回复的情绪基调,用一个词概括(如:友好、专业、冷淡、热情):

{record.AIGeneratedResponse}

只返回一个词,不要解释。";
        

        var response = await chatClient.GetResponseAsync(prompt, cancellationToken: cancellationToken);
        record.SentimentScore = response.Text?.Trim() ?? "中性";
        record.ProcessingSteps.Add($"[情绪评估] AI 评估语气为:{record.SentimentScore}");
        return record;
    }
}

var classifierExecutor = new ComplaintClassifierExecutor();
var complianceExecutor = new ComplianceCheckExecutor();
var sentimentExecutor = new SentimentAnalysisExecutor(chatClient);

// 主工作流:分类 → 条件路由到子流程 → 合规 → 情绪
var mainWorkflow = new WorkflowBuilder(classifierExecutor)
    .AddEdge<ComplaintProcessingRecord>(classifierExecutor, productQualitySubExecutor, 
        condition: record => record.Category == "产品质量")
    .AddEdge<ComplaintProcessingRecord>(classifierExecutor, logisticsSubExecutor, 
        condition: record => record.Category == "物流问题")
    .AddEdge(productQualitySubExecutor, complianceExecutor)
    .AddEdge(logisticsSubExecutor, complianceExecutor)
    .AddEdge(complianceExecutor, sentimentExecutor)
    .WithOutputFrom(sentimentExecutor)
    .Build();
  • 步骤 5:执行工作流并观察共享状态流转
Console.WriteLine("开始执行投诉处理流水线...");
Console.WriteLine(new string('━', 60));

await using (var streaming = await InProcessExecution.StreamAsync(mainWorkflow, processingRecord))
{
    await foreach (WorkflowEvent evt in streaming.WatchStreamAsync())
    {
        switch (evt)
        {
            case ExecutorInvokedEvent started:
                Console.WriteLine($"\n{started.ExecutorId} 开始执行");
                break;
            case ExecutorCompletedEvent completed when completed.Data is ComplaintProcessingRecord rec:
                Console.WriteLine($"   共享状态更新 → 处理步骤数:{rec.ProcessingSteps.Count}");
                break;
            case WorkflowOutputEvent outputEvt when outputEvt.Data is ComplaintProcessingRecord finalRecord:
                Console.WriteLine("\n" + new string('━', 60));
                Console.WriteLine("投诉处理完成!最终处理记录:\n");
                new {
                    订单号 = finalRecord.Original.OrderId,
                    客户 = finalRecord.Original.CustomerName,
                    投诉类型 = finalRecord.Category,
                    处理团队 = finalRecord.Handler,
                    处理步骤数 = finalRecord.ProcessingSteps.Count,
                    合规状态 = finalRecord.ComplianceStatus,
                    情绪评分 = finalRecord.SentimentScore
                }.Display();
                

                Console.WriteLine("\n处理步骤详情:");
                foreach (var step in finalRecord.ProcessingSteps)
                {
                    Console.WriteLine($"  {step}");
                }
                
                Console.WriteLine("\nAI 生成的客户回复:");
                Console.WriteLine(new string('─', 60));
                Console.WriteLine(finalRecord.AIGeneratedResponse);
                Console.WriteLine(new string('─', 60));
                break;
        }
    }
}

4. 最佳实践

  • 标准化接口:子工作流使用统一的状态对象(如 ComplaintProcessingRecord),便于主流程复用和扩展。
  • 拆分粒度:保持子工作流 3-4 个步骤,聚焦单一职责(产品质量 vs 物流问题)。
  • 共享状态设计:使用引用类型(class)而非值类型(record),确保各执行器修改同一对象。
  • AI 集成策略:将 IChatClient 注入执行器构造函数,便于测试时 Mock。
  • 条件路由:通过 AddEdge 的 condition 参数实现动态分流,避免硬编码。
  • 可观测性:在共享状态中记录 ProcessingSteps,便于审计和调试。
  • 多渠道复用:此子工作流可被"电话客服系统"、"在线聊天机器人"、"邮件处理系统"共同复用。