Spiga

云原生电商微服务实战1:搭建基础框架

2024-09-07 22:29:37

	一直以来想专心写一个基于云原生的微服务案例,但总角色要写的内容太多了。电商案例本身并不会太难,而要搭建好一整套云原生和微服务的基础设施,并不是一项轻松的工作。随着.Net Asprise的推出,再集成Dapr,开发和生产几乎可以保持一样的环境了。这让我惊喜,曾经我们手动搭建k8s,手动配置集群以及可以成为历史,我们需要的就是使用这些工具,重新把核心回归到业务上来。 

​ 正好新公司也是做电商业务的,于是我终于决定开始写一套云原生电商微服务的案例。一方面可以学习Asprise和Dapr的相关知识,也是一次属性电商核心业务的机会。

​ 这次的内容会写得比较细,目标是把这个项目当成一个培训项目来写。让不了解云原生、不了解微服务和不了解电商业务的人,也能看得懂该项目。

一、DDD

DDD相关的文章以及很多了,我在几年前也写过一个DDD的学习系列,本文就只是做个总结,毕竟微服务是离不开DDD的。

解决什么问题

  • 问题域
  • 需求分析
  • 分析理解复杂业务领域问题
  • 准确反映业务语言

领域分析概念

  • 领域
  • 子域
  • 核心域、通用域和支撑域
  • 限界上下文

领域建模概念

  • 实体与值对象
  • 聚合与聚合根
  • 领域事件
  • 领域服务
  • 仓储
  • 工作单元模式
  • 规约
  • 应用服务
  • 防腐层

领域驱动设计

CQRS: 命令与查询分离,作为一种战术办法,是实现DDD建模领域的最佳途径之一。

充血模型: 让模型自带业务逻辑,业务属性的改变只有模型自身可以操作。

二、整洁架构

核心原则

  1. 独立于框架:整洁架构的系统核心业务逻辑不依赖于具体的软件框架,业务逻辑部分都能够独立运行。这样在框架更新或者替换时,对核心业务的影响最小。
  2. 可测试性:架构设计使得业务规则可以很方便地被测试。因为业务逻辑是独立于外部组件(如数据库、用户界面等)的,所以可以使用单元测试来验证业务规则的正确性。比如,在一个电商系统中,“计算商品折扣”的业务规则可以通过提供模拟的商品价格数据来进行单元测试,而不需要真正地连接数据库或者启动整个用户界面。
  3. 独立于UI(用户界面):业务逻辑与用户界面相互独立。这意味着可以方便地替换用户界面,比如从一个命令行界面转换为图形界面,或者从Web界面转换为移动应用界面,而不会影响到业务逻辑。
  4. 独立于数据库:系统的核心不依赖于数据库的类型和实现。不管是使用关系型数据库(如MySQL)还是非关系型数据库(如MongoDB),业务逻辑部分都能保持稳定。

洋葱架构

依赖原则:上图的同心圆代表软件的不同部分。总的来说,越往里面,代码级别越高。外面的圆是实现机制,而内层的圆是原则。

Entities:Entities封装了企业级的业务规则。一个Entity可以是一个带方法的对象,也可以是一个数据结构和方法集。Entities可以被用于企业的其他应用。

Use Cases: 这一层包含了应用特有的业务规则。它封装和实现了系统的所有用例。这些用例协调数据从entities的流入和流出,并且指导entities使用它们的企业级业务规则来达到用例的目标。

三、项目框架

上图是本电商项目每个微服务实现使用的分层架构,在具体实现每个微服务前,先为项目打好地基,编写一些通用类库。

上图是目前搭好的电商项目整体架构,其中:

  • aspire:.net 8开始提供的开发工具,用于构建和运行云原生应用程序
  • gateways:网关项目
  • services:各个微服务的实现
  • app:存放非核心域服务
  • works:后台作业
  • pakages:项目需要用到独立的可以打包的nuget包,可以拿到其他项目中使用
  • shared:项目的共享类库,为实现各个微服务提供共享的基础类。

四、领域层模型规范

