上篇中说到了面临的问题(传送门:DDD设计中的Unitwork与DomainEvent如何相容?),和当时实现的一个解决方案。在实际使用了几天后,有了新的思路,和@trunks 兄提出的观点类似。下面且听我娓娓道来。
一、回顾
先回顾一下,代码中的核心类。
DomainEventConsistentQueue : 用于把多个领域事件放到一个集合中,批量进行实际的发布操作。
SqlServerUnitOfWork : 基于SQL SERVER的工作单元实现。
上篇最终的编码效果。
var aggregateA = new AggregateRootA(); //从仓储中获取
var aggregateB = new AggregateRootB(); //从仓储中获取
using (var queue = DomainEventConsistentQueue.Current())
{
using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString))
{
aggregateA.Event();
unitwork.RegisterModfied(aggregateA);
aggregateB.Event();
unitwork.RegisterModfied(aggregateA);
var isSuccess = unitwork.Commit();
if (isSuccess)
queue.PublishEvents();
}
}
public class AggregateRootA : AggregateRoot
{
public void Event()
{
DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventA());
}
}
public class AggregateRootB : AggregateRoot
{
public void Event()
{
DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventB());
}
}
public class DomainEventA : IDomainEvent
{
}
public class DomainEventB : IDomainEvent
{
}
二、问题
1.其中4、5、14、15行的代码显得与整个上下文格格不入,此处是应用层中的一个跨多个聚合根的业务处理操作。
对于编码业务逻辑的人来说,其实没有必要去管理整个领域事件如何发布,因为领域事件本身表达的就是已经发生的事情,所以概念上是在数据已经完成修改后给我成功发布出去就行。
那么此处标记出的代码显得有点多余,因为这里需要编码人员去管理领域事件的发布。
2.其中23、31行的代码的副作用很大,导致所有调用此方法发布的领域事件都得通过一致性队列进行批量发布。
哪怕是单个聚合根的操作,也都得在外层加个 using (var queue = DomainEventConsistentQueue.Current())。
这样的方式与常规的DomainEventBus.Instance().Publish方式产生了差异,让编码业务代码的人多了一份职责,去决定此处加不加using (var queue = DomainEventConsistentQueue.Current())。
三、解决方案
此时我想到的方案是,把工作单元的生命周期提炼出来作为执行上下文中的一个概念。这样可以使用类似Thread.CurrentThread这样的方式来在任何地方获取到当前的工作单元。有了这个可以做2件事:
①根据当前是否处于工作单元的环境中来处理领域事件的发布方式。这样可以隐藏起直接发布还是通过DomainEventConsistentQueue来发布的逻辑。
②在工作单元中抛出必要的事件,如(提交事件、回滚事件),通过注册其事件来关联DomainEventConsistentQueue的发布操作。
四、进行改造
1.先定义一个执行上下文。
public class ExcutingContext
{
private static readonly ThreadLocal<IUnitOfWork> _unitWork = new ThreadLocal<IUnitOfWork>();
public static UnitOfWork UseSqlServerUnitOfWork(string dbConnectString)
{
if (_unitWork.Value != null)
throw new ApplicationException("当前线程已经启动了一个工作单元");
var unitWork = new SqlServerUnitOfWork(dbConnectString);
_unitWork.Value = unitWork;
unitWork.CommittedEvent += CommittedEventHandle;
unitWork.RollBackEvent += RollBackEventHandle;
return unitWork;
}
public static UnitOfWork GetCurrentUnitOfWork()
{
return _unitWork.Value as UnitOfWork;
}
private static void CommittedEventHandle(bool isSuccess)
{
_unitWork.Value = null;
}
private static void RollBackEventHandle()
{
_unitWork.Value = null;
}
}
2.改造DomainEventBus的发布方法
public void Publish<T>(T aDomainEvent) where T : IDomainEvent
{
if (aDomainEvent.IsRead)
return;
var unitOfWork = ExcutingContext.GetCurrentUnitOfWork();
if (unitOfWork != null) //工作单元环境
{
var domainEventConsistentQueue = DomainEventConsistentQueue.Current();
if (domainEventConsistentQueue.IsEmpty())
{
unitOfWork.CommittedEvent += AutoPublishDomainEventConsistentQueue;
unitOfWork.RollBackEvent += domainEventConsistentQueue.Dispose;
}
domainEventConsistentQueue.RegisterEvent(aDomainEvent);
return;
}
var registeredSubscribers = _subscribers;
if (registeredSubscribers != null)
{
var domainEventType = aDomainEvent.GetType();
List<IDomainEventSubscriber> subscribers;
if (!registeredSubscribers.TryGetValue(domainEventType, out subscribers))
{
aDomainEvent.Read(); //未找到订阅者,但是消息还是消费掉。
return;
}
foreach (var domainEventSubscriber in subscribers)
{
var subscribedTo = domainEventSubscriber.SubscribedToEventType();
if (subscribedTo == domainEventType || subscribedTo is IDomainEvent)
{
Distribute(domainEventSubscriber, aDomainEvent);
}
}
aDomainEvent.Read();
}
}
private void AutoPublishDomainEventConsistentQueue(bool isSuccess)
{
if (isSuccess)
DomainEventConsistentQueue.Current().PublishEvents();
}
这里有一点要说明一下,因为这里的2个注册CommittedEvent的事件,AutoPublishDomainEventConsistentQueue的注册在CommittedEventHandle之后,所以当DomainEventConsistentQueue中调用Publish方法时ExcutingContext.GetCurrentUnitOfWork()已经获取到null了,就会进入到实际的发布操作。
五、使用方式
var aggregateA = new AggregateRootA(); //从仓储中获取
var aggregateB = new AggregateRootB(); //从仓储中获取
using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString))
{
aggregateA.Event();
unitwork.RegisterModfied(aggregateA);
aggregateB.Event();
unitwork.RegisterModfied(aggregateA);
var isSuccess = unitwork.Commit();
}
public class AggregateRootA : AggregateRoot
{
public void Event()
{
DomainEventBus.Instance().Publish(new DomainEventA());
}
}
public class AggregateRootB : AggregateRoot
{
public void Event()
{
DomainEventBus.Instance().Publish(new DomainEventB());
}
}
public class DomainEventA : IDomainEvent
{
}
public class DomainEventB : IDomainEvent
{
}
这样代码又精简了些,并且隐藏了领域事件的实际发布过程,业务编码时无需关注领域事件是如何发布的。
欢迎大家继续探讨~
原创文章,转载请注明本文链接: https://zacharyfan.com/archives/81.html
关于作者:张帆(Zachary,个人微信号:Zachary-ZF)。坚持用心打磨每一篇高质量原创。欢迎扫描二维码~
定期发表原创内容:架构设计丨分布式系统丨产品丨运营丨一些思考。
如果你是初级程序员,想提升但不知道如何下手。又或者做程序员多年,陷入了一些瓶颈想拓宽一下视野。欢迎关注我的公众号「跨界架构师」,回复「技术」,送你一份我长期收集和整理的思维导图。
如果你是运营,面对不断变化的市场束手无策。又或者想了解主流的运营策略,以丰富自己的“仓库”。欢迎关注我的公众号「跨界架构师」,回复「运营」,送你一份我长期收集和整理的思维导图。