Spiga

Net企业级AI项目4:NL2SQL

2026-01-31 23:49:16

NL2SQL(Natural Language to SQL) 是一项将人类自然语言问题自动转换为结构化 SQL 查询语句的技术。简单来说,就是让用户用大白话(如“上个月销售额最高的产品是什么?”)直接查询数据库,无需学习 SQL 语法。

今天我们的任务就是给我们的企业智能助理扩展 NL2SQL 的能力。

一、动态数据源架构

1. 面向多数据库源的架构

一家企业会有很多不同的系统,而各个系统所用的数据库会是各种各样的,它们不仅数据库产品不一样,同产品的数据库可能版本也不一样,查询数据的时候可能存在一些差别。

我们智能助理需要拥有以下这些功能:

  • 动态配置多个业务数据库
  • 通过自然语言,分析全局源数据,定位到需要链接的数据库(如 ERP\HR\OA)
  • 最小权限连接上数据库(只读,禁止使用 DDL、DML)
  • 分析目标库内部的表结构信息
  • 方言适配(不同类型的数据库 SQL 语法差异)

ps:只做只读的实现,并不是模型不能生成插入、更新、删除等 SQL,主要是不建议用自然语言来做数据库的操作,主要是:

  • 模型会有幻觉,操作数据库存在风险
  • 无法保证事务
  • 无法触发一些关联任务,比如生成订单后,需要通知扣减库存。如果是直接操作订单表,库存的扣减事件就无法触发

如果需要实现自然语音操作数据,可以使用工具调用或者 MCP 来完成,我们的项目只实现生成 SQL 查询语句。

2. ReAct 认知模型

ReAct 模型就是一个会动脑筋、会动手、还会从错误中学习的超级助理的工作方法!

它的任务就是听懂你的话,然后从数据库里找到答案。它可聪明了,做事非常有条理,就像一个大侦探,每次破案都遵循三个步骤:

  • 思考(Think - 推理)

    当用户说“帮我查一下仓库里还有多少台 iPhone 15?”

    助理就会想一想:“这个问题是什么意思呢?我需要去“库存表”里看看,要找“iPhone 15”的数量”。

  • 行动(Act - 行动)

    想好之后,它就立刻行动起来!也就是生成 SQL 查询语句去查找。

  • 检查(Observe - 观察):行动之后,它会仔细检查结果。比如,它可能发现“生成的 SQL ”写错了,它会进行修正。

总之,当你告诉它要什么,它就用这个“思考-行动”的魔法,自己想办法从数据库里把答案给你找出来。

3. 多态数据库设计

  • 全局数据源元数据注册表:
    • 身份标识与路由
    • 语义描述
    • 连接字符串管理
    • 方言适配
  • 连接隔离和最小权限原则
    • 控制平面(AI系统自身的数据库、读写权限)
    • 数据平面(外部业务数据库)
  • 动态数据库连接工厂
    • 我们使用 Dapper 来实现,因为要支持多态数据库,强类型的 EF Core 不太好实现

4. 实现动态数据源管理

  • 实现领域模型

创建一个名为 Qjy.AICopilot.Core.DataAnalysis 的类库,并创建 BusinessDatabase 实体,用来添加支持的数据库,以及配置数据库链接字符串。

我们项目支持三种数据库类型:PostgreSql、SqlServer 和 MySql,所以我们先创建一个 DbProviderType 枚举,然后创建 BusinessDatabase 实体。

//Qjy.AICopilot.Core.DataAnalysis/Aggregates/BusinessDatabase/DbProviderType.cs
/// <summary>
/// 数据库提供程序类型
/// </summary>
public enum DbProviderType
{
    /// <summary>
    /// PostgreSQL 数据库
    /// </summary>
    PostgreSql = 1,

    /// <summary>
    /// Microsoft SQL Server
    /// </summary>
    SqlServer = 2,

    /// <summary>
    /// MySQL
    /// </summary>
    MySql = 3
}

//Qjy.AICopilot.Core.DataAnalysis/Aggregates/BusinessDatabase/BusinessDatabase.cs
/// <summary>
/// 业务数据库聚合根
/// 代表一个可被AI Agent访问的外部数据源
/// </summary>
public class BusinessDatabase : IAggregateRoot
{
    protected BusinessDatabase() { }

    public BusinessDatabase(string name, string description, string connectionString, DbProviderType provider)
    {
        Id = Guid.NewGuid();
        Name = name;
        Description = description;
        ConnectionString = connectionString;
        Provider = provider;
        IsEnabled = true;
        CreatedAt = DateTime.UtcNow;
    }

    public Guid Id { get; private set; }

    /// <summary>
    /// 数据库标识名称
    /// 用于在多库路由时作为唯一Key
    /// </summary>
    public string Name { get; private set; } = null!;

    /// <summary>
    /// 数据库业务描述 (如: "包含所有销售订单、客户资料及发货记录")
    /// 该字段将被注入到System Prompt中,辅助LLM进行意图路由判断
    /// </summary>
    public string Description { get; private set; } = null!;

    /// <summary>
    /// 连接字符串
    /// </summary>
    public string ConnectionString { get; private set; } = null!;

    /// <summary>
    /// 数据库类型
    /// </summary>
    public DbProviderType Provider { get; private set; }

    /// <summary>
    /// 是否启用
    /// </summary>
    public bool IsEnabled { get; private set; }

    public DateTime CreatedAt { get; private set; }

    /// <summary>
    /// 更新连接信息
    /// </summary>
    public void UpdateConnection(string connectionString, DbProviderType provider)
    {
        ConnectionString = connectionString;
        Provider = provider;
    }

    /// <summary>
    /// 更新描述信息
    /// </summary>
    public void UpdateInfo(string name, string description)
    {
        Name = name;
        Description = description;
    }
}
  • 配置数据库映射关系,然后重新生成数据库
//Qjy.AICopilot.EntityFrameworkCore/Configuration/DataAnalysis/BusinessDatabaseConfiguration.cs
public class BusinessDatabaseConfiguration : IEntityTypeConfiguration<BusinessDatabase>
{
    public void Configure(EntityTypeBuilder<BusinessDatabase> builder)
    {
        builder.ToTable("business_databases");

        builder.HasKey(b => b.Id);
        builder.Property(b => b.Id).HasColumnName("id");

        builder.Property(b => b.Name)
            .IsRequired()
            .HasMaxLength(100)
            .HasColumnName("name");
        
        // 保证名称唯一,便于路由查找
        builder.HasIndex(b => b.Name).IsUnique();

        builder.Property(b => b.Description)
            .IsRequired()
            .HasMaxLength(500)
            .HasColumnName("description");

        builder.Property(b => b.ConnectionString)
            .IsRequired()
            .HasMaxLength(1000)
            .HasColumnName("connection_string");

        builder.Property(b => b.Provider)
            .IsRequired()
            .HasConversion<string>() // 存储枚举字符串,增强可读性
            .HasMaxLength(50)
            .HasColumnName("provider");

        builder.Property(b => b.IsEnabled)
            .IsRequired()
            .HasColumnName("is_enabled");

        builder.Property(b => b.CreatedAt)
            .IsRequired()
            .HasColumnName("created_at");
    }
}

//Qjy.AICopilot.EntityFrameworkCore/AiCopilotDbContext.cs
public DbSet<BusinessDatabase> BusinessDatabases => Set<BusinessDatabase>();

//Qjy.AICopilot.EntityFrameworkCore/DataQueryService.cs
public IQueryable<BusinessDatabase> BusinessDatabases => dbContext.BusinessDatabases.AsNoTracking();

//Qjy.AICopilot.Services.Common/Contracts/IDataQueryService.cs
public IQueryable<BusinessDatabase> BusinessDatabases { get; }
  • 创建 Dapper 基础设施层
    • 在 Infrastructure,创建 Qjy.AICopilot.Dapper 类库项目
    • 添加 Dapper,3种数据库需要的引用包,以及 Hosting 抽象包(用于配置 SQL 安全服务和数据库连接器)。
<ItemGroup>
  <PackageReference Include="Dapper" Version="2.1.66" />
  <PackageReference Include="Microsoft.Data.SqlClient" Version="6.1.3" />
  <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.1" />
  <PackageReference Include="MySql.Data" Version="9.5.0" />
  <PackageReference Include="Npgsql" Version="10.0.1" />
</ItemGroup>
  • 实现 SQL 安全验证器
//Qjy.AICopilot.Dapper/ISqlGuardrail.cs
public interface ISqlGuardrail
{
    /// <summary>
    /// 验证SQL语句是否安全
    /// </summary>
    /// <param name="sql">待执行的SQL</param>
    /// <returns>验证结果,包含是否通过及错误信息</returns>
    (bool IsSafe, string? ErrorMessage) Validate(string sql);
}

//Qjy.AICopilot.Dapper/KeywordSqlGuardrail.cs
/// <summary>
/// 基于关键词黑名单的SQL安全服务
/// 注意:这只是第一道防线,不能完全替代数据库层面的权限控制,数据库提供只读帐号
/// </summary>
public class KeywordSqlGuardrail : ISqlGuardrail
{
    // 定义高危操作关键词
    private static readonly string[] ForbiddenKeywords = 
    [
        "DROP", "TRUNCATE", "DELETE", "UPDATE", "INSERT", 
        "ALTER", "GRANT", "REVOKE", "CREATE", "EXEC", "EXECUTE",
        "MERGE", "REPLACE", "UPSERT"
    ];

    public (bool IsSafe, string? ErrorMessage) Validate(string sql)
    {
        if (string.IsNullOrWhiteSpace(sql))
            return (false, "SQL语句为空");

        // 移除注释,防止通过注释绕过检测 (简单的 -- 或 /* */)
        // 生产环境建议使用更完善的 SQL Parser 库进行 AST 分析
        var cleanSql = RemoveComments(sql).ToUpperInvariant();

        foreach (var keyword in ForbiddenKeywords)
        {
            // 使用单词边界匹配,避免误杀 (例如: "SELECT * FROM UPDATE_LOG" 不应被拦截)
            var regex = new Regex($@"\b{keyword}\b", RegexOptions.IgnoreCase);
            // \b{keyword}\b 确保匹配的是完整的单词
            if (regex.IsMatch(cleanSql))
            {
                return (false, $"安全拦截:检测到禁止的关键字 '{keyword}'。Agent 仅允许执行查询操作。");
            }
        }

        // 检查是否包含分号,防止多语句执行注入 (如: SELECT * FROM Users; DROP TABLE Logs)
        // 大多数 ORM 在单次执行中只允许一条语句,但进行显式检查更为安全
        if (cleanSql.Count(c => c == ';') > 1 || (cleanSql.Contains(';') && !cleanSql.TrimEnd().EndsWith(';')))
        {
            return (false, "安全拦截:禁止在单次调用中执行多条 SQL 语句。");
        }

        return (true, null);
    }

    private static string RemoveComments(string sql)
    {
        // 移除 -- 单行注释
        var noSingleLine = Regex.Replace(sql, "--.*", "");
        // 移除 /* */ 多行注释
        var noComments = Regex.Replace(noSingleLine, @"/\*[\s\S]*?\*/", "");
        return noComments;
    }
}
  • 实现数据库连接器
    • 先定义一个接口,提供3个方法(获取数据库连接、获取数据库架构信息,以及执行查询并返回动态列表)
    • 然后提供 Dappar 的接口实现
//Qjy.AICopilot.Services.Common/Contracts/IDatabaseConnector.cs
public interface IDatabaseConnector
{
    /// <summary>
    /// 获取数据库连接(不打开,仅创建对象)
    /// </summary>
    IDbConnection GetConnection(BusinessDatabase database);

    /// <summary>
    /// 获取数据库架构信息(表名、列名等)
    /// </summary>
    Task<IEnumerable<dynamic>> GetSchemaInfoAsync(BusinessDatabase database, CancellationToken cancellationToken = default);

    /// <summary>
    /// 执行查询并返回动态列表
    /// </summary>
    /// <param name="database">目标数据库配置</param>
    /// <param name="sql">SQL语句</param>
    /// <param name="parameters">参数</param>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns>动态对象列表(IEnumerable of dynamic)</returns>
    Task<IEnumerable<dynamic>> ExecuteQueryAsync(BusinessDatabase database, string sql, object? parameters = null, CancellationToken cancellationToken = default);
}

//Qjy.AICopilot.Dapper/DapperDatabaseConnector.cs
public class DapperDatabaseConnector(ISqlGuardrail sqlGuardrail, ILogger<DapperDatabaseConnector> logger) : IDatabaseConnector
{
    public IDbConnection GetConnection(BusinessDatabase database)
    {
        var connectionString = database.ConnectionString;

        return database.Provider switch
        {
            DbProviderType.PostgreSql => new NpgsqlConnection(connectionString),
            DbProviderType.SqlServer => new SqlConnection(connectionString),
            DbProviderType.MySql => new MySqlConnection(connectionString),
            _ => throw new NotSupportedException($"不支持的数据库提供程序: {database.Provider}")
        };
    }

    public async Task<IEnumerable<dynamic>> GetSchemaInfoAsync(BusinessDatabase database, CancellationToken cancellationToken = default)
    {
        // 获取所有用户表的元数据SQL
        string sql = database.Provider switch
        {
            DbProviderType.PostgreSql => @"
                SELECT table_name, table_schema 
                FROM information_schema.tables 
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE';",

            _ => throw new NotSupportedException("不支持的数据库类型")
        };

        return await ExecuteQueryAsync(database, sql, cancellationToken: cancellationToken);
    }

    public async Task<IEnumerable<dynamic>> ExecuteQueryAsync(BusinessDatabase database, string sql, object? parameters = null, CancellationToken cancellationToken = default)
    {
        // 1. 安全检查
        var guardResult = sqlGuardrail.Validate(sql);
        if (!guardResult.IsSafe)
        {
            logger.LogWarning("SQL安全拦截: {Reason}. SQL: {Sql}", guardResult.ErrorMessage, sql);
            throw new InvalidOperationException(guardResult.ErrorMessage);
        }

        // 2. 创建连接
        using var connection = GetConnection(database);
        
        try
        {
            // 3. 执行查询
            // 使用 CommandDefinition 支持 CancellationToken
            var command = new CommandDefinition(sql, parameters, cancellationToken: cancellationToken);
            
            // Dapper 的 QueryAsync 返回的是 IEnumerable<dynamic>
            // 这对于无法预知列名的动态查询非常合适
            var result = await connection.QueryAsync(command);
            
            return result;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "在数据库 {DbName} 上执行 SQL 失败。SQL: {Sql}", database.Name, sql);
            throw; // 抛出异常供上层 Agent 捕获并进行自我修正
        }
    }
}
  • 注入 Dappar