本项目使用.NET 9开发,现在我们先从shared开始。

  1. 在shared文件夹创建DDD.DHT.SharedKernel类库项目,修改默认命名空间为DDD.DHT

  2. 创建文件夹Domain,分别实现如下3个类

    IEntity.cs

    namespace DDM.DHT.Domain;
    
    public interface IEntity;
    
    public interface IEntity<TId> : IEntity
    {
        TId Id { get; set; }
    }
    

    IAggregateRoot.cs

    namespace DDM.DHT.Domain;
    
    public interface IAggregateRoot;
    

    BaseEntity.cs

    using System.ComponentModel.DataAnnotations;
    using System.ComponentModel.DataAnnotations.Schema;
    
    namespace DDM.DHT.Domain;
    
    public abstract class BaseEntity<TId> : IEntity<TId>
    {
        private readonly List<BaseEvent> _domainEvents = [];
    
        [NotMapped] 
        public IReadOnlyCollection<BaseEvent> DomainEvents => _domainEvents.AsReadOnly();
    
        [Key]
        public virtual TId Id { get; set; } = default!;
    
        public void AddDomainEvent(BaseEvent domainEvent)
        {
            _domainEvents.Add(domainEvent);
        }
    
        public void RemoveDomainEvent(BaseEvent domainEvent)
        {
            _domainEvents.Remove(domainEvent);
        }
    
        public void ClearDomainEvents()
        {
            _domainEvents.Clear();
        }
    }
    
  3. 在shared文件夹创建DDM.DHT.Core.Common类库项目,也修改默认命名空间为DDD.DHT。这个类库与SharedEvent的区别是,该类库实现主要是针对本项目的。而SharedEvent实际上可以做出nuget包供其他项目使用

  4. 在DDM.DHT.Core.Common项目中创建Domain文件夹,实现如下3个类:

    BaseEntity.cs

    namespace DDM.DHT.Domain;
    
    public abstract class BaseEntity : BaseEntity<long>;
    

    BaseAuditEntity.cs

    namespace DDM.DHT.Domain;
    
    public abstract class BaseAuditEntity : BaseEntity
    {
        public DateTime? CreatedAt { get; set; }
        public DateTime? LastModifiedAt { get; set; }
    }
    

    AuditWithUserEntity.cs

    namespace DDM.DHT.Domain; 
    
    public abstract class AuditWithUserEntity : BaseAuditEntity
    {
        public long? CreatedBy { get; set; }
        public long? LastModifiedBy { get; set; }
    }
    

五、通用仓储规范

仓储要重点说一下,按照DDD的要求,所有聚合的属性都是通过聚合根来操作的,而仓储是用来实现数据持久化的。因此根据DDD的要求,只有聚合根才会有仓储接口和具体实现。然而在具体开发中,这种约束会使得代码变得复杂,有些简单的,可能仅仅只有CRUD的实体,因为不是聚合根,没有自己的仓储实现,设计者可能被迫把这类实体也设计成聚合根了。

读过ABP源码的朋友应该清楚,ABP的仓储就不是直接约束的IAggregateRoot,而是改成了IEntity。这样完全放开,又存在未来开发者随便修改聚合属性,破坏聚合根的管辖职责。

