云原生电商微服务实战1:搭建基础框架
2024-09-07 22:29:37 一直以来想专心写一个基于云原生的微服务案例,但总角色要写的内容太多了。电商案例本身并不会太难,而要搭建好一整套云原生和微服务的基础设施,并不是一项轻松的工作。随着.Net Asprise的推出,再集成Dapr,开发和生产几乎可以保持一样的环境了。这让我惊喜,曾经我们手动搭建k8s,手动配置集群以及可以成为历史,我们需要的就是使用这些工具,重新把核心回归到业务上来。
正好新公司也是做电商业务的,于是我终于决定开始写一套云原生电商微服务的案例。一方面可以学习Asprise和Dapr的相关知识,也是一次属性电商核心业务的机会。
这次的内容会写得比较细,目标是把这个项目当成一个培训项目来写。让不了解云原生、不了解微服务和不了解电商业务的人,也能看得懂该项目。
一、DDD
DDD相关的文章以及很多了,我在几年前也写过一个DDD的学习系列,本文就只是做个总结,毕竟微服务是离不开DDD的。
解决什么问题
- 问题域
- 需求分析
- 分析理解复杂业务领域问题
- 准确反映业务语言
领域分析概念
- 领域
- 子域
- 核心域、通用域和支撑域
- 限界上下文
领域建模概念
- 实体与值对象
- 聚合与聚合根
- 领域事件
- 领域服务
- 仓储
- 工作单元模式
- 规约
- 应用服务
- 防腐层
领域驱动设计
CQRS: 命令与查询分离,作为一种战术办法,是实现DDD建模领域的最佳途径之一。
充血模型: 让模型自带业务逻辑,业务属性的改变只有模型自身可以操作。
二、整洁架构
核心原则
- 独立于框架:整洁架构的系统核心业务逻辑不依赖于具体的软件框架,业务逻辑部分都能够独立运行。这样在框架更新或者替换时,对核心业务的影响最小。
- 可测试性:架构设计使得业务规则可以很方便地被测试。因为业务逻辑是独立于外部组件(如数据库、用户界面等)的,所以可以使用单元测试来验证业务规则的正确性。比如,在一个电商系统中,“计算商品折扣”的业务规则可以通过提供模拟的商品价格数据来进行单元测试,而不需要真正地连接数据库或者启动整个用户界面。
- 独立于UI(用户界面):业务逻辑与用户界面相互独立。这意味着可以方便地替换用户界面,比如从一个命令行界面转换为图形界面,或者从Web界面转换为移动应用界面,而不会影响到业务逻辑。
- 独立于数据库:系统的核心不依赖于数据库的类型和实现。不管是使用关系型数据库(如MySQL)还是非关系型数据库(如MongoDB),业务逻辑部分都能保持稳定。
洋葱架构
依赖原则:上图的同心圆代表软件的不同部分。总的来说,越往里面,代码级别越高。外面的圆是实现机制,而内层的圆是原则。
Entities:Entities封装了企业级的业务规则。一个Entity可以是一个带方法的对象,也可以是一个数据结构和方法集。Entities可以被用于企业的其他应用。
Use Cases: 这一层包含了应用特有的业务规则。它封装和实现了系统的所有用例。这些用例协调数据从entities的流入和流出,并且指导entities使用它们的企业级业务规则来达到用例的目标。
三、项目框架
上图是本电商项目每个微服务实现使用的分层架构,在具体实现每个微服务前,先为项目打好地基,编写一些通用类库。
上图是目前搭好的电商项目整体架构,其中:
- aspire:.net 8开始提供的开发工具,用于构建和运行云原生应用程序
- gateways:网关项目
- services:各个微服务的实现
- app:存放非核心域服务
- works:后台作业
- pakages:项目需要用到独立的可以打包的nuget包,可以拿到其他项目中使用
- shared:项目的共享类库,为实现各个微服务提供共享的基础类。
四、领域层模型规范
本项目使用.NET 9开发,现在我们先从shared开始。
-
在shared文件夹创建DDD.DHT.SharedKernel类库项目,修改默认命名空间为DDD.DHT
-
创建文件夹Domain,分别实现如下3个类
IEntity.cs
namespace DDM.DHT.Domain; public interface IEntity; public interface IEntity<TId> : IEntity { TId Id { get; set; } }
IAggregateRoot.cs
namespace DDM.DHT.Domain; public interface IAggregateRoot;
BaseEntity.cs
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations.Schema; namespace DDM.DHT.Domain; public abstract class BaseEntity<TId> : IEntity<TId> { private readonly List<BaseEvent> _domainEvents = []; [NotMapped] public IReadOnlyCollection<BaseEvent> DomainEvents => _domainEvents.AsReadOnly(); [Key] public virtual TId Id { get; set; } = default!; public void AddDomainEvent(BaseEvent domainEvent) { _domainEvents.Add(domainEvent); } public void RemoveDomainEvent(BaseEvent domainEvent) { _domainEvents.Remove(domainEvent); } public void ClearDomainEvents() { _domainEvents.Clear(); } }
-
在shared文件夹创建DDM.DHT.Core.Common类库项目,也修改默认命名空间为DDD.DHT。这个类库与SharedEvent的区别是,该类库实现主要是针对本项目的。而SharedEvent实际上可以做出nuget包供其他项目使用
-
在DDM.DHT.Core.Common项目中创建Domain文件夹,实现如下3个类:
BaseEntity.cs
namespace DDM.DHT.Domain; public abstract class BaseEntity : BaseEntity<long>;
BaseAuditEntity.cs
namespace DDM.DHT.Domain; public abstract class BaseAuditEntity : BaseEntity { public DateTime? CreatedAt { get; set; } public DateTime? LastModifiedAt { get; set; } }
AuditWithUserEntity.cs
namespace DDM.DHT.Domain; public abstract class AuditWithUserEntity : BaseAuditEntity { public long? CreatedBy { get; set; } public long? LastModifiedBy { get; set; } }
五、通用仓储规范
仓储要重点说一下,按照DDD的要求,所有聚合的属性都是通过聚合根来操作的,而仓储是用来实现数据持久化的。因此根据DDD的要求,只有聚合根才会有仓储接口和具体实现。然而在具体开发中,这种约束会使得代码变得复杂,有些简单的,可能仅仅只有CRUD的实体,因为不是聚合根,没有自己的仓储实现,设计者可能被迫把这类实体也设计成聚合根了。
读过ABP源码的朋友应该清楚,ABP的仓储就不是直接约束的IAggregateRoot,而是改成了IEntity。这样完全放开,又存在未来开发者随便修改聚合属性,破坏聚合根的管辖职责。
结合这些情况,本项目对仓储基类的设计做了一些特别的约束。见代码分析:
-
在DDM.DHT.SharedKernel项目添加Repository文件夹,创建IReadRepository.cs
public interface IReadRepository<T> where T : class, IEntity { Task<T?> GetByIdAsync<TKey>(TKey id, CancellationToken cancellationToken = default); Task<List<T>> GetListAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default); Task<T?> GetSingleOrDefaultAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default); Task<int> CountAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default); Task<bool> AnyAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default); }
注意看,这个接口的泛型约束是IEntity,而接口方法只是定义了一些读方法。如果只是读数据,该仓储是不会破坏到聚合内部的数据的,因此约束可以放开到IEntity。
另外ISpecification这个接口是一个规约接口,为仓储提供非id属性的条件查询实现。
Specification.cs实现如下
public abstract class Specification<T> : ISpecification<T> where T : class, IEntity { public Expression<Func<T, bool>>? FilterCondition { get; protected init; } public List<Expression<Func<T, object>>> Includes { get; } = []; public List<string> IncludeStrings { get; } = []; public Expression<Func<T, object>>? OrderBy { get; private set; } public Expression<Func<T, object>>? OrderByDescending { get; private set; } public Expression<Func<T, object>>? GroupBy { get; private set; } public int Take { get; private set; } public int Skip { get; private set; } public bool IsPagingEnabled { get; private set; } protected void AddInclude(Expression<Func<T, object>> includeExpression) { Includes.Add(includeExpression); } protected void AddInclude(string includeString) { IncludeStrings.Add(includeString); } protected void SetPaging(int skip, int take) { Skip = skip; Take = take; IsPagingEnabled = true; } protected void SetOrderBy(Expression<Func<T, object>> orderByExpression) { OrderBy = orderByExpression; } protected void SetOrderByDescending(Expression<Func<T, object>> orderByDescExpression) { OrderByDescending = orderByDescExpression; } protected void SetGroupBy(Expression<Func<T, object>> groupByExpression) { GroupBy = groupByExpression; } }
-
接着我们继续创建IRepository.cs类
//该仓储操作的聚合根实体类型 public interface IRepository<T> : IReadRepository<T> where T : class, IEntity, IAggregateRoot { T Add(T entity); void Update(T entity); void Delete(T entity); Task<int> BatchDeleteAsync(ISpecification<T>? specification = null, CancellationToken cancellationToken = default); Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); }
这个仓储接口继承IReadRepository,同时它的泛型约束添加了IAggregateRoot,也就是说它既拥有了能灵活查询实体对象的优势,又提供了仅聚合根能操作实体属性变更的约束。
-
我们继续添加一个IGenericRepository仓储,代码如下:
//该仓储操作的通用实体类型 public interface IGenericRepository<T> : IReadRepository<T> where T : class, IEntity { Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); }
这个接口很有意思,它继承IReadRepository仓储后,多了一个SaveChangesAsync方法。也就是说这个仓储可以把查询获取的实体对象属性变更后,重新持久化。
再看这个仓储接口的约束是IEntity,也就是说它为那些简单的只需要做CRUD的实体提供了简单了仓储实现,而不依赖与聚合根。
-
在shared文件夹,创建DDM.DHT.Infrastructure.EFCore项目。添加Pomelo.EntityFrameworkCore.MySql 9.0.0引用,注意目前该版本是preview的版本。
-
创建Repositories文件夹,分别去实现IReadRepository、IRepository和IGenericRepository接口。代码我这类不贴了,可下载完整的项目文件来查看。
六、通用数据返回对象
一般我们在开发项目时,每个接口要返回数据类型是不一样的,比如有些接口返回的是整数、有的返回的是List等等,那么前端在解析不同的返回数据类型时就会很麻烦,为了解决这个问题,需要对返回结果进行统一的封装。
不仅仅是前端访问接口,即使是后端程序各个服务之间,或者各个层次之间都会有数据返回。
因此需要规范一个通用的数据返回对象。
- 在DDD.DHT.SharedKernel项目添加Return文件夹,创建统一的IResult接口和实现,代码如下。
public enum ResultStatus
{
Ok,
Error,
Forbidden,
Unauthorized,
NotFound,
Invalid
}
public interface IResult
{
IEnumerable<string>? Errors { get; }
bool IsSuccess { get; }
ResultStatus Status { get; }
object? GetValue();
}
public class Result<T> : IResult
{
public Result() : this(default(T))
{
}
protected internal Result(T? value)
{
Value = value;
}
protected internal Result(ResultStatus status)
{
Status = status;
}
public T? Value { get; init; }
public bool IsSuccess => Status == ResultStatus.Ok;
public IEnumerable<string>? Errors { get; protected set; }
public ResultStatus Status { get; protected set; } = ResultStatus.Ok;
public object? GetValue()
{
return Value;
}
public static implicit operator Result<T>(Result result)
{
return new Result<T>(default(T))
{
Status = result.Status,
Errors = result.Errors
};
}
}
public class Result : Result<Result>
{
public Result() : base(null)
{
}
protected internal Result(Result value) : base(value)
{
}
protected internal Result(ResultStatus status) : base(status)
{
}
public static Result From(IResult result)
{
return new Result(result.Status)
{
Errors = result.Errors
};
}
public static Result Success()
{
return new Result(ResultStatus.Ok);
}
public static Result<T> Success<T>(T value)
{
return new Result<T>(value);
}
public static Result Failure()
{
return new Result(ResultStatus.Error);
}
public static Result Failure(params string[] errors)
{
return new Result(ResultStatus.Error)
{
Errors = errors.AsEnumerable()
};
}
public static Result NotFound()
{
return new Result(ResultStatus.NotFound);
}
public static Result NotFound(params string[] error)
{
return new Result(ResultStatus.NotFound)
{
Errors = error.AsEnumerable()
};
}
public static Result Forbidden()
{
return new Result(ResultStatus.Forbidden);
}
public static Result Unauthorized()
{
return new Result(ResultStatus.Unauthorized);
}
public static Result Invalid()
{
return new Result(ResultStatus.Invalid);
}
public static Result Invalid(params string[] errors)
{
return new Result(ResultStatus.Invalid)
{
Errors = errors.AsEnumerable()
};
}
}
- 有了通用的泛型返回对象,我们再补偿一个通用的分页数据返回对象
public class PagedMetaData
{
public int CurrentPage { get; set; }
public int TotalPages { get; set; }
public int PageSize { get; set; }
public long TotalCount { get; set; }
public bool HasPrevious => CurrentPage > 1;
public bool HasNext => CurrentPage < TotalPages;
}
public class Pagination
{
private const int MaxPageSize = 100;
public int PageNumber { get; set; } = 1;
private int _pageSize = 10;
public int PageSize
{
get => _pageSize;
set => _pageSize = value > MaxPageSize ? MaxPageSize : value;
}
}
public class PagedList<T> : List<T>
{
public PagedList(IEnumerable<T> items, long count, Pagination pagination)
{
MetaData = new PagedMetaData
{
TotalCount = count,
PageSize = pagination.PageSize,
CurrentPage = pagination.PageNumber,
TotalPages = (int)Math.Ceiling(count / (double)pagination.PageSize)
};
AddRange(items);
}
public PagedMetaData MetaData { get; set; }
}
- 进一步,我们再为仓储编写一个分页对象的扩展方法。在DDM.DHT.Infrastructure.EFCore项目创建QueryableExtensions.cs文件
public static class QueryableExtensions
{
public static async Task<PagedList<T>?> ToPageListAsync<T>(this IQueryable<T> queryable, Pagination pagination) where T : class
{
var count = queryable.Count();
var items = await queryable
.Skip((pagination.PageNumber - 1) * pagination.PageSize)
.Take(pagination.PageSize)
.ToListAsync();
return items.Count == 0 ? null : new PagedList<T>(items, count, pagination);
}
}
七、实现审计属性自动赋值
还记得我们在DDM.DHT.Core.Common项目中创建的3个抽象实体类吗?BaseEntity、BaseAuditEntity和AuditWithUserEntity。从这3个类中我们可以发现BaseEntity基类中我们把该项目的Id属性设置成了long类型,因为我们这个项目的默认Id就都用的long,所以AuditWithUserEntity中的创建和修改人Id也设置成了long。
现在有一个问题,在仓储层做数据持久化的时候能不能自动保存这些审计对象的值了,因为他们与业务无关,仅仅是审计时需要用到。我们不想在业务代码中去设置这些属性值。
答案肯定是可以做到的,现在我们来看一下如何实现。
在实现之前我们需要先定义一个IUser接口,用来保存系统的当前登录用户。而审计数据赋的值,就是IUser接口的用户Id。
在DDM.DHT.Core.Common项目中创建Interfaces文件夹,再创建IUser.cs文件,代码如下
public enum UserType
{
User,
Worker
}
public interface IUser
{
long? Id { get; }
string? UserName { get; }
UserType UserType { get; }
}
UserType枚举值是用来区分操作是正常登录用户,还是后台作业用户。
接着在DDM.DHT.Infrastructure.EFCore项目创建Interceptors文件夹,创建AuditEntityInterceptor.cs类,代码如下:
public class AuditEntityInterceptor(IUser currentUser) : SaveChangesInterceptor
{
public override InterceptionResult<int> SavingChanges(DbContextEventData eventData, InterceptionResult<int> result)
{
UpdateEntities(eventData.Context);
return base.SavingChanges(eventData, result);
}
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
InterceptionResult<int> result, CancellationToken cancellationToken = default)
{
UpdateEntities(eventData.Context);
return base.SavingChangesAsync(eventData, result, cancellationToken);
}
public void UpdateEntities(DbContext? context)
{
if (context == null) return;
foreach (var entry in context.ChangeTracker.Entries<BaseAuditEntity>())
{
if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;
var now = DateTime.Now;
if (entry.State == EntityState.Added)
{
entry.Entity.CreatedAt = now;
entry.Entity.LastModifiedAt = now;
}
else
{
entry.Entity.LastModifiedAt = now;
}
}
foreach (var entry in context.ChangeTracker.Entries<AuditWithUserEntity>())
{
if (entry.State is not (EntityState.Added or EntityState.Modified)) continue;
if (currentUser.Id is null) continue;
if (entry.State == EntityState.Added)
entry.Entity.CreatedBy = currentUser.Id;
else
entry.Entity.LastModifiedBy = currentUser.Id;
}
}
}
SaveChangesInterceptor是EFCore提供的实体在SaveChange时的拦截器,我们可以提供一个基于该拦截器的实现,在实体类型是继承BaseAuditEntity时,自动保存当前时间属性。如果实现是继承AuditWithUserEntity,则自动保存当前用户为操作记录人。
八、实现命令查询职责模式
- 命令查询职责分离(CQRS,Command Query Responsibility Segregation)是一种架构模式,它将系统中的写操作与读操作分离开来。CQRS 模式能够提升系统的可伸缩性、性能和可维护性,尤其适用于复杂的业务场景和高并发的系统。在传统的 CRUD(增、删、改、查)架构中,读写操作通常共享同一数据模型,而 CQRS 将这两者彻底分开,让它们有独立的模型、接口和存储方式。
CQRS 模式特别适合与事件溯源(Event Sourcing)一起使用,可以通过事件追溯系统的状态变化,确保数据的一致性。
该系统使用MediatR来提供CQRS的支持,我们先来看接口,在DDM.DHT.Core.Common项目创建Messaging,再创建4个类,代码如下:
//ICommand.cs
public interface ICommand<out TResponse> : IRequest<TResponse>;
//ICommandHandler.cs
public interface ICommandHandler<in TCommand, TResponse> : IRequestHandler<TCommand, TResponse> where TCommand : ICommand<TResponse>;
//IQuery.cs
public interface IQuery<out TResponse> : IRequest<TResponse>;
//IQueryHandler.cs
public interface IQueryHandler<in TQuery, TResponse> : IRequestHandler<TQuery, TResponse> where TQuery : IQuery<TResponse>;
MediatR中介者模式:这是一种旨在解耦对象之间通信的策略,MediatR是实现中介模式一种成熟的实现。我们只需要知道怎么使用它即可。从本质上讲,MediatR 在三种主要模式下运行:
- Request:涉及具有服务响应的单个接收方。
- Notification:在没有服务响应的情况下与多个接收方接合。
- StreamRequest:利用单个接收器进行具有服务响应的流操作。
就该项目而言,我们主要关注Request行为,尤其是探索MediatR的管道。
- MediatR Pipeline
在中介请求流中,发布者(发送_操作_)和订阅者(处理程序)之间存在明显的区别。
通过利用 MediatR 管道,我们可以有效地拦截此流程,并将自定义逻辑引入流程。
要实现管道,需要从接口继承IPipelineBehavior。接下来我们添加一个命令日志收集器,在DDM.DHT.Infrastructure.EFCore项目Interceptors文件夹添加LoggingBehavior.cs:
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TRequest : ICommand<TResponse>
{
private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger) => _logger = logger;
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
_logger.LogInformation("----- Handling command {CommandName} ({@Command})", GetGenericTypeName(request!), request);
var response = await next();
_logger.LogInformation("----- Command {CommandName} handled - response: {@Response}", GetGenericTypeName(request!), response);
return response;
}
}
上面的代码所示,我们可以看到此方法允许在调用ICommand
九、实现领域事件
我们再来看一下MediatR的一种模式,Notification模式,它可以与零个或多个接受放通信,这种模式特别适合做事件发布。不让用户实体更新了,同时发出一个用户更新已经更新的事件。至于该事件被多少其他程序订阅,那是以后的事,这样事件发布者和订阅者就实现了解耦。
现在我们回到DDM.DHT.SharedKernel项目,在Domain文件夹添加BaseEvent.cs类
public abstract class BaseEvent : INotification
{
/// <summary>
/// 发生日期
/// </summary>
public DateTime DateOccurred { get; protected set; } = DateTime.Now;
}
该类继承INotification接口,因此领域事件可以有零或者多个订阅者实现。
接着我们在DDM.DHT.Infrastructure.EFCore项目的Interceptors文件夹添加DispatchDomainEventsInterceptor.cs类
public class DispatchDomainEventsInterceptor(IPublisher publisher) : SaveChangesInterceptor
{
public override int SavedChanges(SaveChangesCompletedEventData eventData, int result)
{
DispatchDomainEvents(eventData.Context).GetAwaiter().GetResult();
return base.SavedChanges(eventData, result);
}
public override async ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
CancellationToken cancellationToken = default)
{
await DispatchDomainEvents(eventData.Context);
return await base.SavedChangesAsync(eventData, result, cancellationToken);
}
private async Task DispatchDomainEvents(DbContext? context)
{
if (context == null) return;
var entities = context.ChangeTracker
.Entries<BaseEntity>()
.Where(e => e.Entity.DomainEvents.Any())
.Select(e => e.Entity)
.ToList();
var domainEvents = entities
.SelectMany(e => e.DomainEvents)
.ToList();
entities.ForEach(e => e.ClearDomainEvents());
foreach (var domainEvent in domainEvents)
await publisher.Publish(domainEvent);
}
}
上面代码的核心语句就是最后那句 await publisher.Publish(domainEvent)
,publisher对象是通过依赖注入的IPublisher实例,这是一个MediatR框架提供的对象,它的功能就是发布事件。
这段代码同时继承SaveChangesInterceptor抽象类,因此在实体执行SavedChange时,可以确保同时执行了DispatchDomainEvents方法,实现工作单元模式。
十、实现集成事件
上一部分内容中,领域事件的实现中我们可以知道一个领域事件可以实现多个订阅者,并且同时实现了工作单元模式。看起来好像很厉害,但实际上受到EFCore的事务特性的限制,领域事件只能在一个DbContext上下文中实现事务。如果是跨微服务的事件订阅,怎么办呢?
分布式事务的实现并不是一件容易的事情,我们的项目通过DotNetCore.CAP库来实现。
首先重温一下CAP的概念:
CAP定理是分布式系统中的一个重要理论,主要由埃里克·布鲁尔在2000年提出,并在2002年由塞思·吉尔伯特和南希·林奇正式证明。CAP定理指出,在一个分布式系统中,最多只能同时满足以下三项中的两项:
- 一致性(Consistency):所有节点在同一时间看到相同的数据。
- 可用性(Availability):每个请求都会在有限的时间内得到响应。
- 分区容错性(Partition Tolerance):系统在网络分区的情况下仍能继续工作。 因此,设计者在面对网络分区时,必须在一致性和可用性之间做出选择,无法同时保证这三项特性。
DotNetCore CAP通常运用在分布式事务的场景,主要解决的是不同程序之间远程调用的事务一致性。
集成CAP的实现:
-
在shared文件夹创建新的项目DDM.DHT.Infrastructure.CAP,需要实现CAP的微服务单独引用该项目。
-
创建EFDbContext类,实现开启事务的逻辑:
public class EFDbContext(DbContextOptions options) : DbContext(options) { private IDbContextTransaction? _currentTransaction; public IDbContextTransaction? GetCurrentTransaction() => _currentTransaction; /// <summary> /// 事务是否开启 /// </summary> public bool HasActiveTransaction => _currentTransaction != null; public Task<IDbContextTransaction> BeginTransactionAsync() { if (_currentTransaction != null) return null; _currentTransaction = Database.BeginTransaction(); return Task.FromResult(_currentTransaction); } public async Task CommitTransactionAsync(IDbContextTransaction transaction) { if (transaction == null) throw new ArgumentNullException(nameof(transaction)); if (transaction != _currentTransaction) throw new InvalidOperationException($"传入的事务{transaction.TransactionId}并不是当前事务"); try { await SaveChangesAsync(); transaction.Commit(); } catch { RollbackTransaction(); throw; } finally { if (_currentTransaction != null) { _currentTransaction.Dispose(); _currentTransaction = null; } } } public void RollbackTransaction() { try { _currentTransaction?.Rollback(); } finally { if (_currentTransaction != null) { _currentTransaction.Dispose(); _currentTransaction = null; } } } }
-
创建TransactionBehavior.cs类,通过拦截器开启事务
public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFDbContext where TRequest : ICommand<TResponse> { ILogger _logger; TDbContext _dbContext; ICapPublisher _capBus; public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger) { _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext)); _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken) { var response = default(TResponse); var typeName = request.GetGenericTypeName(); try { if (_dbContext.HasActiveTransaction) { return await next(); } //定义数据库操作执行的策略,比如可以在里面嵌入一些重试的逻辑 var strategy = _dbContext.Database.CreateExecutionStrategy(); await strategy.ExecuteAsync(async () => { using (var transaction = await _dbContext.BeginTransactionAsync(_capBus)) { using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId)) { _logger.LogInformation("----- 开始事务 {TransactionId} {CommandName}({@Command})", transaction.TransactionId, typeName, request); response = await next(); _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName); await _dbContext.CommitTransactionAsync(transaction); //Guid transactionId = transaction.TransactionId; } } }); return response; } catch (Exception ex) { _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request); throw; } } }
-
创建DependencyInjection.cs类,注入CAP实例和配置
public static class DependencyInjection { public static IServiceCollection AddCAP<TMasterDbContext>( this IServiceCollection services, IConfiguration configuration) where TMasterDbContext : DbContext { var masterDbConn = configuration.GetConnectionString("MasterDb"); services.AddCap(x => { x.UseEntityFramework<TMasterDbContext>(); x.UseMySql(masterDbConn!); x.UseRabbitMQ(options => { configuration.GetSection("RabbitMQ").Bind(options); }); x.UseDashboard(); }); return services; } }
要使用CAP还需要2个步骤,目前还在框架阶段,后面两个步骤在使用的时候具体介绍,下面内容只是先把后续3个步骤写出来。
-
在要使用CAP的微服务项目中,让微服务的DbContext对象继承EFDbContext类,用来继承事务逻辑。
-
创建新的拦截器,注入,如
public class UserDbContextTransactionBehavior<TRequest, TResponse> : TransactionBehavior<UserDbContext, TRequest, TResponse> where TRequest : ICommand<TResponse> { public UserDbContextTransactionBehavior(UserDbContext dbContext, ICapPublisher capBus, ILogger<UserDbContextTransactionBehavior<TRequest, TResponse>> logger) : base(dbContext, capBus, logger) { } }
这个类主要目的是把ICapPublisher注入进来
-
添加依赖注入
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(UserDbContextTransactionBehavior<,>));
ps:这里CAP是实现还依赖了RabbitMQ中间件,具体的使用效果以后遇到真实需求了再详细介绍。
到目前已经把领域层规范和EFCore基础设施搭建好了,还有一些其他基础设施层的通用封装,如Cache、Quartz、MessageBus等,这些也等以后再展开介绍。