//Qjy.AICopilot.Dapper/DependencyInjection.cs
public static class DependencyInjection
{
    public static void AddDapper(this IHostApplicationBuilder builder)
    {
        // 注册 SQL 安全服务
        builder.Services.AddSingleton<ISqlGuardrail, KeywordSqlGuardrail>();
        
        // 注册 数据库连接器
        builder.Services.AddSingleton<IDatabaseConnector, DapperDatabaseConnector>();
    }
}

二、实现数据分析插件

1. 架构设计

我们采用项目中已经提供的插件框架,来实现一个数据分析插件。

  • 认知漏斗模型

    • 先用宽口径扫描(表名+表注释),轻量级的获取表名+注释
    • 然后用窄口径聚焦(3~5候选表的详细结构),重量级的获取表结构(数据库名称、数据表名数组)
    • 最后执行验证
  • Token 阶段策略

    比如一些表存在 nvchar(max) 这样的字段,里面存放的内容可能非常多,如果把需要的数据都加载出来,可能会占用大量的 Token。一方面会挤占上下文空间,另外也会浪费 Token,所以针对这样的内容,读取数据时需要做数据截断。

2. 实现插件

//Qjy.AICopilot.DataAnalysisService/Plugins/DataAnalysisPlugin.cs
// 用于映射元数据查询结果
public record ColumnMetadata
{
    public string ColumnName { get; set; } = "";
    public string DataType { get; set; } = "";
    public bool IsPrimaryKey { get; set; }
    public string? Description { get; set; }
}

/// <summary>
/// 数据分析插件
/// 提供数据库元数据探索和SQL执行能力,是Text-to-SQL的核心组件。
/// </summary>
public class DataAnalysisPlugin(IDatabaseConnector dbConnector, ILogger<DataAnalysisPlugin> logger) : AgentPluginBase
{
    public override string Description => "提供数据库结构查询和SQL执行能力,用于回答涉及业务数据的统计分析问题。";

    // 辅助方法:根据名称获取数据库配置
    // 这个方法不暴露给 AI,仅供内部使用
    private async Task<BusinessDatabase> GetDatabaseAsync(IServiceProvider sp, string databaseName, CancellationToken ct)
    {
        var dataQuery = sp.GetRequiredService<IDataQueryService>();
        var queryable = dataQuery.BusinessDatabases.Where(d => d.Name == databaseName);
        var db = await dataQuery.FirstOrDefaultAsync(queryable);

        if (db == null)
        {
            throw new ArgumentException($"未找到名称为 '{databaseName}' 的数据库。请检查名称是否正确。");
        }

        if (!db.IsEnabled)
        {
            throw new InvalidOperationException($"数据库 '{databaseName}' 已被禁用。");
        }

        return db;
    }
    
    [Description("获取指定数据库中所有表的名称和描述。这是探索数据库结构的第一步。")]
    public async Task<string> GetTableNamesAsync(IServiceProvider sp, [Description("目标数据库的名称")] string databaseName)
    {
        try
        {
            // 获取数据库配置
            var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);

            // 根据数据库类型构建查询元数据的 SQL
            var sql = string.Empty;
            switch (db.Provider)
            {
                case DbProviderType.PostgreSql:
                    // PostgreSQL: 从 information_schema 获取表名,关联 pg_description 获取注释
                    sql = @"
                        SELECT 
                            t.table_name AS ""TableName"",
                            obj_description(pgc.oid) AS ""Description""
                        FROM information_schema.tables t
                        INNER JOIN pg_class pgc ON t.table_name = pgc.relname
                        WHERE t.table_schema = 'public' 
                          AND t.table_type = 'BASE TABLE';";
                    break;
                case DbProviderType.SqlServer:
                    // SQL Server
                    break;
                default:
                    return $"错误:不支持的数据库类型 {db.Provider}";
            }

            // 执行查询
            // 这里使用了基础设施层的 ExecuteQueryAsync,它返回 IEnumerable<dynamic>
            var result = await dbConnector.ExecuteQueryAsync(db, sql);

            // 序列化结果
            return result.ToJson();
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "获取表名失败。Database: {DbName}", databaseName);
            return $"获取表名时发生错误: {ex.Message}";
        }
    }
    
    
    // 内部辅助方法:查询单个表的列元数据
    private async Task<List<ColumnMetadata>> GetColumnsAsync(BusinessDatabase db, string tableName)
    {
        var sql = string.Empty;
        switch (db.Provider)
        {
            case DbProviderType.PostgreSql:
                // PostgreSQL 元数据查询
                // 包含列名、类型、是否主键
                // 注意:此处简化了查询,实际生产中可能需要更复杂的关联来获取外键
                sql = @"
            SELECT 
                c.column_name AS ""ColumnName"",
                c.data_type AS ""DataType"",
                CASE WHEN tc.constraint_type = 'PRIMARY KEY' THEN 1 ELSE 0 END AS ""IsPrimaryKey"",
                pg_catalog.col_description(format('%s.%s', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) AS ""Description""
            FROM information_schema.columns c
            LEFT JOIN information_schema.key_column_usage kcu 
                ON c.table_name = kcu.table_name AND c.column_name = kcu.column_name
            LEFT JOIN information_schema.table_constraints tc 
                ON kcu.constraint_name = tc.constraint_name AND tc.constraint_type = 'PRIMARY KEY'
            WHERE c.table_name = @TableName AND c.table_schema = 'public';";
                break;
            case DbProviderType.SqlServer:
                // SQL Server 元数据查询
                break;
            default:
                return [];
        }

        var result = await dbConnector.ExecuteQueryAsync(db, sql, new { TableName = tableName });

        // Dapper 返回的是 dynamic,需要手动映射到强类型
        var columns = new List<ColumnMetadata>();
        foreach (var row in result)
        {
            var dict = (IDictionary<string, object>)row;
            columns.Add(new ColumnMetadata
            {
                ColumnName = dict["ColumnName"] as string ?? "",
                DataType = dict["DataType"] as string ?? "",
                IsPrimaryKey = Convert.ToInt32(dict["IsPrimaryKey"]) == 1,
                Description = dict["Description"] as string ?? ""
            });
        }

        return columns;
    }
    
    [Description("获取指定表的详细结构定义(DDL),包含列名、数据类型、主键和外键信息。")]
    public async Task<string> GetTableSchemaAsync(
        IServiceProvider sp,
        [Description("目标数据库的名称")] string databaseName,
        [Description("需要查询的表名列表,如 'Orders, Customers'")] string[] tableNames)
    {
        if (tableNames.Length == 0)
        {
            return "错误:请提供至少一个表名。";
        }

        try
        {
            var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);
            var ddlBuilder = new StringBuilder();

            foreach (var tableName in tableNames)
            {
                // 1. 查询列信息
                var columns = await GetColumnsAsync(db, tableName);

                if (!columns.Any())
                {
                    ddlBuilder.AppendLine($"-- 警告: 表 '{tableName}' 不存在或没有列。");
                    continue;
                }

                // 2. 构建 CREATE TABLE 语句
                ddlBuilder.AppendLine($"CREATE TABLE {tableName} (");

                var columnDefs = new List<string>();
                foreach (var col in columns)
                {
                    // 格式: ColumnName DataType [PK/FK] [Comment]
                    var colDef = $"  {col.ColumnName} {col.DataType}";

                    if (col.IsPrimaryKey) colDef += " PRIMARY KEY";

                    // 如果有描述,作为注释添加,帮助 AI 理解字段含义
                    if (!string.IsNullOrWhiteSpace(col.Description))
                    {
                        colDef += $" -- {col.Description}";
                    }

                    columnDefs.Add(colDef);
                }

                ddlBuilder.AppendLine(string.Join(",\n", columnDefs));
                ddlBuilder.AppendLine(");");
                ddlBuilder.AppendLine();
            }

            return ddlBuilder.ToString();
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "获取表结构失败。Database: {DbName}", databaseName);
            return $"获取表结构时发生错误: {ex.Message}";
        }
    }
    
    [Description("在指定数据库上执行查询 SQL 语句,并返回 JSON 格式的结果。")]
    public async Task<string> ExecuteSqlQueryAsync(
        IServiceProvider sp,
        [Description("目标数据库的名称")] string databaseName,
        [Description("要执行的 SQL 查询语句 (仅限 SELECT,不需要人类可读,去除换行符)")] string sqlQuery)
    {
        // 1. 基础校验
        if (string.IsNullOrWhiteSpace(sqlQuery)) return "错误:SQL 语句不能为空。";

        try
        {
            var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);

            // 2. 执行查询
            var data = await dbConnector.ExecuteQueryAsync(db, sqlQuery);

            // 3. 结果处理策略
            var dataList = data.ToList();
            var rowCount = dataList.Count;

            // 没有数据
            if (rowCount == 0)
            {
                return "查询执行成功,但未返回任何结果 (0 rows)。";
            }

            // 数据量过大保护
            const int maxRowsReturn = 50; // 硬编码限制,最多返回 50 行
            if (rowCount > maxRowsReturn)
            {
                // 仅取前 50 行
                var truncatedList = dataList.Take(maxRowsReturn).ToList();

                return $"查询成功。结果集过大 (共 {rowCount} 行),已截断为前 {maxRowsReturn} 行以适应上下文。\nJSON结果: {truncatedList.ToJson()}";
            }

            // 正常返回
            return dataList.ToJson();
        }
        catch (InvalidOperationException ex) // 捕获安全拦截异常
        {
            logger.LogWarning("SQL 执行被拦截: {Message}", ex.Message);
            return $"安全警告: 查询被系统拒绝。原因: {ex.Message}";
        }
        catch (Exception ex)
        {
            // 这里是 ReAct 模式中“错误自愈”的关键!
            // 我们必须返回详细的数据库错误信息(如 "Column 'xxx' not found"),
            // 这样 Agent 才能看到错误 -> 思考原因 -> 修正 SQL -> 重试。
            logger.LogError(ex, "SQL 执行异常");
            return $"SQL 执行错误: {ex.Message}\n请检查你的 SQL 语法、表名或列名是否正确,并参考之前的 Schema 定义进行修正。";
        }
    }
}

//Qjy.AICopilot.Services.Common/Expansions/JsonExpansion.CS
public static class JsonExpansion
{
    private static readonly JsonSerializerOptions DefaultOptions = new()
    {
        // 正式环境使用 WriteIndented = false 压缩 JSON,节省 Token
        WriteIndented = true,
        // 不转义中文字符串,避免编码问题
        Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,        
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    public static string ToJson(this object obj)
    {
        return JsonSerializer.Serialize(obj, DefaultOptions);
    }
}

3. 数据库方言设置

我们创建一个方言策略工厂

//Qjy.AICopilot.DataAnalysisService/SqlDialectFactory.cs
/// <summary>
/// SQL 方言策略工厂
/// 负责提供特定数据库类型的提示词片段,用于注入 System Prompt。
/// </summary>
public static class SqlDialectFactory
{
    /// <summary>
    /// 获取指定数据库类型的方言指南
    /// </summary>
    public static string GetInstructions(DbProviderType providerType)
    {
        return providerType switch
        {
            DbProviderType.PostgreSql => PostgreSqlInstructions,
            DbProviderType.SqlServer => SqlServerInstructions,
            DbProviderType.MySql => MySqlInstructions,
            _ => "请使用标准的 ANSI SQL 语法。"
        };
    }

    // PostgreSQL 专用指令
    private const string PostgreSqlInstructions = """
                                                   - **分页**: 必须使用 `LIMIT {n} OFFSET {m}` 语法。
                                                   - **字符串连接**: 必须使用双竖线 `||` (例如 `first_name || ' ' || last_name`)。
                                                   - **当前时间**: 使用 `NOW()` 或 `CURRENT_TIMESTAMP`。
                                                   - **标识符引用**: PostgreSQL 对大小写敏感。如果表名或列名包含大写字母,**必须**使用双引号包裹 (例如 `"UserTable"`)。
                                                   - **类型转换**: 使用 `::type` 语法 (例如 `price::numeric`) 或 `CAST()`。
                                                   - **正则匹配**: 使用 `~` (区分大小写) 或 `~*` (不区分大小写)。
                                                   """;

    // SQL Server 专用指令
    private const string SqlServerInstructions = """
                                                  - **分页**: 必须使用 `TOP {n}` (例如 `SELECT TOP 10 *`) 或 `OFFSET-FETCH` 子句。
                                                  - **字符串连接**: 使用加号 `+` 或 `CONCAT()` 函数。
                                                  - **当前时间**: 使用 `GETDATE()`。
                                                  - **标识符引用**: 建议使用方括号 `[]` 包裹 (例如 `[UserTable]`)。
                                                  - **日期计算**: 使用 `DATEDIFF(day, start, end)` 或 `DATEADD(...)`。
                                                  - **注意**: 不支持 `LIMIT` 关键字。
                                                  """;

    // MySQL 专用指令
    private const string MySqlInstructions = """
                                              - **分页**: 使用 `LIMIT {n}, {m}`。
                                              - **字符串连接**: 必须使用 `CONCAT()` 函数,不支持 `||` 或 `+`。
                                              - **标识符引用**: 使用反引号 `` ` `` 包裹。
                                              - **日期**: 使用 `NOW()` 或 `CURDATE()`。
                                              """;
}

该工厂类在创建数据分析智能体时用到

4. 注入插件

//Qjy.AICopilot.DataAnalysisService/DependencyInjection.cs
public static class DependencyInjection
{
    public static void AddDataAnalysisService(this IHostApplicationBuilder builder)
    {
        // 注册 Dapper 基础服务
        builder.AddDapper();
        // 注册插件加载器
        builder.Services.AddAgentPlugin(registrar =>
        {
            registrar.RegisterPluginFromAssembly(Assembly.GetExecutingAssembly());
        });
    }
}

5. 生成模拟数据