结合这些情况,本项目对仓储基类的设计做了一些特别的约束。见代码分析:

  1. 在DDM.DHT.SharedKernel项目添加Repository文件夹,创建IReadRepository.cs

    public interface IReadRepository<T> where T : class, IEntity
    {
        Task<T?> GetByIdAsync<TKey>(TKey id, CancellationToken cancellationToken = default);
    
        Task<List<T>> GetListAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    
        Task<T?> GetSingleOrDefaultAsync(ISpecification<T>? specification = null,
            CancellationToken cancellationToken = default);
    
        Task<int> CountAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    
        Task<bool> AnyAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    }
    

    注意看,这个接口的泛型约束是IEntity,而接口方法只是定义了一些读方法。如果只是读数据,该仓储是不会破坏到聚合内部的数据的,因此约束可以放开到IEntity。

    另外ISpecification这个接口是一个规约接口,为仓储提供非id属性的条件查询实现。

    Specification.cs实现如下

    public abstract class Specification<T> : ISpecification<T> where T : class, IEntity
    {
        public Expression<Func<T, bool>>? FilterCondition { get; protected init; }
        public List<Expression<Func<T, object>>> Includes { get; } = [];
        public List<string> IncludeStrings { get; } = [];
        public Expression<Func<T, object>>? OrderBy { get; private set; }
        public Expression<Func<T, object>>? OrderByDescending { get; private set; }
        public Expression<Func<T, object>>? GroupBy { get; private set; }
        public int Take { get; private set; }
        public int Skip { get; private set; }
        public bool IsPagingEnabled { get; private set; }
    
        protected void AddInclude(Expression<Func<T, object>> includeExpression)
        {
            Includes.Add(includeExpression);
        }
    
        protected void AddInclude(string includeString)
        {
            IncludeStrings.Add(includeString);
        }
    
        protected void SetPaging(int skip, int take)
        {
            Skip = skip;
            Take = take;
            IsPagingEnabled = true;
        }
    
        protected void SetOrderBy(Expression<Func<T, object>> orderByExpression)
        {
            OrderBy = orderByExpression;
        }
    
        protected void SetOrderByDescending(Expression<Func<T, object>> orderByDescExpression)
        {
            OrderByDescending = orderByDescExpression;
        }
    
        protected void SetGroupBy(Expression<Func<T, object>> groupByExpression)
        {
            GroupBy = groupByExpression;
        }
    }
    
  2. 接着我们继续创建IRepository.cs类

    //该仓储操作的聚合根实体类型
    public interface IRepository<T> : IReadRepository<T> where T : class, IEntity, IAggregateRoot
    {
        T Add(T entity);
    
        void Update(T entity);
    
        void Delete(T entity);
    
        Task<int> BatchDeleteAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default);
    
        Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    }
    

    这个仓储接口继承IReadRepository,同时它的泛型约束添加了IAggregateRoot,也就是说它既拥有了能灵活查询实体对象的优势,又提供了仅聚合根能操作实体属性变更的约束。

  3. 我们继续添加一个IGenericRepository仓储,代码如下:

    //该仓储操作的通用实体类型
    public interface IGenericRepository<T> : IReadRepository<T> where T : class, IEntity
    {
        Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    }
    

    这个接口很有意思,它继承IReadRepository仓储后,多了一个SaveChangesAsync方法。也就是说这个仓储可以把查询获取的实体对象属性变更后,重新持久化。

    再看这个仓储接口的约束是IEntity,也就是说它为那些简单的只需要做CRUD的实体提供了简单了仓储实现,而不依赖与聚合根。

  4. 在shared文件夹,创建DDM.DHT.Infrastructure.EFCore项目。添加Pomelo.EntityFrameworkCore.MySql 9.0.0引用,注意目前该版本是preview的版本。

  5. 创建Repositories文件夹,分别去实现IReadRepository、IRepository和IGenericRepository接口。代码我这类不贴了,可下载完整的项目文件来查看。

六、通用数据返回对象

一般我们在开发项目时,每个接口要返回数据类型是不一样的,比如有些接口返回的是整数、有的返回的是List等等,那么前端在解析不同的返回数据类型时就会很麻烦,为了解决这个问题,需要对返回结果进行统一的封装。

不仅仅是前端访问接口,即使是后端程序各个服务之间,或者各个层次之间都会有数据返回。

因此需要规范一个通用的数据返回对象。

  1. 在DDD.DHT.SharedKernel项目添加Return文件夹,创建统一的IResult接口和实现,代码如下。
public enum ResultStatus
{
    Ok,
    Error,
    Forbidden,
    Unauthorized,
    NotFound,
    Invalid
}

public interface IResult
{
    IEnumerable<string>? Errors { get; }

    bool IsSuccess { get; }

    ResultStatus Status { get; }

    object? GetValue();
}

public class Result<T> : IResult
{
    public Result() : this(default(T))
    {
    }

    protected internal Result(T? value)
    {
        Value = value;
    }

    protected internal Result(ResultStatus status)
    {
        Status = status;
    }

    public T? Value { get; init; }

    public bool IsSuccess => Status == ResultStatus.Ok;

    public IEnumerable<string>? Errors { get; protected set; }

    public ResultStatus Status { get; protected set; } = ResultStatus.Ok;

    public object? GetValue()
    {
        return Value;
    }

    public static implicit operator Result<T>(Result result)
    {
        return new Result<T>(default(T))
        {
            Status = result.Status,
            Errors = result.Errors
        };
    }
}

public class Result : Result<Result>
{
    public Result() : base(null)
    {
    }

    protected internal Result(Result value) : base(value)
    {
    }

    protected internal Result(ResultStatus status) : base(status)
    {
    }