  • 创建2个模式数据库
//Qjy.AICopilot.MigrationWorkApp/SeedData/DataAnalysisData.cs
public static class DataAnalysisData
{
    public static IEnumerable<BusinessDatabase> GetDatabases()
    {
        // 模拟一个 ERP 数据库连接
        // 注意:在实际开发环境中,这里应该指向一个真实存在的测试库
        // 本示例假设本地有一个名为 'erp_demo' 的 PostgreSQL 数据库
        var erpDb = new BusinessDatabase(
            "ERP_Core",
            "负责管理产品主数据、客户关系与销售业务。包含:SKU基础信息、价格表、客户档案、销售订单(订单号/金额/状态)以及账面库存总量。当用户询问“卖了多少”、“谁买的”、“订单金额”、“还有多少库存(总量)”时,请选择此库。",
            "Host=localhost;Port=5432;Database=erp_demo;Username=postgres;Password=123456",
            DbProviderType.PostgreSql
        );

        // 模拟一个 WMS 数据库连接
        var wmsDb = new BusinessDatabase(
            "WMS_Warehouse",
            "负责管理仓库内的具体作业与实物流动。包含:具体的货架/货位管理、拣货打包记录、包裹重量、快递运单号、出入库的详细操作流水。当用户询问“货物在哪个货架”、“包裹发走了没”、“快递单号是多少”、“何时入库的”时,请选择此库。",
            "Host=localhost;Port=5432;Database=wms_demo;Username=postgres;Password=123456",
            DbProviderType.PostgreSql
        );
        return [erpDb];
    }
}

//Qjy.AICopilot.MigrationWorkApp/Worker.cs
public class Worker(
    IServiceProvider serviceProvider,
    IHostApplicationLifetime hostApplicationLifetime) : BackgroundService
{
     private static async Task SeedDataAsync(
         AiCopilotDbContext dbContext,
         RoleManager<IdentityRole> roleManager,
         UserManager<IdentityUser> userManager,
         CancellationToken cancellationToken)
 	{
        // ....
        // 创建默认业务数据库
        if (!await dbContext.BusinessDatabases.AnyAsync(cancellationToken: cancellationToken))
        {
            await dbContext.BusinessDatabases.AddRangeAsync(DataAnalysisData.GetDatabases(), cancellationToken);
        }

        await dbContext.SaveChangesAsync(cancellationToken);
    }
}
  • 提供种子 sql
--Qjy.AICopilot.AppHost/Sql/erp-init.sql
-- =============================================================================
-- ERP Demo Database Initialization for AI Copilot Testing
-- 描述: 这是一个模拟电子产品销售的 ERP 数据库,设计用于测试 AI 的数据分析和 SQL 生成能力。
-- =============================================================================

DROP DATABASE IF EXISTS erp_demo;
CREATE DATABASE erp_demo;

\c erp_demo;

-- =============================================================================
-- 1. 商品主表 (base_products)
-- =============================================================================
CREATE TABLE base_products (
                               product_id SERIAL PRIMARY KEY,
                               product_name VARCHAR(200) NOT NULL,
                               sku_code VARCHAR(50) UNIQUE NOT NULL,
                               category VARCHAR(50),
                               brand VARCHAR(50),
                               std_price DECIMAL(10, 2), -- 标准零售价
                               cost_price DECIMAL(10, 2), -- 进货成本
                               status VARCHAR(20) DEFAULT 'Active',
                               created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE base_products IS '商品基础信息表,存储所有 SKU 的核心数据';
COMMENT ON COLUMN base_products.product_id IS '商品唯一标识 ID,用于关联库存和订单明细';
COMMENT ON COLUMN base_products.sku_code IS '库存单元编码 (Stock Keeping Unit),业务侧常用的唯一商品编码';
COMMENT ON COLUMN base_products.category IS '商品分类,例如:Electronics(电子), Furniture(家具), Accessories(配件)';
COMMENT ON COLUMN base_products.std_price IS '标准建议零售价 (MSRP),注意:实际订单成交价可能会低于此价格';
COMMENT ON COLUMN base_products.cost_price IS '商品进货成本价,用于计算毛利 (Profit = Revenue - Cost)';
COMMENT ON COLUMN base_products.status IS '商品状态: Active(在售), Discontinued(停产), OutOfStock(缺货)';

-- =============================================================================
-- 2. 客户表 (customers) - 用于客户画像分析
-- =============================================================================
CREATE TABLE customers (
                           customer_id SERIAL PRIMARY KEY,
                           customer_name VARCHAR(100) NOT NULL,
                           contact_email VARCHAR(100),
                           region VARCHAR(50),
                           industry VARCHAR(50),
                           level VARCHAR(20) DEFAULT 'Standard',
                           join_date DATE DEFAULT CURRENT_DATE
);

COMMENT ON TABLE customers IS '客户信息表,用于分析客户分布和购买行为';
COMMENT ON COLUMN customers.region IS '客户所属大区,如:North China(华北), East China(华东), South China(华南)';
COMMENT ON COLUMN customers.industry IS '客户所属行业,用于 B2B 分析,如:Technology(科技), Education(教育), Retail(零售)';
COMMENT ON COLUMN customers.level IS '客户等级: Standard(普通), VIP(重要客户), Partner(合作伙伴)';

-- =============================================================================
-- 3. 仓库库存表 (stock_inventory)
-- =============================================================================
CREATE TABLE stock_inventory (
                                 id SERIAL PRIMARY KEY,
                                 product_id INT NOT NULL,
                                 warehouse_name VARCHAR(50) NOT NULL,
                                 quantity INT DEFAULT 0,
                                 last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                                 CONSTRAINT fk_inv_product FOREIGN KEY (product_id) REFERENCES base_products(product_id)
);

COMMENT ON TABLE stock_inventory IS '实时库存表,记录各仓库的商品持有量';
COMMENT ON COLUMN stock_inventory.warehouse_name IS '仓库名称,例如:Hangzhou_Main_WH(杭州主仓), Beijing_Branch_WH(北京分仓)';
COMMENT ON COLUMN stock_inventory.quantity IS '当前可用物理库存数量';

-- =============================================================================
-- 4. 销售订单主表 (orders) - 用于销售趋势分析
-- =============================================================================
CREATE TABLE orders (
                        order_id SERIAL PRIMARY KEY,
                        order_no VARCHAR(50) UNIQUE NOT NULL,
                        customer_id INT NOT NULL,
                        order_date TIMESTAMP NOT NULL,
                        total_amount DECIMAL(12, 2) DEFAULT 0,
                        status VARCHAR(20) DEFAULT 'Pending',
                        CONSTRAINT fk_order_customer FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);

COMMENT ON TABLE orders IS '销售订单主表,记录交易发生的时间、客户和总金额';
COMMENT ON COLUMN orders.order_no IS '业务订单号,格式通常为 ORD-YYYYMMDD-XXX';
COMMENT ON COLUMN orders.total_amount IS '订单总金额 (实际支付金额),是该订单所有明细项 subtotal 的总和';
COMMENT ON COLUMN orders.status IS '订单状态: Pending(待付款), Paid(已付款), Shipped(已发货), Completed(已完成), Cancelled(已取消). 注意:统计销售业绩时通常应排除 Cancelled 状态';

-- =============================================================================
-- 5. 订单明细表 (order_items) - 用于商品销售分析
-- =============================================================================
CREATE TABLE order_items (
                             item_id SERIAL PRIMARY KEY,
                             order_id INT NOT NULL,
                             product_id INT NOT NULL,
                             quantity INT NOT NULL,
                             unit_price DECIMAL(10, 2) NOT NULL,
                             subtotal DECIMAL(12, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
                             CONSTRAINT fk_item_order FOREIGN KEY (order_id) REFERENCES orders(order_id),
                             CONSTRAINT fk_item_product FOREIGN KEY (product_id) REFERENCES base_products(product_id)
);

COMMENT ON TABLE order_items IS '订单明细表,记录每一笔订单中包含的具体商品和数量';
COMMENT ON COLUMN order_items.unit_price IS '实际成交单价 (Actual Selling Price),可能与商品表的标准售价不同(如有折扣)';
COMMENT ON COLUMN order_items.subtotal IS '行小计金额 = 数量 * 实际成交单价';

-- =============================================================================
-- 数据填充
-- =============================================================================

-- =============================================================================
-- 数据填充 (Mock Data Seed)
-- =============================================================================

-- 1. 商品数据 (Products)
-- 包含:电子产品、电脑、配件、家具等不同类目,用于测试分类统计
INSERT INTO base_products (product_name, sku_code, category, brand, std_price, cost_price) VALUES
                                                                                               ('iPhone 15 Pro', 'AP-IP15P-256', 'Electronics', 'Apple', 8999.00, 7500.00),
                                                                                               ('iPhone 15 Plus', 'AP-IP15PL-128', 'Electronics', 'Apple', 6999.00, 5800.00),
                                                                                               ('MacBook Pro M3', 'AP-MBP-M3-14', 'Electronics', 'Apple', 12999.00, 10500.00),
                                                                                               ('Dell XPS 15', 'DELL-XPS-9530', 'Computers', 'Dell', 11500.00, 9000.00),
                                                                                               ('Logitech MX Master 3S', 'LOGI-MX3S', 'Accessories', 'Logitech', 899.00, 500.00),
                                                                                               ('Sony WH-1000XM5', 'SNY-XM5-BLK', 'Audio', 'Sony', 2499.00, 1800.00),
                                                                                               ('ErgoChair Pro', 'FRN-CHR-PRO', 'Furniture', 'Autonomous', 3500.00, 2000.00);

-- 2. 客户数据 (Customers)
-- 包含:不同行业、不同等级(VIP/Standard)和不同地区的客户,用于画像分析
INSERT INTO customers (customer_name, region, industry, level, join_date) VALUES
                                                                              ('TechFlow Inc.', 'East China', 'Technology', 'VIP', '2023-01-10'),      -- 华东科技公司 (VIP)
                                                                              ('EduGlobal School', 'North China', 'Education', 'Partner', '2023-03-15'), -- 华北教育机构 (合作伙伴)
                                                                              ('Retail Solutions Ltd.', 'South China', 'Retail', 'Standard', '2023-05-20'), -- 华南零售商
                                                                              ('David Li', 'East China', 'Individual', 'Standard', '2023-06-01'),        -- 个人用户
                                                                              ('Green Energy Corp', 'West China', 'Energy', 'VIP', '2023-11-11');        -- 西部能源公司

-- 3. 库存数据 (Inventory)
-- 模拟:部分商品充足,部分商品缺货 (Stockout) 的场景
INSERT INTO stock_inventory (product_id, warehouse_name, quantity) VALUES
                                                                       (1, 'Hangzhou_Main_WH', 50), (1, 'Beijing_Branch_WH', 15), -- iPhone 15 Pro 库存充足
                                                                       (2, 'Hangzhou_Main_WH', 5),  (2, 'Beijing_Branch_WH', 0),  -- iPhone 15 Plus 北京仓缺货
                                                                       (3, 'Hangzhou_Main_WH', 20),                                -- MacBook 只有杭州仓有货
                                                                       (5, 'Hangzhou_Main_WH', 200), (5, 'Guangzhou_Branch_WH', 100); -- 鼠标在多地仓库都有大量库存

-- 4. 订单与明细数据 (Orders & Items)
-- 场景覆盖:企业采购、教育订单、取消订单、近期高额订单

-- 订单 1: 企业批量采购 (Apple 产品)
-- 测试点:大额订单、批量折扣逻辑(售价低于标价)
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231001-001', 1, '2023-10-01 10:00:00', 'Completed');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES
                                                                         (1, 1, 10, 8800.00), -- 购买10台 iPhone,单价 8800 (原价 8999)
                                                                         (1, 3, 5, 12500.00); -- 购买 5台 MacBook,单价 12500 (原价 12999)
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 1) WHERE order_id = 1;

-- 订单 2: 教育机构采购 (Dell 电脑 + 配件)
-- 测试点:跨类目购买 (Computers + Accessories)、状态为"已发货"
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231111-002', 2, '2023-11-11 14:30:00', 'Shipped');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES
                                                                         (2, 4, 20, 11000.00), -- 20台 Dell XPS
                                                                         (2, 5, 20, 800.00);   -- 20个 罗技鼠标
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 2) WHERE order_id = 2;

-- 订单 3: 个人用户购买 (已取消)
-- 测试点:在统计销售额时,应当排除状态为 Cancelled 的订单
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231201-003', 4, '2023-12-01 09:15:00', 'Cancelled');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES (3, 6, 1, 2499.00); -- 索尼耳机
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 3) WHERE order_id = 3;

-- 订单 4: 近期高价值订单
-- 测试点:时间范围查询 (例如查询"最近一个月")、高客单价商品
INSERT INTO orders (order_no, customer_id, order_date, status) VALUES ('ORD-20231220-004', 5, '2023-12-20 16:00:00', 'Paid');
INSERT INTO order_items (order_id, product_id, quantity, unit_price) VALUES (4, 7, 10, 3200.00); -- 10把人体工学椅
UPDATE orders SET total_amount = (SELECT SUM(subtotal) FROM order_items WHERE order_id = 4) WHERE order_id = 4;
  • Asprise 项目启动时执行 sql
//Qjy.AICopilot.AppHost/AppHost.cs
//  定义一个固定的密码参数 (Secret)
var password = builder.AddParameter("pg-password", secret: true);

var postgresdb = builder.AddPostgres("postgres", password: password)
    .WithHostPort(5432)
    .WithDataVolume("postgres-aicopilot")
    .WithBindMount("./Sql", "/docker-entrypoint-initdb.d")
    .WithPgWeb(pgAdmin => pgAdmin.WithHostPort(5050))
    .AddDatabase("ai-copilot");
//Qjy.AICopilot.AppHost/appsettings.json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning",
      "Aspire.Hosting.Dcp": "Warning"
    }
  },
  "Parameters": {
    "pg-password": "123456"
  }
}

三、三元认知模型

目前,我们认知模型已经可以完成工具调用、知识库检索,而现在又需要在这基础上添加数据库查询功能的识别,即:

  • 当用户询问上季度销售额,查询库存里有多少台 IPhone15时,需要调用数据分析执行器
  • 用户提出帮我请个假时,是执行工具调用
  • 用户询问公司的请假制度怎么样时,这时就是知识库检索

1. 数据分析专家提示词

从提示词中,我们可以看到这是一个具备 ReAct 分析能力的 DBA 专家。

  • DDL 表结构,注入到系统提示词
  • 思考-行动-观察
  • 认知漏斗:探索、构建、修正
  • 结构探索:sales、product 语句构建 DDL 语句表结构,构建 SQL 语句自愈修正,如果失败、重新生成修正后的SQL