    public static Result From(IResult result)
    {
        return new Result(result.Status)
        {
            Errors = result.Errors
        };
    }

    public static Result Success()
    {
        return new Result(ResultStatus.Ok);
    }

    public static Result<T> Success<T>(T value)
    {
        return new Result<T>(value);
    }

    public static Result Failure()
    {
        return new Result(ResultStatus.Error);
    }

    public static Result Failure(params string[] errors)
    {
        return new Result(ResultStatus.Error)
        {
            Errors = errors.AsEnumerable()
        };
    }

    public static Result NotFound()
    {
        return new Result(ResultStatus.NotFound);
    }

    public static Result NotFound(params string[] error)
    {
        return new Result(ResultStatus.NotFound)
        {
            Errors = error.AsEnumerable()
        };
    }

    public static Result Forbidden()
    {
        return new Result(ResultStatus.Forbidden);
    }

    public static Result Unauthorized()
    {
        return new Result(ResultStatus.Unauthorized);
    }

    public static Result Invalid()
    {
        return new Result(ResultStatus.Invalid);
    }

    public static Result Invalid(params string[] errors)
    {
        return new Result(ResultStatus.Invalid)
        {
            Errors = errors.AsEnumerable()
        };
    }
}
  1. 有了通用的泛型返回对象,我们再补偿一个通用的分页数据返回对象
public class PagedMetaData
{
    public int CurrentPage { get; set; }
    public int TotalPages { get; set; }
    public int PageSize { get; set; }
    public long TotalCount { get; set; }

    public bool HasPrevious => CurrentPage > 1;

    public bool HasNext => CurrentPage < TotalPages;
}

public class Pagination
{
    private const int MaxPageSize = 100;

    public int PageNumber { get; set; } = 1;

    private int _pageSize = 10;

    public int PageSize
    {
        get => _pageSize;
        set => _pageSize = value > MaxPageSize ? MaxPageSize : value;
    }
}

public class PagedList<T> : List<T>
{
    public PagedList(IEnumerable<T> items, long count, Pagination pagination)
    {
        MetaData = new PagedMetaData
        {
            TotalCount = count,
            PageSize = pagination.PageSize,
            CurrentPage = pagination.PageNumber,
            TotalPages = (int)Math.Ceiling(count / (double)pagination.PageSize)
        };

        AddRange(items);
    }

    public PagedMetaData MetaData { get; set; }
}
  1. 进一步,我们再为仓储编写一个分页对象的扩展方法。在DDM.DHT.Infrastructure.EFCore项目创建QueryableExtensions.cs文件
public static class QueryableExtensions
{
    public static async Task<PagedList<T>?> ToPageListAsync<T>(this IQueryable<T> queryable, Pagination pagination) where T : class
    {
        var count = queryable.Count();
        var items = await queryable
            .Skip((pagination.PageNumber - 1) * pagination.PageSize)
            .Take(pagination.PageSize)
            .ToListAsync();
        return items.Count == 0 ? null : new PagedList<T>(items, count, pagination);
    }
}

七、实现审计属性自动赋值

还记得我们在DDM.DHT.Core.Common项目中创建的3个抽象实体类吗?BaseEntity、BaseAuditEntity和AuditWithUserEntity。从这3个类中我们可以发现BaseEntity基类中我们把该项目的Id属性设置成了long类型,因为我们这个项目的默认Id就都用的long,所以AuditWithUserEntity中的创建和修改人Id也设置成了long。

现在有一个问题,在仓储层做数据持久化的时候能不能自动保存这些审计对象的值了,因为他们与业务无关,仅仅是审计时需要用到。我们不想在业务代码中去设置这些属性值。

答案肯定是可以做到的,现在我们来看一下如何实现。

在实现之前我们需要先定义一个IUser接口,用来保存系统的当前登录用户。而审计数据赋的值,就是IUser接口的用户Id。

在DDM.DHT.Core.Common项目中创建Interfaces文件夹,再创建IUser.cs文件,代码如下

public enum UserType
{
    User,
    Worker
}

public interface IUser
{
    long? Id { get; }

    string? UserName { get; }

    UserType UserType { get; }
}

UserType枚举值是用来区分操作是正常登录用户,还是后台作业用户。

接着在DDM.DHT.Infrastructure.EFCore项目创建Interceptors文件夹,创建AuditEntityInterceptor.cs类,代码如下:

public class AuditEntityInterceptor(IUser currentUser) : SaveChangesInterceptor
{
    public override InterceptionResult<int> SavingChanges(DbContextEventData eventData, InterceptionResult<int> result)
    {
        UpdateEntities(eventData.Context);

        return base.SavingChanges(eventData, result);
    }

    public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
        InterceptionResult<int> result, CancellationToken cancellationToken = default)
    {
        UpdateEntities(eventData.Context);

        return base.SavingChangesAsync(eventData, result, cancellationToken);
    }

    public void UpdateEntities(DbContext? context)
    {
        if (context == null) return;

        foreach (var entry in context.ChangeTracker.Entries<BaseAuditEntity>())
        {
            if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;

            var now = DateTime.Now;

            if (entry.State == EntityState.Added)
            {
                entry.Entity.CreatedAt = now;
                entry.Entity.LastModifiedAt = now;
            }
            else
            {
                entry.Entity.LastModifiedAt = now;
            }
        }

        foreach (var entry in context.ChangeTracker.Entries<AuditWithUserEntity>())
        {
            if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;

            if (currentUser.Id is null) continue;

            if (entry.State == EntityState.Added)
                entry.Entity.CreatedBy = currentUser.Id;
            else
                entry.Entity.LastModifiedBy = currentUser.Id;
        }
    }
}

SaveChangesInterceptor是EFCore提供的实体在SaveChange时的拦截器,我们可以提供一个基于该拦截器的实现,在实体类型是继承BaseAuditEntity时,自动保存当前时间属性。如果实现是继承AuditWithUserEntity,则自动保存当前用户为操作记录人。

八、实现命令查询职责模式

  1. 命令查询职责分离(CQRS,Command Query Responsibility Segregation)是一种架构模式,它将系统中的写操作读操作分离开来。CQRS 模式能够提升系统的可伸缩性、性能和可维护性,尤其适用于复杂的业务场景和高并发的系统。在传统的 CRUD(增、删、改、查)架构中,读写操作通常共享同一数据模型,而 CQRS 将这两者彻底分开,让它们有独立的模型、接口和存储方式。

CQRS 模式特别适合与事件溯源(Event Sourcing)一起使用,可以通过事件追溯系统的状态变化,确保数据的一致性。

该系统使用MediatR来提供CQRS的支持,我们先来看接口,在DDM.DHT.Core.Common项目创建Messaging,再创建4个类,代码如下:

//ICommand.cs
public interface ICommand<out TResponse> : IRequest<TResponse>;

//ICommandHandler.cs
public interface ICommandHandler<in TCommand, TResponse> : IRequestHandler<TCommand, TResponse> where TCommand : ICommand<TResponse>;

//IQuery.cs
public interface IQuery<out TResponse> : IRequest<TResponse>;

//IQueryHandler.cs
public interface IQueryHandler<in TQuery, TResponse> : IRequestHandler<TQuery, TResponse> where TQuery : IQuery<TResponse>;

MediatR中介者模式:这是一种旨在解耦对象之间通信的策略,MediatR是实现中介模式一种成熟的实现。我们只需要知道怎么使用它即可。从本质上讲,MediatR 在三种主要模式下运行:

  • Request:涉及具有服务响应的单个接收方。
  • Notification:在没有服务响应的情况下与多个接收方接合。
  • StreamRequest:利用单个接收器进行具有服务响应的流操作。

就该项目而言,我们主要关注Request行为,尤其是探索MediatR的管道。

  1. MediatR Pipeline

在中介请求流中,发布者(发送_操作_)和订阅者(处理程序)之间存在明显的区别。

通过利用 MediatR 管道,我们可以有效地拦截此流程,并将自定义逻辑引入流程。

要实现管道,需要从接口继承IPipelineBehavior。接下来我们添加一个命令日志收集器,在DDM.DHT.Infrastructure.EFCore项目Interceptors文件夹添加LoggingBehavior.cs:

public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : ICommand<TResponse>
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
    public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger) => _logger = logger;

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        _logger.LogInformation("----- Handling command {CommandName} ({@Command})", GetGenericTypeName(request!), request);
        var response = await next();
        _logger.LogInformation("----- Command {CommandName} handled - response: {@Response}", GetGenericTypeName(request!), response);

        return response;
    }
}