//Qjy.AICopilot.MigrationWorkApp.SeedData/AiGatewayData.cs 
//ConversationTemplates()方法,部分代码
var dataAnalysisTemplate = new ConversationTemplate(
    "DataAnalysisAgent",
    "数据库分析专家",
    """
    你是一个精通 **{{$DbProvider}}** 的高级数据库管理员。
    你当前正在操作的 **目标数据库名称** 为:**{{$DatabaseName}}**。

    你的核心职责分为两步:
    1. **数据获取**:构造精准的 SQL 语句查询业务数据。
    2. **数据转化**:对查询结果进行语义解释,并设计最佳的可视化展示方案。

    ### 核心工作流程
    1. **探索**: 调用 `GetTableNames` 初步筛选候选表。
    2. **详查**: 调用 `GetTableSchema` 获取详细 DDL 和 **字段注释**。
    3. **构建**: 生成 SQL 并调用 `ExecuteSqlQuery` 获取数据样本。
    4. **决策**: 观察查询结果,思考以下问题:
    - 这些字段的业务含义是什么?(特别是状态码、类型值)
    - 这组数据适合用图表展示吗?(趋势用折线图,分布用饼图,对比用柱状图,明细用表格)
    5. **输出**: 调用 `OutputResult` 输出数据分析结果和可视化决策结果,获取最终指示。

    ### 核心交互原则
    1. **过程透明(允许)**:在调用工具或构建 SQL 的过程中,你可以简要向用户解释你的思路。
    2. **禁止解读(严格)**:**严禁** 对数据具体数值进行趋势分析或总结。

    ### 核心安全准则
    - **只读权限**: 你仅拥有 `SELECT` 权限。严禁生成 `INSERT`, `UPDATE`, `DELETE`, `DROP` 等修改性语句。
    - **范围限制**: 所有的探索和查询操作必须严格限制在数据库 **{{$DatabaseName}}** 范围内。

    ### 数据库方言规范
    当前连接的数据库遵循以下语法标准,请严格遵守:
    {{$DialectInstructions}}

    ### 可视化输出规范决策指南
    在生成回答时,你需要判断当前的数据结果最适合以何种 UI 形式展示给用户。
    如果数据适合可视化,请严格按照以下 JSON 格式输出决策指令。

    **场景 1:趋势或对比分析 (Chart)**
    当数据包含时间序列、分类对比且行数适中时使用。
    "visual_decision": {
        "type": "Chart",
        "title": "图表标题",
        "description": "图表描述",
        "chart_config": {
            "category": "可选值:Line,Bar,Pie",
            "x": "作为X轴的字段名,例如 order_date",
            "y": "作为Y轴的数值字段名,例如 total_amount",
            "series": "可选,用于分组的字段名,例如 product_category"
        }
    }

    **场景 2:明细数据列表 (DataTable)**
    当数据是详细记录列表(如订单列表、库存清单)且行数较多时使用。
    "visual_decision": {
        "type": "DataTable",
        "title": "列表标题",
        "description": "列表描述",
    }

    **场景 3:单一关键指标 (StatsCard)**
    "visual_decision": {
        "type": "StatsCard",
        "title": "卡片标题",
        "description": "卡片描述",
        "unit": "单位(可选)"
    }
    """,
    Guids[1],
    new TemplateSpecification
    {
        Temperature = 0.3f
    });

2. 认知模型设计

数据分析意图列表:Analysis.*

//Qjy.AICopilot.MigrationWorkApp.SeedData/AiGatewayData.cs 
//ConversationTemplates()方法,部分代码
var item1 = new ConversationTemplate(
    "IntentRoutingAgent",
    "三元意图识别路由代理",
    """
    你是一个企业级智能任务调度中心。你的核心职责是精准分析用户的自然语言输入,识别出用户的意图,并将其映射到【可用意图列表】中的一个或多个条目。

    你所处的环境包含三类核心能力:
    1. **工具 (Action)**: 执行具体的业务操作(如发邮件、订会议)。
    2. **知识库 (Knowledge)**: 检索非结构化的文档、制度、Wiki(如查询报销标准、操作手册)。
    3. **数据分析 (Analysis)**: 直接查询数据库中的结构化业务数据(如统计销售额、查询库存、列出订单)。

    ### 你的思维链
    面对用户输入,请务必严格按以下步骤进行内部推理:

    1. **分析需求类型**: 
       - 用户是想“做一件事”(Action)?
       - 还是想“查一些资料/制度”(Knowledge)?
       - 还是想“看具体的业务数据/报表”(Analysis)?

    2. **区分“知识”与“数据” (关键)**:
       - 如果问题是关于“是什么”、“怎么做”、“流程定义”等静态信息 -> 倾向于 Knowledge。
       - 如果问题涉及“多少”、“状态”、“列表”、“统计”、“同比/环比”等动态数值 -> 倾向于 Analysis。
       - *示例*:“如何申请退款?”是 Knowledge;“这个月有多少退款单?”是 Analysis。

    3. **匹配意图**:
       - 扫描【可用意图列表】,寻找最契合的条目。
       - 如果涉及数据查询,根据数据库的描述(Description)选择最合适的业务库(Analysis.{DbName})。

    4. **决策与组合**:
       - 绝大多数情况下,只需返回一个最匹配的意图。
       - 如果用户意图复合(例如“先查库存(Analysis),然后发邮件(Action)”),请同时返回多个意图。
       - 如果无法匹配任何工具、知识库或数据库,返回 `General.Chat`。

    ### 输出规范
    你必须输出一个严格的 JSON 数组。数组中的每个对象代表一个识别出的意图。

    JSON 对象字段说明:
    - `intent`: (string) 必须完全匹配【可用意图列表】中的代码。
      - 工具类格式: `Action.{PluginName}`
      - 知识类格式: `Knowledge.{KbName}`
      - 数据类格式: `Analysis.{DatabaseName}`
    - `confidence`: (float) 0.0 到 1.0 之间的置信度。
    - `reasoning`: (string) 你选择该意图的简短理由,请说明你为什么认为应该查文档而不是查库(反之亦然)。
    - `query`: (string, 可选) 
      - 对于 `Knowledge` 意图:提取用于向量检索的关键词。
      - 对于 `Analysis` 意图:保留用户的原始自然语言问题,以便后续 Text-to-SQL 模块处理。

    ### 示例 1 (数据分析)
    输入: "帮我查一下杭州仓库现在还有多少台 iPhone 15 Pro?"
    输出:
    [
        {
            "intent": "Analysis.WMS_Production",
            "confidence": 0.98,
            "reasoning": "用户询问具体产品的库存数量,这是一个动态的业务数据查询需求。根据描述,WMS_Production 库包含实时库存信息。",
            "query": "杭州仓库 iPhone 15 Pro 库存数量"
        }
    ]

    ### 示例 2 (混合意图)
    输入: "请假制度里关于病假是怎么规定的?顺便看下我今年还剩几天病假。"
    输出:
    [
        {
            "intent": "Knowledge.EmployeeHandbook",
            "confidence": 0.95,
            "reasoning": "用户前半句询问'规定',属于静态制度查询。",
            "query": "病假规定"
        },
        {
            "intent": "Analysis.HR_Core",
            "confidence": 0.90,
            "reasoning": "用户后半句询问'剩余病假天数',属于个人动态数据查询,需查询 HR 数据库。",
            "query": "当前用户剩余病假天数"
        }
    ]

    ### 可用意图列表
    {{$IntentList}}
    """,
    Guids[0],
    new TemplateSpecification
    {
        // 保持 0 温度,确保路由决策的确定性和 JSON 格式的稳定性
        Temperature = 0.0f
    });

3. 重构意图分类构建器

public class IntentRoutingAgentBuilder
{
    private const string AgentName = "IntentRoutingAgent";

    private readonly ChatAgentFactory _agentFactory;
    private readonly IDataQueryService _dataQueryService;

    // 缓存静态的工具意图列表,避免重复反射扫描
    private readonly string _toolIntentListString;

    public IntentRoutingAgentBuilder(
        ChatAgentFactory agentFactory,
        AgentPluginLoader pluginLoader,
        IDataQueryService dataQueryService)
    {
        _agentFactory = agentFactory;
        _dataQueryService = dataQueryService;

        // 添加系统内置意图
        var sb = new StringBuilder();
        sb.AppendLine("- General.Chat: 闲聊、打招呼、情感交互或无法归类的问题。");

        // 扫描插件系统,添加工具意图
        // 这里我们假设每个 Plugin 对应一个大类意图,实际项目中可以做得更细致
        var allPlugins = pluginLoader.GetAllPlugin();
        foreach (var plugin in allPlugins)
        {
            // 格式:- Action.{PluginName}: {Description}
            sb.AppendLine($"- Action.{plugin.Name}: {plugin.Description}");
        }
        _toolIntentListString = sb.ToString();
    }

    /// <summary>
    /// 获取知识库意图列表
    /// </summary>
    private async Task<string> GetKnowledgeIntentListAsync()
    {
        var sb = new StringBuilder();

        // 查询所有启用的知识库
        var kbs = await _dataQueryService.ToListAsync(_dataQueryService.KnowledgeBases);

        foreach (var kb in kbs)
        {
            // 格式:- Knowledge.{KbName}: {Description}            
            // 示例:- Knowledge.HrPolicy: 公司员工手册、报销制度和考勤规定。
            sb.AppendLine($"- Knowledge.{kb.Name}: {kb.Description}");
        }

        return sb.ToString();
    }

    /// <summary>
    /// 获取数据分析意图列表
    /// </summary>
    private async Task<string> GetDataAnalysisIntentListAsync()
    {
        var sb = new StringBuilder();

        // 查询所有启用的业务数据库
        var queryable = _dataQueryService.BusinessDatabases.Where(b => b.IsEnabled);
        var dbs = await _dataQueryService.ToListAsync(queryable);

        foreach (var db in dbs)
        {
            // 格式:- Analysis.{DbName}: {Description}
            // 示例:- Analysis.ERP_Core: 包含销售订单、客户资料及发货记录。
            sb.AppendLine($"- Analysis.{db.Name}: {db.Description}");
        }

        return sb.ToString();
    }

    public async Task<ChatClientAgent> BuildAsync()
    {
        var intents = new StringBuilder();

        // 1. 添加工具意图 (Plugin)
        intents.Append(_toolIntentListString);

        // 2. 添加知识库意图 (RAG)
        intents.Append(await GetKnowledgeIntentListAsync());

        // 3. 添加数据分析意图 (Text-to-SQL)
        intents.Append(await GetDataAnalysisIntentListAsync());

        var agent = await _agentFactory.CreateAgentAsync(AgentName,
            template =>
            {
                // 渲染 System Prompt
                // 确保我们在 Prompt 模板中预留了 {{$IntentList}} 占位符
                template.SetSystemPrompt(template.SystemPrompt
                    .Replace("{{$IntentList}}", intents.ToString()));
            }, isSaveChatMessage: false);

        return agent;
    }
}

四、构建数据分析执行器

1. 扩展生成上下文对象

将数据分析分支扩展到上下对象中

//Qjy.AICopilot.AiGatewayService/Workflows/GenerationContext.cs
//部分代码
public enum BranchType
{
    Tools,          // 工具定义分支
    Knowledge,       // 知识检索分支
    DataAnalysis    // 数据分析分支
}

public record BranchResult
{
    /// <summary>
    /// 数据分析结果(当 Type == DataAnalysis 时有值)
    /// 通常包含 JSON 数据
    /// </summary>
    public string? DataAnalysis { get; init; }
    
    // 数据分析结果工厂方法
    public static BranchResult FromDataAnalysis(string result) =>
        new() { Type = BranchType.DataAnalysis, DataAnalysis = result };
}

public class GenerationContext
{
    /// <summary>
    /// 聚合后的数据分析上下文
    /// </summary>
    public string DataAnalysisContext { get; set; } = string.Empty;
}

2. 数据分析构建器

//Qjy.AICopilot.AiGatewayService/Agents/DataAnalysisAgentBuilder.cs;
public class DataAnalysisAgentBuilder(
    ChatAgentFactory agentFactory,
    AgentPluginLoader pluginLoader)
{
    private const string TemplateName = "DataAnalysisAgent";

    /// <summary>
    /// 构建针对特定数据库优化的 DBA Agent
    /// </summary>
    /// <param name="database">目标数据库实体,用于决策方言策略</param>
    /// <returns>配置好的 ChatClientAgent 实例</returns>
    public async Task<ChatClientAgent> BuildAsync(BusinessDatabase database)
    {
        // 1. 获取方言策略
        // 根据数据库类型 (PG/SQLServer),获取对应的提示片段
        var dialectInstructions = SqlDialectFactory.GetInstructions(database.Provider);
        var providerName = database.Provider.ToString();

        // 2. 创建 Agent 并执行模版替换
        // ChatAgentFactory 会加载基础模版,我们通过回调函数进行模版替换
        var agent = await agentFactory.CreateAgentAsync(TemplateName, template =>
        {
            // 我们只替换与"数据库类型"相关的占位符。
            // 此时 Agent 尚不知道具体的查询任务是什么。
            template.SetSystemPrompt(template.SystemPrompt
                .Replace("{{$DbProvider}}", providerName)
                .Replace("{{$DatabaseName}}", database.Name)
                .Replace("{{$DialectInstructions}}", dialectInstructions));
        }, options =>
        {
            // 挂载 "DataAnalysisPlugin" 中的所有工具
            // 这样 Agent 就拥有了 GetTableNames, ExecuteSqlQuery 
            options.Tools = pluginLoader.GetAITools(nameof(DataAnalysisPlugin));
        }, false);

        return agent;
    }
}

3. 数据分析执行器

职责:处理 Analysis.* 意图,实例化 DBA Agent,执行 Text-to-SQL 任务

//Qjy.AICopilot.AiGatewayService/Workflows/DataAnalysisExecutor.cs
public class DataAnalysisExecutor(
    DataAnalysisAgentBuilder agentBuilder,
    IDataQueryService dataQuery,
    ILogger<DataAnalysisExecutor> logger)
    : ReflectingExecutor<DataAnalysisExecutor>("DataAnalysisExecutor"),
        IMessageHandler<List<IntentResult>, BranchResult>
{
    private const string AnalysisIntentPrefix = "Analysis.";

    public async ValueTask<BranchResult> HandleAsync(
        List<IntentResult> intentResults,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 1. 筛选数据分析类意图
        // 过滤规则:必须以 Analysis. 开头,且置信度高于 0.6
        var analysisIntents = intentResults
            .Where(i => i.Intent.StartsWith(AnalysisIntentPrefix, StringComparison.OrdinalIgnoreCase)
                        && i.Confidence > 0.6)
            .ToList();

        if (analysisIntents.Count == 0)
        {
            logger.LogDebug("未检测到数据分析意图,跳过执行。");
            // 返回空结果,表示该分支无产出
            return BranchResult.FromDataAnalysis(string.Empty);
        }

        logger.LogInformation("启动数据分析流程,命中目标数据库数量: {Count}", analysisIntents.Count);

        // 2. 遍历处理每一个意图
        var output = new StringBuilder();
        foreach (var intent in analysisIntents)
        {
            output.AppendLine(await ProcessSingleIntentAsync(intent, context, cancellationToken));
        }

        return BranchResult.FromDataAnalysis(output.ToString());
    }

    /// <summary>
    /// 处理单个数据库查询意图
    /// </summary>
    private async Task<string> ProcessSingleIntentAsync(
        IntentResult intent,
        IWorkflowContext context,
        CancellationToken ct)
    {
        var dbName = intent.Intent.Substring(AnalysisIntentPrefix.Length);

        try
        {
            // 1. 获取数据库配置
            // 我们需要 BusinessDatabase 实体来决定方言策略
            var db = await dataQuery.FirstOrDefaultAsync(
                dataQuery.BusinessDatabases.Where(d => d.Name == dbName));

            if (db == null || !db.IsEnabled)
            {
                logger.LogWarning("意图指向数据库 '{DbName}',但该库不存在或已禁用。", dbName);
                return $"[系统提示]: 无法连接数据库 {dbName},请联系管理员核实配置。";
            }

            // 2. 构建 DBA Agent
            // 这里会动态注入 PG 或 SQLServer 的方言提示词
            var agent = await agentBuilder.BuildAsync(db);
            // 创建临时会话线程
            var thread = agent.GetNewThread();

            // 4. 执行 ReAct 循环
            // Agent 会自动进行: 思考 -> GetTableNames -> 思考 -> GetTableSchema -> 思考 -> ExecuteSQL -> 总结
            await foreach (var update in agent.RunStreamingAsync(intent.Query!, thread, cancellationToken: ct))
            {
                await context.AddEventAsync(new AgentRunUpdateEvent(Id, update), ct);
            }

            // 记录日志以便调试
            logger.LogInformation("数据库 {DbName} 查询完成。", dbName);

            // 获取最后一条 Agent 回复消息(最终数据)
            var messages = thread.GetService<IList<ChatMessage>>()!;
            var output = messages.LastOrDefault(message => message.Role == ChatRole.Assistant);
            return output != null ? output.Text : "[系统错误]: 无法获取查询结果。";
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "执行数据分析意图失败。Database: {DbName}", dbName);
            return $"[系统错误]: 查询数据库 {dbName} 时发生异常 - {ex.Message}";
        }
    }
}

4. 升级上下文聚合策略

职责:作为 Fan-in 节点,接收来自所有并行分支的 BranchResult。

只有当接收到的结果数量达到预期(3个)时,才进行合并并触发下游。

//Qjy.AICopilot.AiGatewayService/Workflows/ContextAggregatorExecutor.cs
public class ContextAggregatorExecutor(ILogger<ContextAggregatorExecutor> logger)
    : ReflectingExecutor<ContextAggregatorExecutor>("ContextAggregatorExecutor"),
        IMessageHandler<BranchResult>
{
    // 内部状态:用于跨方法调用累积结果
    private readonly List<BranchResult> _accumulatedResults = [];

    // 硬编码预期分支数:Tools + Knowledge + DataAnalysis = 3
    private const int ExpectedBranchCount = 3;

    public async ValueTask HandleAsync(
        BranchResult branchResult,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 1. 累积状态
        // 注意:FanInEdge 可能一次性送来所有结果,也可能分批送来
        // 因此我们需要 AddRange 并检查总数
        _accumulatedResults.AddRange(branchResult);

        // 2. 完备性检查
        if (_accumulatedResults.Count >= ExpectedBranchCount)
        {
            logger.LogInformation("并行分支汇聚完成,开始合并上下文。");

            // 3. 恢复原始请求 (从全局状态中读取)
            var request = await context.ReadStateAsync<ChatStreamRequest>("ChatStreamRequest", "Chat", cancellationToken);
            if (request == null) throw new InvalidOperationException("无法获取原始会话请求");

            var genContext = new GenerationContext { Request = request };

            // 4. 合并数据
            foreach (var result in _accumulatedResults)
            {
                switch (result.Type)
                {
                    case BranchType.Tools when result.Tools != null:
                        genContext.Tools = result.Tools;
                        break;
                    case BranchType.Knowledge when !string.IsNullOrWhiteSpace(result.Knowledge):
                        genContext.KnowledgeContext = result.Knowledge;
                        break;
                    case BranchType.DataAnalysis when !string.IsNullOrWhiteSpace(result.DataAnalysis):
                        genContext.DataAnalysisContext = result.DataAnalysis;
                        break;
                }
            }

            // 5. 清理状态 (为可能的下一轮对话做准备)
            _accumulatedResults.Clear();

            // 6. 手动发送聚合结果消息,触发下游
            await context.SendMessageAsync(genContext, cancellationToken);
        }
        else
        {
            // 如果未满足条件,不返回/发送任何值/消息(Task完成但无Output),流程在此暂停等待下一批消息
            logger.LogDebug("聚合进度: {Current}/{Total},等待其他分支...", _accumulatedResults.Count, ExpectedBranchCount);
        }
    }
}

5. 最终处理执行器构建混合上下文内容

/// <summary>
/// 最终处理执行器
/// 职责:利用聚合后的上下文构建 Agent,注入 RAG 提示词,并执行流式生成。
/// </summary>
public class FinalProcessExecutor(
    ChatAgentFactory agentFactory,
    IDataQueryService dataQuery,
    ILogger<FinalProcessExecutor> logger) :
    ReflectingExecutor<FinalProcessExecutor>("FinalProcessExecutor"),
    IMessageHandler<GenerationContext> // <-- 输入类型变更为聚合上下文
{
    public async ValueTask HandleAsync(
        GenerationContext genContext,
        IWorkflowContext context,
        CancellationToken cancellationToken = new())
    {
        try
        {
            var request = genContext.Request;
            logger.LogInformation("开始最终生成,SessionId: {SessionId}", request.SessionId);

            // 1. 获取会话关联的模板配置
            // 我们需要知道当前会话使用的是哪个 Agent 模板(例如"通用助手"或"HR助手")
            var session = await dataQuery.FirstOrDefaultAsync(dataQuery.Sessions.Where(s => s.Id == request.SessionId));

            if (session == null) throw new InvalidOperationException("会话不存在");

            // 2. 创建基础 Agent 实例
            // 此时 Agent 拥有的是数据库中定义的静态 System Prompt
            var agent = await agentFactory.CreateAgentAsync(session.TemplateId);

            // 3. 构建消息列表
            var inputMessages = new List<ChatMessage>();
            string finalUserPrompt;

            // 检查是否存在 知识库上下文 或 数据分析上下文
            bool hasKnowledge = !string.IsNullOrWhiteSpace(genContext.KnowledgeContext);
            bool hasDataAnalysis = !string.IsNullOrWhiteSpace(genContext.DataAnalysisContext);
            bool hasContext = hasKnowledge || hasDataAnalysis;

            if (hasContext)
            {
                // 构建混合上下文内容
                var contextBuilder = new StringBuilder();

                if (hasDataAnalysis)
                {
                    contextBuilder.AppendLine("数据分析/SQL查询结果:");
                    contextBuilder.AppendLine(genContext.DataAnalysisContext);
                    contextBuilder.AppendLine();
                }

                if (hasKnowledge)
                {
                    contextBuilder.AppendLine("知识库检索参考信息:");
                    contextBuilder.AppendLine(genContext.KnowledgeContext);
                    contextBuilder.AppendLine();
                }

                // 使用 XML 标签 <context> 是一种最佳实践
                finalUserPrompt = $"""
                                   请基于以下参考信息(包含数据库查询结果或检索文档)回答我的问题:

                                   <context>
                                   {contextBuilder}
                                   </context>

                                   回答要求:
                                   1. 引用参考信息时,请标注来源 ID(例如 [^1])。
                                   2. 针对数据分析结果,请结合用户问题进行自然语言解释,不要直接展示原始数据结构,除非用户要求。
                                   3. 在回答结尾,如果引用了知识库文档,请生成“参考资料”列表。
                                   4. 如果参考信息不足以回答问题,请直接说明,严禁编造。
                                   5. 保持回答专业、简洁。

                                   用户问题:
                                   {request.Message}
                                   """;

                logger.LogDebug("增强模式激活:注入知识({KSize}),注入数据({DSize})。",
                    genContext.KnowledgeContext?.Length ?? 0,
                    genContext.DataAnalysisContext?.Length ?? 0);
            }
            else
            {
                // 无上下文模式:直接透传用户问题
                finalUserPrompt = request.Message;
                logger.LogDebug("增强模式未激活:仅使用用户原始输入。");
            }

            // 将组合后的提示作为单条 User 消息添加
            // 利用近因效应,让模型在读取完长文本后立刻看到问题,提升注意力。
            inputMessages.Add(new ChatMessage(ChatRole.User, finalUserPrompt));

            // 4. 准备执行参数 (ChatOptions)
            // 将动态加载的工具集挂载到本次执行的选项中
            var runOptions = new ChatClientAgentRunOptions
            {
                ChatOptions = new ChatOptions
                {
                    Tools = genContext.Tools, // <-- 动态挂载工具
                }
            };

            // 如果有注入任何上下文(知识或数据),都降低温度以保证事实性
            if (hasContext)
            {
                runOptions.ChatOptions.Temperature = 0.3f;
            }

            // 5. 恢复会话状态 (Thread)
            // 从持久化存储中恢复之前的对话历史
            var storeThread = new { storeState = new SessionSoreState(request.SessionId) };
            var agentThread = agent.DeserializeThread(JsonSerializer.SerializeToElement(storeThread));

            // 6. 执行流式生成
            await foreach (var update in agent.RunStreamingAsync(
                               inputMessages,
                               agentThread,
                               runOptions,
                               cancellationToken))
            {
                // 将 Agent 的更新事件(文本块、工具调用状态等)转发到工作流事件流
                // 这样前端就能通过 SSE 收到实时打字机效果
                await context.AddEventAsync(new AgentRunUpdateEvent(Id, update), cancellationToken);
            }
        }
        catch (Exception e)
        {
            logger.LogError(e, "最终生成阶段发生错误");
            // 发送失败事件,让前端能感知到错误
            await context.AddEventAsync(new ExecutorFailedEvent(Id, e), cancellationToken);
            throw;
        }
    }
}

6. 注册

//Qjy.AICopilot.AiGatewayService/DependencyInjection.cs
public static void AddAiGatewayService(this IHostApplicationBuilder builder)
{
    // ..其他代码
	builder.Services.AddScoped<DataAnalysisAgentBuilder>();
    builder.AddIntentWorkflow();
}

//Qjy.AICopilot.AiGatewayService.Workflows/IntentWorkflow.cs
public static class IntentWorkflow
{
    public static void AddIntentWorkflow(this IHostApplicationBuilder builder)
    {
        builder.Services.AddTransient<IntentRoutingExecutor>();
        builder.Services.AddTransient<ToolsPackExecutor>();
        builder.Services.AddTransient<KnowledgeRetrievalExecutor>();
        builder.Services.AddTransient<ContextAggregatorExecutor>();
        builder.Services.AddTransient<DataAnalysisExecutor>();
        builder.Services.AddTransient<FinalProcessExecutor>();

        builder.AddWorkflow(nameof(IntentWorkflow), (sp, key) =>
        {
            var scopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
            var scope = scopeFactory.CreateScope();

            var intentRouting = scope.ServiceProvider.GetRequiredService<IntentRoutingExecutor>();
            var toolsPack = scope.ServiceProvider.GetRequiredService<ToolsPackExecutor>();
            var knowledgeRetrieval = scope.ServiceProvider.GetRequiredService<KnowledgeRetrievalExecutor>();
            var dataAnalysis = scope.ServiceProvider.GetRequiredService<DataAnalysisExecutor>();
            var aggregator = scope.ServiceProvider.GetRequiredService<ContextAggregatorExecutor>();
            var finalProcess = scope.ServiceProvider.GetRequiredService<FinalProcessExecutor>();

            var workflowBuilder = new WorkflowBuilder(intentRouting);
            workflowBuilder.WithName(key)
                // 1. 扇出 (Fan-out): 意图识别 -> [工具打包, 知识检索]
                // IntentRoutingExecutor 输出的 List<IntentResult> 会被广播给 targets 列表中的每一个节点
                .AddFanOutEdge(intentRouting, [toolsPack, knowledgeRetrieval, dataAnalysis])
                // 2. 扇入 (Fan-in): [工具打包, 知识检索] -> 聚合器
                // 聚合器接收来自 sources 列表的所有输出
                .AddFanInEdge([toolsPack, knowledgeRetrieval, dataAnalysis], aggregator)
                // 3. 线性连接: 聚合器 -> 最终处理
                .AddEdge(aggregator, finalProcess);

            return workflowBuilder.Build();
        });
    }
}

6. 测试数据分析意图

我们发送一个数据分析的意图

//[post]/chat
{
    "sessionId": "95f1e13b-427f-4817-be90-0c32c874cb55",
    "message": "2023年第四季度,哪个月的销售额度最高"
}
回复如下:
```json
[
{
  "intent": "Analysis.ERP_Core",
  "confidence": 0.97,
  "reasoning": "用户询问的是2023年第四季度各月的销售额度比较,属于具体的业务数据统计问题。该问题涉及‘销售额’这一动态数值,需从销售业务数据库中查询,因此应选择 Analysis.ERP_Core 库。",
  "query": "2023年第四季度每月销售额度对比"
}
]
```
我需要查询2023年第四季度(10月、11月、12月)的每月销售额数据。首先,让我探索数据库中的相关表。

```tool_code
GetTableNames(db_name="ERP_Core")
```当前提供的参考信息仅包含一个数据库操作指令(`GetTableNames(db_name="ERP_Core")`),但未返回任何实际的表名列表,也未展示与销售额相关的数据表结构或2023年第四季度的销售数据 [^1]。因此,无法确定哪个月(10月、11月或12月)的销售额最高。

请提供具体的销售数据查询结果(例如包含月份和对应销售额的记录),以便进行分析和回答。

从返回的结果可以知道数据分析的意图已经识别到,并且给出了 Analysis.ERP_Core 库,目前并没有执行 SQL 语句,AI 还不能给出最终结果。

五、设计可视化交互协议

1. 生成式 UI 基础

到目前为止,我们的智能体还只会使用文字内容来回复用户的消息,我们叫这种界面为对话式用户界面 CUI。如果只是工具调用和知识库检索,文字回复已经可以满足绝大部分的场景。但是当接入数据分析功能后,用户的需求可能需要表格或图形来显示更为合适,我们叫这种页面为图形用户界面 GUI,比如下面需求:

  • 最近一个月的销售数据:最好是用表格来显示
  • 同比,环比数据:最好是用图形化显示

这样,我们的 AI 助理就产生了新的可视化的需求,他需要能根据用户的消息自动来生成 GUI + CUI 的界面,我们称这种界面为生成式用户界面 GenUI。