上面的代码所示,我们可以看到此方法允许在调用ICommand的管道的后续步骤之前和之后插入逻辑。假设这些命令的日志都被记录到CommandEvent的数据库,或者其他持久化存储中。我们后续时候可以对分析这些日志,实现事件溯源。

九、实现领域事件

我们再来看一下MediatR的一种模式,Notification模式,它可以与零个或多个接受放通信,这种模式特别适合做事件发布。不让用户实体更新了,同时发出一个用户更新已经更新的事件。至于该事件被多少其他程序订阅,那是以后的事,这样事件发布者和订阅者就实现了解耦。

现在我们回到DDM.DHT.SharedKernel项目,在Domain文件夹添加BaseEvent.cs类

public abstract class BaseEvent : INotification
{
    /// <summary>
    /// 发生日期
    /// </summary>
    public DateTime DateOccurred { get; protected set; } = DateTime.Now;
}

该类继承INotification接口,因此领域事件可以有零或者多个订阅者实现。

接着我们在DDM.DHT.Infrastructure.EFCore项目的Interceptors文件夹添加DispatchDomainEventsInterceptor.cs类

public class DispatchDomainEventsInterceptor(IPublisher publisher) : SaveChangesInterceptor
{
    public override int SavedChanges(SaveChangesCompletedEventData eventData, int result)
    {
        DispatchDomainEvents(eventData.Context).GetAwaiter().GetResult();
        return base.SavedChanges(eventData, result);
    }

    public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
        CancellationToken cancellationToken = default)
    {
        await DispatchDomainEvents(eventData.Context);
        return await base.SavedChangesAsync(eventData, result, cancellationToken);
    }

    private async Task DispatchDomainEvents(DbContext? context)
    {
        if (context == null) return;

        var entities = context.ChangeTracker
            .Entries<BaseEntity>()
            .Where(e => e.Entity.DomainEvents.Any())
            .Select(e => e.Entity)
            .ToList();

        var domainEvents = entities
            .SelectMany(e => e.DomainEvents)
            .ToList();

        entities.ForEach(e => e.ClearDomainEvents());

        foreach (var domainEvent in domainEvents)
            await publisher.Publish(domainEvent);
    }
}

上面代码的核心语句就是最后那句 await publisher.Publish(domainEvent),publisher对象是通过依赖注入的IPublisher实例,这是一个MediatR框架提供的对象,它的功能就是发布事件。

这段代码同时继承SaveChangesInterceptor抽象类,因此在实体执行SavedChange时,可以确保同时执行了DispatchDomainEvents方法,实现工作单元模式。

十、实现集成事件

上一部分内容中,领域事件的实现中我们可以知道一个领域事件可以实现多个订阅者,并且同时实现了工作单元模式。看起来好像很厉害,但实际上受到EFCore的事务特性的限制,领域事件只能在一个DbContext上下文中实现事务。如果是跨微服务的事件订阅,怎么办呢?

分布式事务的实现并不是一件容易的事情,我们的项目通过DotNetCore.CAP库来实现。

首先重温一下CAP的概念:

CAP定理是分布式系统中的一个重要理论,主要由埃里克·布鲁尔在2000年提出,并在2002年由塞思·吉尔伯特南希·林奇正式证明。CAP定理指出,在一个分布式系统中,最多只能同时满足以下三项中的两项:

  • 一致性(Consistency):所有节点在同一时间看到相同的数据。
  • 可用性(Availability):每个请求都会在有限的时间内得到响应。
  • 分区容错性(Partition Tolerance):系统在网络分区的情况下仍能继续工作。 因此,设计者在面对网络分区时,必须在一致性和可用性之间做出选择,无法同时保证这三项特性。

DotNetCore CAP通常运用在分布式事务的场景,主要解决的是不同程序之间远程调用的事务一致性。