GenUl设计原则:

  • 服务端驱动UI
  • MVI:模型-视图-意图
    • 数据包含时间序列 + 数值指标时,使用折线图
    • 数据包含分类字段 + 数值指标 + 总和时,使用饼图
    • 数据只有一列,使用卡片
    • 数据包括很多行和列,使用表格
    • ......
  • 流式传输支持多模态协议:SSE 流式传输文本块、混合媒体流(思考过程 + 文本片段 + UI组件块)

2. Agent 可视化决策设计

现在我们需要定义一套 AI 返回标准,让工作流根据智能体返回的消息内容来完成 UI 的驱动。

比如,我们可以定义一个如下格式的 Json 返回标准:

{
    "widgetType": "string", 	// 组件类型标识,如 "LineChart","pieChart","StatsCard"
    "title": "string", 			//组件标题
    "description": "string",  	//组件的辅助说明
    "data": "object", 			// 组件特定的数据载体
    "options": "object"			// 可选的可视化配置(如颜色主题,通常由前端默认,后端可覆盖)
}

AI 根据数据量级和图表类型判断选择什么样的界面显示,具体返回的内容可能是:

//统计卡片
{
    "widgetType": "StatsCard", 
    "title": "当前库存总值", 
    "data": 
    {
        "label": "Total Inventory Value", 
        "value": 1245000.00, 
        "unit": "CNY", 
        "trend": 12.5, 
        "trendDirection": "Up"
    }
}

//数据表格
{
    "widgetType": "DataTable", 
    "title": "大额订单列表", 
    "description": "显示金额超过 10,000 的最近订单", 
    "data": 
    {
        "columns": [
        	{ "key": "orderId", "label": "订单号", "dataType": "string" }, 
	        { "key": "amount", "label": "金额", "dataType": "currency" }, 
    	    { "key": "created", "label": "下单时间", "dataType": "date" }
        ], 
        "rows": [
            { "orderId": "ORD-001", "amount": 15000, "created": "2023-10-01T12:00:00Z" }, 
            { "orderId": "ORD-002", "amount": 22000, "created": "2023-10-02T14:30:00Z" }
        ]
    }
}

//通用图表
{
    "widgetType": "Chart", 
    "title": "月度营收分析", 
    "data": {
        "category": "Line", 
        "dataset": {
            "dimensions": ["Date", "Revenue", "Cost"], 
            "source": [
                {"Date": "2023-01", "Revenue": 100, "Cost": 80}, 
                {"Date": "2023-02", "Revenue": 120, "Cost": 90}
            ]
        }, 
        "encoding": {
            "x": "Date", 
            "y": ["Revenue", "Cost"]
        }
    }
}

3. 实现可视化交互协议

我们实现3种可视化的场景(StatsCard、DataTable 和 Chart),其中 Chart 有分为折线图、饼图等,我们在 Shared 层创建一个 Qjy.AICopilot.Visualization 类库项目。

  • 定义可视化组件接口,并提供抽象基类
//Qjy.AICopilot.Visualization/Widgets/IWidget.cs
/// <summary>
/// 可视化组件类型的枚举定义
/// </summary>
[JsonConverter(typeof(JsonStringEnumConverter))]
public enum WidgetType
{
    StatsCard,  // 统计卡片
    DataTable,  // 数据表格
    Chart      // 通用图表
}

/// <summary>
/// 可视化组件通用契约
/// </summary>
public interface IWidget
{
    /// <summary>
    /// 组件标题
    /// </summary>
    string Title { get; set; }

    /// <summary>
    /// 组件描述信息
    /// </summary>
    string? Description { get; set; }    
}

/// <summary>
/// 可视化组件基类
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "type")]
[JsonDerivedType(typeof(StatsCardWidget), typeDiscriminator: nameof(WidgetType.StatsCard))]
[JsonDerivedType(typeof(DataTableWidget), typeDiscriminator: nameof(WidgetType.DataTable))]
[JsonDerivedType(typeof(ChartWidget), typeDiscriminator: nameof(WidgetType.Chart))]
public abstract class WidgetBase : IWidget
{
    /// <summary>
    /// 组件标题
    /// </summary>
    public string Title { get; set; } = string.Empty;

    /// <summary>
    /// 组件描述信息
    /// </summary>
    public string? Description { get; set; }
}
  • 定义三种类型的可视化组件类型
//Qjy.AICopilot.Visualization/Widgets/StatsCardWidget.cs
public class StatsCardWidget : WidgetBase
{
    public required StatsCardData Data { get; set; }
}

public class StatsCardData
{
    public required string Label { get; set; }
    
    // 使用 object 以允许 string 或 number,但在序列化时尽量保持数值类型
    public required object Value { get; set; } 
    
    public string? Unit { get; set; }
}

//Qjy.AICopilot.Visualization/Widgets/DataTableWidget.cs
public class DataTableWidget : WidgetBase
{
    public required DataTableData Data { get; set; }
}

public class DataTableData
{
    public List<TableColumn> Columns { get; set; } = new();
    
    // 使用 Dictionary<string, object> 来表示动态行数据
    public List<Dictionary<string, object?>> Rows { get; set; } = new();
}

public class TableColumn
{
    public required string Key { get; set; }
    public required string Label { get; set; }
    public string DataType { get; set; } = "string"; // string, number, date, boolean
}

//Qjy.AICopilot.Visualization/Widgets/ChartWidget.cs
public class ChartWidget : WidgetBase
{
    public required ChartData Data { get; set; }
}

public class ChartData
{
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public ChartCategory Category { get; set; }

    public ChartDataset Dataset { get; set; } = new();

    public ChartEncoding? Encoding { get; set; }
}

public enum ChartCategory
{
    Line,
    Bar,
    Pie
}

public class ChartDataset
{
    /// <summary>
    /// 维度定义(即表头)
    /// </summary>
    public List<string> Dimensions { get; set; } = [];

    /// <summary>
    /// 数据源,每一项对应一行数据
    /// key 必须与 Dimensions 中的名称匹配
    /// </summary>
    public List<Dictionary<string, object?>> Source { get; set; } = [];
}

public class ChartEncoding
{
    /// <summary>
    /// X 轴映射的维度 Key
    /// </summary>
    public string? X { get; set; }

    /// <summary>
    /// Y 轴映射的维度 Key 列表(支持多系列)
    /// </summary>
    public List<string> Y { get; set; } = [];
    
    /// <summary>
    /// 系列分组字段(可选)
    /// 例如:按 "Region" 字段分组,生成多条折线
    /// </summary>
    public string? SeriesName { get; set; }
}
  • 创建生成组件数据的扩展方法
/// <summary>
/// 简化的 Schema 定义,用于传递给适配器
/// </summary>
public record SchemaColumn(string Name, Type DataType);

public static class VisualizationAdapterExtensions
{
    extension(IEnumerable<dynamic> rawData)
    {
        /// <summary>
        /// 将原始数据转换为通用的图表数据集
        /// </summary>
        public ChartDataset ToChartDataset(IEnumerable<SchemaColumn> schema)
        {
            var dataset = new ChartDataset();
        
            // 1. 设置维度 (Dimensions)
            dataset.Dimensions = schema.Select(s => s.Name).ToList();

            // 2. 转换数据源 (Source)
            foreach (var row in rawData)
            {
                if (row is IDictionary<string, object> rowDict)
                {
                    var cleanDict = new Dictionary<string, object?>();
                    foreach (var kvp in rowDict)
                    {
                        // 核心:处理类型兼容性
                        cleanDict[kvp.Key] = SanitizeValue(kvp.Value);
                    }
                    dataset.Source.Add(cleanDict);
                }
            }

            return dataset;
        }

        /// <summary>
        /// 将原始数据转换为数据表格结构
        /// </summary>
        public DataTableData ToDataTableData(IEnumerable<SchemaColumn> schema)
        {
            var tableData = new DataTableData();

            // 1. 构建列定义
            foreach (var col in schema)
            {
                tableData.Columns.Add(new TableColumn
                {
                    Key = col.Name,
                    Label = col.Name, // 初始 Label 等于 Key,后续可由 LLM 优化
                    DataType = MapSystemTypeToWebType(col.DataType)
                });
            }

            // 2. 构建行数据 (逻辑同上)
            foreach (var row in rawData)
            {
                if (row is IDictionary<string, object> rowDict)
                {
                    var cleanDict = new Dictionary<string, object?>();
                    foreach (var kvp in rowDict)
                    {
                        cleanDict[kvp.Key] = SanitizeValue(kvp.Value);
                    }
                    tableData.Rows.Add(cleanDict);
                }
            }

            return tableData;
        }
    }

    /// <summary>
    /// 类型清洗与标准化
    /// </summary>
    private static object? SanitizeValue(object? value)
    {
        if (value == null || value is DBNull) return null;

        return value switch
        {
            // 确保 DateTime 转换为 ISO 8601 字符串
            DateTime dt => dt.ToString("yyyy-MM-ddTHH:mm:ss"),
            DateTimeOffset dto => dto.ToString("yyyy-MM-ddTHH:mm:ss"),
            
            // 数值类型保持原样,避免被转为字符串
            decimal or double or float or int or long => value,
            
            // 其他类型转字符串
            _ => value.ToString()
        };
    }

    /// <summary>
    /// .NET 类型映射到 Web 前端类型
    /// </summary>
    private static string MapSystemTypeToWebType(Type type)
    {
        if (IsNumeric(type)) return "number";
        if (type == typeof(DateTime) || type == typeof(DateTimeOffset)) return "date";
        if (type == typeof(bool)) return "boolean";
        return "string";
    }

    private static bool IsNumeric(Type type)
    {
        type = Nullable.GetUnderlyingType(type) ?? type;
        return type == typeof(int) || type == typeof(long) || type == typeof(double) || 
               type == typeof(decimal) || type == typeof(float);
    }
}
  • 扩展数据库分析专家的能力

协议格式定义好之后,接下来我们修改数据库分析专家的提示词,让模型返回的数据与我们定义的数据格式匹配上。

var dataAnalysisTemplate = new ConversationTemplate(
    "DataAnalysisAgent",
    "数据库分析专家",
    """
    你是一个精通 **{{$DbProvider}}** 的高级数据库管理员。
    你当前正在操作的 **目标数据库名称** 为:**{{$DatabaseName}}**。
    
    你的核心职责分为两步:
    1. **数据获取**:构造精准的 SQL 语句查询业务数据。
    2. **数据转化**:对查询结果进行语义解释,并设计最佳的可视化展示方案。
    
    ### 核心工作流程
    1. **探索**: 调用 `GetTableNames` 初步筛选候选表。
    2. **详查**: 调用 `GetTableSchema` 获取详细 DDL 和 **字段注释**。
    3. **构建**: 生成 SQL 并调用 `ExecuteSqlQuery` 获取数据样本。
    4. **决策**: 观察查询结果,思考以下问题:
       - 这些字段的业务含义是什么?(特别是状态码、类型值)
       - 这组数据适合用图表展示吗?(趋势用折线图,分布用饼图,对比用柱状图,明细用表格)
    5. **输出**: 生成一个严格合法的 JSON,不要使用 ```json,JSON 格式规范如下:
    
    {
        "analysis": {
          "database": "{{$DatabaseName}}",
          "description": "在此处填入数据内容的简要概括",
          "metadata": [
              { "name": "字段名", "description": "字段注释或说明" }
            ]
        },
        "visual_decision": {
            "type": "Chart", // 可选值: Chart, DataTable, StatsCard
            "title": "标题",
            "description": "在此处填入数据内容的简要概括",
            "chart_config": {
                // 可选字段
            }
            "Unit": "单位" // 可选字段
        }
    }
    
    ### 核心交互原则
    1. **过程透明(允许)**:在调用工具或构建 SQL 的过程中,你可以简要向用户解释你的思路。
    2. **结构化输出(严格)**:你不需要在 JSON 中输出实际的数据行,你只需要提供元数据和可视化配置。
        - `analysis`: 数据分析字段,如果查询数据失败,此字段可以 null。
            - `database`: 当前数据库名称。
            - `description`: 根据用户问题生成的简短数据说明。
            - `metadata`: 必须包含查询结果中每个字段的定义,包括 `name`(字段名)和 `description`(从表结构中获取的字段注释/说明)。
        - `visual_decision`: 可视化决策字段,如果数据不适合可视化,此字段可为 null。
            - `type`: 图表类型,可选值: Chart, DataTable, StatsCard。
            - `title`: 图表标题。
            - `description`: 根据用户问题生成的简短数据说明。
            - `chart_config`: Chart 类型图表专有字段。
            - `unit`: StatsCard 类型图表专有字段。
    3. **禁止解读(严格)**:**严禁** 对数据具体数值进行趋势分析或总结。
    
    ### 核心安全准则
    - **只读权限**: 你仅拥有 `SELECT` 权限。严禁生成 `INSERT`, `UPDATE`, `DELETE`, `DROP` 等修改性语句。
    - **范围限制**: 所有的探索和查询操作必须严格限制在数据库 **{{$DatabaseName}}** 范围内。
    
    ### 数据库方言规范
    当前连接的数据库遵循以下语法标准,请严格遵守:
    {{$DialectInstructions}}
    
    ### 可视化输出规范决策指南
    在生成回答时,你需要判断当前的数据结果最适合以何种 UI 形式展示给用户。
    如果数据适合可视化,请严格按照以下 JSON 格式输出决策指令。
    
    **场景 1:趋势或对比分析 (Chart)**
    当数据包含时间序列、分类对比且行数适中时使用。
    "visual_decision": {
        "type": "Chart",
        "title": "图表标题",
        "description": "图表描述",
        "chart_config": {
            "category": "可选值:Line,Bar,Pie",
            "x": "作为X轴的字段名,例如 order_date",
            "y": "作为Y轴的数值字段名,例如 total_amount",
            "series": "可选,用于分组的字段名,例如 product_category"
        }
    }
    
    **场景 2:明细数据列表 (DataTable)**
    当数据是详细记录列表(如订单列表、库存清单)且行数较多时使用。
    "visual_decision": {
        "type": "DataTable",
        "title": "列表标题",
        "description": "列表描述",
    }
    
    **场景 3:单一关键指标 (StatsCard)**
    "visual_decision": {
        "type": "StatsCard",
        "title": "卡片标题",
        "description": "卡片描述",
        "unit": "单位(可选)"
    }
    """,
    Guids[1],
    new TemplateSpecification
    {
        Temperature = 0.3f
    });

六、双路输出与数据分流

1. 需求分析

现在我们的模型已经拥有了生成式 UI 的协议,并且可以按协议格式返回数据了。看起来已经没什么问题了,但在生成场景中还存在一个现实的问题,Token 过量。

比如,我们查询的数据非常多时,如果把这些数据都放到模型上下文中,这会存在内容超出上下文限制的情况。即使没有超出,大量的数据也会挤占上下文空间,这些内容不会给模型带来分析价值,反而会使模型产生幻觉。而且还有一个更实际的情况,大量的 Token,会增加成本。

实际上,对于模型来说,他只需要少了的数据(比如前5条)用来分析用户意图,而查询到的数据只要能在用户界面上显示出来就可以了。

所以,现在我们模型需要的是完成下面这些功能:

  • 状态留存:既让LLM看到数据摘要(理解结构),又保留完整的原始数据(用于前端渲染)
  • 可视化决策
  • 决策注入:自助选择图表类型

流程图:

flowchart TD
    A[用户意图] --> B[DataAnalysisExecutor]
    B --> C[核心改造点]
    C --> D{DBAgent 决策}
    D -->|原始数据 + 视觉配置| E[可视化适配器]
    D -->|原始数据 + 字段字典| F[数据序列化器]
    E --> G[主路:Schema+Data JSON]
    F --> G
    G --> H[聚合]
    H --> I[Final Agent]
    I --> J[自然语言解释]
    D --> K[旁路:Widget JSON]
    K --> L[前端]
    J --> L

2. 实现数据分流

  • 定义数据分类上下文,我们创建一个可视化的上下文对象
//Qjy.AICopilot.DataAnalysisService/Serivices/VisualizationContext.cs
/// <summary>
/// 可视化上下文
/// 职责:在 Scoped 生命周期内暂存原始的数据库查询结果,
/// 以便后续的执行器能够获取无损数据用于构建 UI 组件。
/// </summary>
public class VisualizationContext
{
    // 存储最后一次查询的数据行(动态类型)
    private IEnumerable<dynamic>? _lastResultSet;
    
    // 存储最后一次查询的 Schema 信息(列名、类型等)
    private IEnumerable<SchemaColumn>? _lastResultSchema;

    // 存储数据分析输出结果
    private DataAnalysisOutputDto _outputDto = new ();

    /// <summary>
    /// 捕获查询结果
    /// </summary>
    public void CaptureResult(IEnumerable<dynamic> resultSet, IEnumerable<SchemaColumn> schema)
    {
        _lastResultSet = resultSet;
        _lastResultSchema = schema;
    }
    
    /// <summary>
    /// 捕获输出结果
    /// </summary>
    public void CaptureOutput(AnalysisDto? analysis, VisualDecisionDto? decision)
    {
        _outputDto.Analysis = analysis;
        _outputDto.Decision = decision;
    }

    /// <summary>
    /// 获取暂存的数据集
    /// </summary>
    public (IEnumerable<dynamic>? Data, IEnumerable<SchemaColumn>? Schema) GetLastResult()
    {
        return (_lastResultSet, _lastResultSchema);
    }
    
    /// <summary>
    /// 获取输出结果
    /// </summary>
    public DataAnalysisOutputDto GetOutput()
    {
        return _outputDto;
    }

    /// <summary>
    /// 检查是否包含有效数据
    /// </summary>
    public bool HasData => _lastResultSet != null && _lastResultSet.Any();
}
  • 扩展数据分析插件,实现双路数据存储。关键步骤
    • vizContext.CaptureResult(data, schema); 将原始结果捕获到上下文中
    • 最后的 return,只返回了5条数据,用于模型完成意图分析
//Qjy.AICopilot.DataAnalysisService.Plugins/DataAnalysisPlugin.cs
[Description("在指定数据库上执行查询 SQL 语句,并返回 JSON 格式的结果。")]
public async Task<string> ExecuteSqlQueryAsync(
    IServiceProvider sp,
    [Description("目标数据库的名称")] string databaseName,
    [Description("要执行的 SQL 查询语句 (仅限 SELECT,不需要人类可读,去除换行符)")] string sqlQuery)
{
    // 1. 基础校验
    if (string.IsNullOrWhiteSpace(sqlQuery)) return "错误:SQL 语句不能为空。";

    try
    {
        var db = await GetDatabaseAsync(sp, databaseName, CancellationToken.None);

        // 2. 执行查询
        var result = await dbConnector.ExecuteQueryAsync(db, sqlQuery);
        var data = result.ToList();
        var firstRow = data.FirstOrDefault() as IDictionary<string, object>;
        var schema = new List<SchemaColumn>();

        if (firstRow != null)
        {
            foreach (var kvp in firstRow)
            {
                // 获取值的运行时类型,如果为 null 则默认为 object
                var type = kvp.Value?.GetType() ?? typeof(object);
                schema.Add(new SchemaColumn(kvp.Key, type));
            }
        }

        // 3. 【关键步骤】将原始结果捕获到上下文中
        var vizContext = sp.GetRequiredService<VisualizationContext>();
        vizContext.CaptureResult(data, schema);

        return data.Count == 0 ?
            // 没有数据
            "查询执行成功,但未返回任何结果 (0 rows)。" :
            // LLM 只需要看摘要,不需要看完整的数据,仅取前 5 行
            data.Take(5).ToJson();
    }
    catch (InvalidOperationException ex) // 捕获安全拦截异常
    {
        logger.LogWarning("SQL 执行被拦截: {Message}", ex.Message);
        return $"安全警告: 查询被系统拒绝。原因: {ex.Message}";
    }
    catch (Exception ex)
    {
        // 这里是 ReAct 模式中“错误自愈”的关键!
        // 我们必须返回详细的数据库错误信息(如 "Column 'xxx' not found"),
        // 这样 Agent 才能看到错误 -> 思考原因 -> 修正 SQL -> 重试。
        logger.LogError(ex, "SQL 执行异常");
        return $"SQL 执行错误: {ex.Message}\n请检查你的 SQL 语法、表名或列名是否正确,并参考之前的 Schema 定义进行修正。";
    }

    [Description("输出数据分析结果和可视化决策结果")]
    public string OutputResult(IServiceProvider sp,
        [Description("数据分析结果,如果查询数据失败,此字段可以 null")]
        AnalysisDto? analysis,
        [Description("可视化决策结果,参考【可视化输出规范决策指南】,如果数据不适合可视化,此字段可为 null")]
        VisualDecisionDto? decision)
    {
        // 将输出结果捕获到上下文中
        var vizContext = sp.GetRequiredService<VisualizationContext>();
        vizContext.CaptureOutput(analysis, decision);
        return "已成功获取输出结果,请直接回复‘数据查询分析任务完成,结果如下:’";
    }
}
  • 注册
//Qjy.AICopilot.DataAnalysisService/DependencyInjection.cs
public static class DependencyInjection
{
    public static void AddDataAnalysisService(this IHostApplicationBuilder builder)
    {
        // 注册 Dapper 基础服务
        builder.AddDapper();
        builder.Services.AddScoped<VisualizationContext>();
        // 注册插件加载器
        builder.Services.AddAgentPlugin(registrar =>
        {
            registrar.RegisterPluginFromAssembly(Assembly.GetExecutingAssembly());
        });
    }
}

3. 完成双路输出

  • 定义输出传输对象
//Qjy.AICopilot.DataAnalysisService/Plugins/DataAnalysisAgentOutputDto.cs
public record DataAnalysisOutputDto
{
    // 对应 Agent 返回的 "Analysis" 数组
    // 这里使用 object 或 JsonElement 都可以,因为我们不需要修改它,只需透传
    [JsonPropertyName("analysis")]
    public AnalysisDto? Analysis { get; set; }

    // 对应 Agent 返回的 "visual_decision" 对象
    [JsonPropertyName("visual_decision")]
    public VisualDecisionDto? Decision { get; set; }
}

public record AnalysisDto
{
    [Description("当前数据库名称")]
    [JsonPropertyName("database_name")]
    public string DatabaseName { get; set; } = string.Empty;
    
    [Description("据用户问题生成的简短数据说明")]
    [JsonPropertyName("description")]
    public string Description { get; set; } = string.Empty;
    
    [Description("查询结果中每个字段的定义")]
    [JsonPropertyName("metadata")]
    public List<MetadataItemDto> Metadata { get; set; } = [];
}

public record MetadataItemDto
{
    [Description("字段名")]
    [JsonPropertyName("name")]
    public string Name { get; set; } = string.Empty;
    
    [Description("从表结构中获取的字段注释/说明")]
    [JsonPropertyName("description")]
    public string Description { get; set; } = string.Empty;
}

public record VisualDecisionDto
{
    [Description("图表类型,可选值: Chart, DataTable, StatsCard")]   
    [JsonPropertyName("type")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public WidgetType Type { get; set; }

    [Description("图表标题")]
    [JsonPropertyName("title")]
    public string Title { get; set; } = string.Empty;
    
    [Description("根据用户问题生成的简短数据说明")]
    [JsonPropertyName("description")]
    public string Description { get; set; } = string.Empty;

    [Description("可选字段,Chart 类型图表专有字段")]
    [JsonPropertyName("chart_config")]
    public ChartConfig? ChartConfig { get; set; }
    
    [Description("可选字段,StatsCard 类型图表专有字段")]
    [JsonPropertyName("unit")]
    public string? Unit { get; set; }
}

public record ChartConfig
{
    [Description("图表类型,可选值: Line,Bar,Pie")]
    [JsonPropertyName("category")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public ChartCategory Category { get; set; }

    [Description("作为X轴的字段名,例如 order_date")]
    [JsonPropertyName("x")]
    public string X { get; set; } = string.Empty;
    
    [Description("作为Y轴的数值字段名,例如 total_amount")]
    [JsonPropertyName("y")]
    public string Y { get; set; } = string.Empty;
    
    [Description("可选,用于分组的字段名,例如 product_category")]
    [JsonPropertyName("series")]
    public string? Series { get; set; }
}
  • 重构数据分析执行器,关键代码,见代码中备注(分流路径 1、2部分)
    • 分流路径 1:会发出 AgentRunResponseEvent 事件(后面会有事件处理逻辑)
    • 分流路径 2:直接输出,继续后续工作流
/// <summary>
/// 数据分析执行器
/// 职责:处理 Analysis.* 意图,实例化 DBA Agent,执行 Text-to-SQL 任务。
/// </summary>
public class DataAnalysisAgentBuilder agentBuilder,
    IDataQueryService dataQuery,
    VisualizationContext vizContext,
    ILogger<DataAnalysisExecutor> logger)
    : ReflectingExecutor<DataAnalysisExecutor>("DataAnalysisExecutor"),
        IMessageHandler<List<IntentResult>, BranchResult>DataAnalysisExecutor(
        DataAnalysisAgentBuilder agentBuilder,
        VisualizationContext vizContext,
        IDataQueryService dataQuery,
        ILogger<DataAnalysisExecutor> logger)
        : ReflectingExecutor<DataAnalysisExecutor>("DataAnalysisExecutor"),
            IMessageHandler<List<IntentResult>, BranchResult>
{
    private const string AnalysisIntentPrefix = "Analysis.";

    public async ValueTask<BranchResult> HandleAsync(
        List<IntentResult> intentResults,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // 1. 筛选数据分析类意图
        // 过滤规则:必须以 Analysis. 开头,且置信度高于 0.6
        var analysisIntents = intentResults
            .Where(i => i.Intent.StartsWith(AnalysisIntentPrefix, StringComparison.OrdinalIgnoreCase)
                        && i.Confidence > 0.6)
            .ToList();

        if (analysisIntents.Count == 0)
        {
            logger.LogDebug("未检测到数据分析意图,跳过执行。");
            // 返回空结果,表示该分支无产出
            return BranchResult.FromDataAnalysis(string.Empty);
        }

        logger.LogInformation("启动数据分析流程,命中目标数据库数量: {Count}", analysisIntents.Count);

        // 2. 遍历处理每一个意图
        var output = new StringBuilder();
        foreach (var intent in analysisIntents)
        {
            output.AppendLine(await ProcessSingleIntentAsync(intent, context, cancellationToken));
        }

        return BranchResult.FromDataAnalysis(output.ToString());
    }

    /// <summary>
    /// 处理单个数据库查询意图
    /// </summary>
    private async Task<string> ProcessSingleIntentAsync(
        IntentResult intent,
        IWorkflowContext context,
        CancellationToken ct)
    {
        var dbName = intent.Intent.Substring(AnalysisIntentPrefix.Length);

        try
        {
            // 1. 获取数据库配置
            // 我们需要 BusinessDatabase 实体来决定方言策略
            var db = await dataQuery.FirstOrDefaultAsync(
                dataQuery.BusinessDatabases.Where(d => d.Name == dbName));

            if (db == null || !db.IsEnabled)
            {
                logger.LogWarning("意图指向数据库 '{DbName}',但该库不存在或已禁用。", dbName);
                return $"[系统提示]: 无法连接数据库 {dbName},请联系管理员核实配置。";
            }

            // 2. 构建 DBA Agent
            // 这里会动态注入 PG 或 SQLServer 的方言提示词
            var agent = await agentBuilder.BuildAsync(db);
            // 创建临时会话线程
            var thread = agent.GetNewThread();

            // 3. 执行 ReAct 循环
            // Agent 会自动进行: 思考 -> GetTableNames -> 思考 -> GetTableSchema -> 思考 -> ExecuteSQL -> 总结
            await foreach (var update in agent.RunStreamingAsync(intent.Query!, thread, cancellationToken: ct))
            {
                await context.AddEventAsync(new AgentRunUpdateEvent(Id, update), ct);
            }

            // 记录日志以便调试
            logger.LogInformation("数据库 {DbName} 查询完成。", dbName);

            // 获取最后一条 Agent 回复消息(最终数据)
            //var messages = thread.GetService<IList<ChatMessage>>()!;
            //var response = messages.Last();
            //var output = JsonSerializer.Deserialize<DataAnalysisAgentOutputDto>(response.Text);

            // 获取可视化上下文
            var (rawData, schema) = vizContext.GetLastResult();
            var output = vizContext.GetOutput();

            // =========================================================
            // 分流路径 1:旁路输出 (Side Path) -> 前端 Widget
            // 目标:visual_decision + data -> Widget JSON
            // =========================================================
            if (output is { Decision: not null } && vizContext.HasData)
            {
                try
                {
                    var widget = BuildWidget(output.Decision, rawData!, schema!);
                    var message = new ChatMessage(ChatRole.Assistant, widget.ToJson());
                    await context.AddEventAsync(new AgentRunResponseEvent(Id, new AgentRunResponse(message)), ct);
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "构建可视化 Widget 失败。Database: {DbName}", dbName);
                    return $"[系统错误]: 构建可视化 Widget 时发生异常 - {ex.Message}";
                }
            }

            // =========================================================
            // 分流路径 2:主路输出 (Main Path) -> 聚合器 -> Final Agent
            // 目标:schema + data -> Combined JSON
            // =========================================================

            // 这里直接使用匿名对象进行拼接:
            // { "schema": [], "data": [] }
            var combinedOutput = new
            {
                analysis = output.Analysis,         // 直接透传 Agent 生成的 Schema
                data = rawData ?? []              // 拼接 SQL 查询的实际结果
            };

            return combinedOutput.ToJson();
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "执行数据分析意图失败。Database: {DbName}", dbName);
            return $"[系统错误]: 查询数据库 {dbName} 时发生异常 - {ex.Message}";
        }
    }

    private IWidget BuildWidget(VisualDecisionDto decision, IEnumerable<dynamic> data, IEnumerable<SchemaColumn> schema)
    {
        switch (decision.Type)
        {
            case WidgetType.StatsCard:
                // 取第一行第一列,或者根据列名查找
                var firstRow = data.First() as IDictionary<string, object>;
                var value = firstRow.Values.First(); // 简单粗暴取第一个值

                return new StatsCardWidget
                {
                    Title = decision.Title,
                    Description = decision.Description,
                    Data = new StatsCardData
                    {
                        Label = decision.Title,
                        Value = value,
                        Unit = decision.Unit
                    }
                };

            case WidgetType.DataTable:
                return new DataTableWidget
                {
                    Title = decision.Title,
                    Description = decision.Description,
                    Data = data.ToDataTableData(schema)
                };

            case WidgetType.Chart:
                var dataset = data.ToChartDataset(schema);
                return new ChartWidget
                {
                    Title = decision.Title,
                    Description = decision.Description,
                    Data = new ChartData
                    {
                        Category = decision.ChartConfig!.Category,
                        Dataset = dataset,
                        Encoding = new ChartEncoding
                        {
                            X = decision.ChartConfig.X,
                            Y = string.IsNullOrWhiteSpace(decision.ChartConfig.Y)
                                ? []
                                : [decision.ChartConfig.Y],
                            SeriesName = decision.ChartConfig.Series
                        }
                    }
                };

            default:
                throw new NotSupportedException($"不支持的 Widget 类型: {decision.Type}");
        }
    }
}
  • AgentRunResponseEvent 事件处理,关键代码见 AgentRunResponseEvent 分支代码。