集成CAP的实现

  1. 在shared文件夹创建新的项目DDM.DHT.Infrastructure.CAP,需要实现CAP的微服务单独引用该项目。

  2. 创建EFDbContext类,实现开启事务的逻辑:

    public class EFDbContext(DbContextOptions options) : DbContext(options)
    {
        private IDbContextTransaction? _currentTransaction;
        public IDbContextTransaction? GetCurrentTransaction() => _currentTransaction;
        /// <summary>
        /// 事务是否开启
        /// </summary>
        public bool HasActiveTransaction => _currentTransaction != null;
    
        public Task<IDbContextTransaction> BeginTransactionAsync()
        {
            if (_currentTransaction != null) return null;
            
            _currentTransaction = Database.BeginTransaction();
            return Task.FromResult(_currentTransaction);
        }
    
        public async Task CommitTransactionAsync(IDbContextTransaction transaction)
        {
            if (transaction == null) throw new ArgumentNullException(nameof(transaction));
            if (transaction != _currentTransaction) throw new InvalidOperationException($"传入的事务{transaction.TransactionId}并不是当前事务");
    
            try
            {
                await SaveChangesAsync();
                transaction.Commit();
            }
            catch
            {
                RollbackTransaction();
                throw;
            }
            finally
            {
                if (_currentTransaction != null)
                {
                    _currentTransaction.Dispose();
                    _currentTransaction = null;
                }
            }
        }
    
        public void RollbackTransaction()
        {
            try
            {
                _currentTransaction?.Rollback();
            }
            finally
            {
                if (_currentTransaction != null)
                {
                    _currentTransaction.Dispose();
                    _currentTransaction = null;
                }
            }
        }
    }
    
  3. 创建TransactionBehavior.cs类,通过拦截器开启事务

    public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFDbContext where TRequest : ICommand<TResponse>
    {
        ILogger _logger;
        TDbContext _dbContext;
        ICapPublisher _capBus;
        public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
        {
            _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
            _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }
    
    
        public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
        {
            var response = default(TResponse);
            var typeName = request.GetGenericTypeName();
    
            try
            {
                if (_dbContext.HasActiveTransaction)
                {
                    return await next();
                }
                //定义数据库操作执行的策略,比如可以在里面嵌入一些重试的逻辑
                var strategy = _dbContext.Database.CreateExecutionStrategy();
    
                await strategy.ExecuteAsync(async () =>
                {
                    using (var transaction = await _dbContext.BeginTransactionAsync(_capBus))
                    {
                        using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
                        {
                            _logger.LogInformation("----- 开始事务 {TransactionId} {CommandName}({@Command})", transaction.TransactionId, typeName, request);
                            response = await next();
                            _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);
    
                            await _dbContext.CommitTransactionAsync(transaction);
                            //Guid transactionId = transaction.TransactionId;
                        }
                    }
                });
                return response;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);
                throw;
            }
        }
    }
    
  4. 创建DependencyInjection.cs类,注入CAP实例和配置

    public static class DependencyInjection
    {
        public static IServiceCollection AddCAP<TMasterDbContext>(
            this IServiceCollection services, IConfiguration configuration)
            where TMasterDbContext : DbContext
        {
            var masterDbConn = configuration.GetConnectionString("MasterDb");
    
            services.AddCap(x =>
            {
                x.UseEntityFramework<TMasterDbContext>();
                x.UseMySql(masterDbConn!);
                x.UseRabbitMQ(options =>
                {
                    configuration.GetSection("RabbitMQ").Bind(options);
                });
                x.UseDashboard();
            });
    
            return services;
        }
    }
    

    要使用CAP还需要2个步骤,目前还在框架阶段,后面两个步骤在使用的时候具体介绍,下面内容只是先把后续3个步骤写出来。

  5. 在要使用CAP的微服务项目中,让微服务的DbContext对象继承EFDbContext类,用来继承事务逻辑。

  6. 创建新的拦截器,注入,如

    public class UserDbContextTransactionBehavior<TRequest, TResponse> : TransactionBehavior<UserDbContext, TRequest, TResponse> where TRequest : ICommand<TResponse>
    {
        public UserDbContextTransactionBehavior(UserDbContext dbContext, ICapPublisher capBus, ILogger<UserDbContextTransactionBehavior<TRequest, TResponse>> logger) : base(dbContext, capBus, logger)
        {
        }
    }
    

    这个类主要目的是把ICapPublisher注入进来

  7. 添加依赖注入

    services.AddTransient(typeof(IPipelineBehavior<,>), typeof(UserDbContextTransactionBehavior<,>));
    

ps:这里CAP是实现还依赖了RabbitMQ中间件,具体的使用效果以后遇到真实需求了再详细介绍。

到目前已经把领域层规范和EFCore基础设施搭建好了,还有一些其他基础设施层的通用封装,如Cache、Quartz、MessageBus等,这些也等以后再展开介绍。