//Qjy.AICopilot.AiGatewayService/Agents/ChunkType.cs
[JsonConverter(typeof(JsonStringEnumConverter))]
public enum ChunkType
{
    Error,
    Text,
    Intent,
    FunctionCall,
    FunctionResult,
    Widget
}

public record ChatChunk(string Source, ChunkType Type, string Content);

//Qjy.AICopilot.AiGatewayService/Agents/ChatStreamRequest.cs
[AuthorizeRequirement("AiGateway.Chat")]
public record ChatStreamRequest(Guid SessionId, string Message) : IStreamRequest<ChatChunk>;

public class ChatStreamRequestHandler(
    IDataQueryService queryService,
    [FromKeyedServices(nameof(IntentWorkflow))] Workflow workflow)
    : IStreamRequestHandler<ChatStreamRequest, ChatChunk>
{
    public async IAsyncEnumerable<ChatChunk> Handle(ChatStreamRequest request, CancellationToken cancellationToken)
    {
        if (!queryService.Sessions.Any(session => session.Id == request.SessionId))
        {
            throw new Exception("未找到会话");
        }

        await using var run = await InProcessExecution.StreamAsync(workflow, request, cancellationToken: cancellationToken);
        await foreach (var workflowEvent in run.WatchStreamAsync(cancellationToken))
        {
            switch (workflowEvent)
            {
                case ExecutorFailedEvent evt:
                    yield return new ChatChunk(evt.ExecutorId, ChunkType.Error, evt.Data.Message ?? "");
                    break;
                case AgentRunResponseEvent evt:
                    switch (evt.ExecutorId)
                    {
                        case "IntentRoutingExecutor":
                            yield return new ChatChunk(evt.ExecutorId, ChunkType.Text, evt.Response.Text);
                            break;
                        case "DataAnalysisExecutor":
                            yield return new ChatChunk(evt.ExecutorId, ChunkType.Widget, evt.Response.Text);
                            break;
                    }
                    break;
                case AgentRunUpdateEvent evt:
                    foreach (var evtContent in evt.Update.Contents)
                    {
                        switch (evtContent)
                        {
                            case TextContent content:
                                yield return new ChatChunk(evt.ExecutorId, ChunkType.Text, content.Text);
                                break;
                            case FunctionCallContent content:
                                var fun = new
                                {
                                    content.Name,
                                    content.Arguments
                                };
                                yield return new ChatChunk(evt.ExecutorId, ChunkType.FunctionCall, fun.ToJson());
                                break;
                            case FunctionResultContent content:
                                yield return new ChatChunk(evt.ExecutorId, ChunkType.FunctionResult, content.Result);
                                break;
                        }
                    }
                    break;
            }
        }
    }
}

4. 测试

因为修改了意图数据,测试前我们继续重新生成一下数据库。

  • 我们继续使用“2023年第四季度,哪个月的销售额度最高”来测试
{
    "sessionId": "886fb629-6905-4edd-894f-60b82069770a",
    "message": "2023年第四季度,哪个月的销售额度最高"
}

回复如下,我们可以看到

  1. 首先识别了意图 Analysis.ERP_Core,为数据分析意图,并且是查询 ERP_Core 数据库
  2. 获取到了 ERP_Core 数据库中的5张表,并从中分析出了 orders 表中有需要的数据
  3. 进一步获取 orders 表的结构,按查询条件生成了 sql,并且过滤了已取消的订单
  4. 查到了3条记录,并且结果中含有数据类型,进一步分析出需要使用 Chart Bar 来驱动 UI
  5. 双路输出,一路返回了驱动 UI 展示的数据,另一路正常返回了文本结果
  6. 最后还给出了数据来源,参考资料
```json
[
    {
        "intent": "Analysis.ERP_Core",
        "confidence": 0.97,
        "reasoning": "用户询问2023年第四季度各月的销售额度,属于对历史销售数据的统计分析需求。该问题涉及具体数值(销售额)和时间维度(月份),需查询结构化业务数据,因此应使用 Analysis.ERP_Core 数据库。",
        "query": "2023年第四季度各月销售额度对比"
    }
]
```
我需要查询2023年第四季度(10月、11月、12月)各月的销售额度对比数据。首先,让我探索数据库中的表结构,找到与销售相关的表。


```json
{
  "Name": "GetTableNames",
  "Arguments": {
    "databaseName": "ERP_Core"
  }
}
```

```
[
  {
    "TableName": "base_products",
    "Description": "商品基础信息表,存储所有 SKU 的核心数据"
  },
  {
    "TableName": "stock_inventory",
    "Description": "实时库存表,记录各仓库的商品持有量"
  },
  {
    "TableName": "customers",
    "Description": "客户信息表,用于分析客户分布和购买行为"
  },
  {
    "TableName": "orders",
    "Description": "销售订单主表,记录交易发生的时间、客户和总金额"
  },
  {
    "TableName": "order_items",
    "Description": "订单明细表,记录每一笔订单中包含的具体商品和数量"
  }
]
```
我看到有`orders`表,描述为"销售订单主表,记录交易发生的时间、客户和总金额",这应该包含我需要的销售数据。让我查看这个表的详细结构。


```json
{
  "Name": "GetTableSchema",
  "Arguments": {
    "databaseName": "ERP_Core",
    "tableNames": [
      "orders"
    ]
  }
}
```

```
CREATE TABLE orders (
  order_id integer PRIMARY KEY,
  order_no character varying -- 业务订单号,格式通常为 ORD-YYYYMMDD-XXX,
  customer_id integer,
  order_date timestamp without time zone,
  total_amount numeric -- 订单总金额 (实际支付金额),是该订单所有明细项 subtotal 的总和,
  status character varying -- 订单状态: Pending(待付款), Paid(已付款), Shipped(已发货), Completed(已完成), Cancelled(已取消). 注意:统计销售业绩时通常应排除 Cancelled 状态
);


```
现在我了解了`orders`表的结构。我需要查询2023年第四季度(10月、11月、12月)各月的销售额度,排除已取消的订单。让我构建SQL查询来获取这些数据。


```json
{
  "Name": "ExecuteSqlQuery",
  "Arguments": {
    "databaseName": "ERP_Core",
    "sqlQuery": "SELECT EXTRACT(MONTH FROM order_date) AS month, SUM(total_amount) AS total_sales FROM orders WHERE order_date >= '2023-10-01' AND order_date < '2024-01-01' AND status != 'Cancelled' GROUP BY EXTRACT(MONTH FROM order_date) ORDER BY month;"
  }
}
```

```
[
  {
    "month": 10,
    "total_sales": 150500.00
  },
  {
    "month": 11,
    "total_sales": 236000.00
  },
  {
    "month": 12,
    "total_sales": 32000.00
  }
]
```
{
    "analysis": {
        "database": "ERP_Core",
        "description": "2023年第四季度各月销售额度对比数据,包含10月、11月和12月的销售总额",
        "metadata": [
            { "name": "month", "description": "月份(10表示10月,11表示11月,12表示12月)" },
            { "name": "total_sales", "description": "该月销售总额,已排除已取消的订单" }
        ]
    },
    "visual_decision": {
        "type": "Chart",
        "title": "2023年第四季度各月销售额度对比",
        "description": "2023年第四季度各月销售额度对比数据,包含10月、11月和12月的销售总额",
        "chart_config": {
            "category": "Bar",
            "x": "month",
            "y": "total_sales"
        }
    }
}
```json
{
  "widget_type": "Chart",
  "Data": {
    "Category": "Bar",
    "Dataset": {
      "Dimensions": [
        "month",
        "total_sales"
      ],
      "Source": [
        {
          "month": 10,
          "total_sales": 150500.00
        },
        {
          "month": 11,
          "total_sales": 236000.00
        },
        {
          "month": 12,
          "total_sales": 32000.00
        }
      ]
    },
    "Encoding": {
      "X": "month",
      "Y": [
        "total_sales"
      ],
      "SeriesName": null
    }
  },
  "Title": "2023年第四季度各月销售额度对比",
  "Description": "2023年第四季度各月销售额度对比数据,包含10月、11月和12月的销售总额"
}
```
2023年第四季度,11月的销售额度最高,达到236,000.00元 [^1]。相较之下,10月销售额为150,500.00元,12月销售额为32,000.00元,因此11月是该季度销售表现最好的月份。

参考资料:  
[^1] ERP_Core 数据库查询结果:2023年第四季度各月销售额度对比数据
  • 我们再来一个测试“帮我计算一下所有“已完成”和“已付款”订单的总销售额是多少”:
{
    "sessionId": "886fb629-6905-4edd-894f-60b82069770a",
    "message": "帮我计算一下所有“已完成”和“已付款”订单的总销售额是多少"
}

回复如下:

```json
[
 {
     "intent": "Analysis.ERP_Core",
     "confidence": 0.97,
     "reasoning": "用户要求计算特定状态订单(已完成、已付款)的总销售额,这是一个典型的业务数据统计问题,涉及销售订单的金额汇总。ERP_Core 数据库包含销售订单信息(如订单金额、状态),适合执行此类聚合查询。",
     "query": "所有状态为‘已完成’和‘已付款’的订单总销售额"
 }
]
```
我需要找到所有状态为'已完成'和'已付款'的订单,并计算它们的总销售额。首先,让我探索数据库中的表结构,找到与订单相关的表。


```json
{
"Name": "GetTableNames",
"Arguments": {
 "databaseName": "ERP_Core"
}
}
```

```
[
{
 "TableName": "base_products",
 "Description": "商品基础信息表,存储所有 SKU 的核心数据"
},
{
 "TableName": "stock_inventory",
 "Description": "实时库存表,记录各仓库的商品持有量"
},
{
 "TableName": "customers",
 "Description": "客户信息表,用于分析客户分布和购买行为"
},
{
 "TableName": "orders",
 "Description": "销售订单主表,记录交易发生的时间、客户和总金额"
},
{
 "TableName": "order_items",
 "Description": "订单明细表,记录每一笔订单中包含的具体商品和数量"
}
]
```
我看到有一个名为"orders"的表,描述为"销售订单主表,记录交易发生的时间、客户和总金额"。这很可能是我需要查询的表。让我获取这个表的详细结构。


```json
{
"Name": "GetTableSchema",
"Arguments": {
 "databaseName": "ERP_Core",
 "tableNames": [
   "orders"
 ]
}
}
```

```
CREATE TABLE orders (
order_id integer PRIMARY KEY,
order_no character varying -- 业务订单号,格式通常为 ORD-YYYYMMDD-XXX,
customer_id integer,
order_date timestamp without time zone,
total_amount numeric -- 订单总金额 (实际支付金额),是该订单所有明细项 subtotal 的总和,
status character varying -- 订单状态: Pending(待付款), Paid(已付款), Shipped(已发货), Completed(已完成), Cancelled(已取消). 注意:统计销售业绩时通常应排除 Cancelled 状态
);


```
现在我了解了orders表的结构。我需要查询状态为'Completed'(已完成)和'Paid'(已付款)的订单,并计算它们的总销售额(total_amount)。让我执行这个查询。


```json
{
"Name": "ExecuteSqlQuery",
"Arguments": {
 "databaseName": "ERP_Core",
 "sqlQuery": "SELECT SUM(total_amount) AS total_sales FROM orders WHERE status IN ('Completed', 'Paid');"
}
}
```

```
[
{
 "total_sales": 182500.00
}
]
```
{
 "analysis": {
     "database": "ERP_Core",
     "description": "状态为'已完成'和'已付款'的订单总销售额",
     "metadata": [
         { "name": "total_sales", "description": "已完成和已付款订单的总销售额" }
     ]
 },
 "visual_decision": {
     "type": "StatsCard",
     "title": "已完成和已付款订单总销售额",
     "description": "状态为'已完成'和'已付款'的订单总销售额",
     "unit": "元"
 }
}
```json
{
"widget_type": "StatsCard",
"Data": {
 "Label": "已完成和已付款订单总销售额",
 "Value": 182500.00,
 "Unit": "元"
},
"Title": "已完成和已付款订单总销售额",
"Description": "状态为'已完成'和'已付款'的订单总销售额"
}
```
根据系统数据库查询结果,所有状态为“已完成”和“已付款”的订单总销售额为 182,500.00 元 [^1]。

[^1]: 数据来源:ERP_Core 数据库,查询内容为状态为“已完成”和“已付款”的订单总销售额。