最近在开发过程中,遇到了一个场景,甚是棘手,在这里分享一下。希望大家脑洞大开一起来想一下解决思路。鄙人也想了一个方案拿出来和大家一起探讨一下是否合理。
一、简单介绍一下涉及的对象概念
工作单元:维护变化的对象列表,在整块业务逻辑处理完全之后一次性写入到数据库中。
领域事件:领域对象本身发生某些变化时,发布的通知事件,告诉订阅者处理相关流程。
二、问题来了
我认为最合理的领域事件的触发点应该设计在领域对象内部,那么问题来了。
当这个领域对象发生变化的上下文是一个复杂的业务场景,整个流程中会涉及到多个领域对象,所以需要通过工作单元来保证数据写入的一致性。
此时其中各个产生变化的领域对象的领域事件如果实时被发布出去,那么当工作单元在最终提交到数据库时,如果产生了回滚,那么会导致发布了错误的领域事件,产生未知的后果。
三、问题分析
我能够想到的方案是,这里领域事件的发布也通过一个类似于工作单元一样的概念进行持续的管理,在领域对象中的发布只是做一个记录,只有在工作单元提交成功之后,才实际发布其中所有的领域事件。
四、说干就干
public class DomainEventConsistentQueue : IDisposable
{
private readonly List<IDomainEvent> _domainEvents = new List<IDomainEvent>();
private bool _publishing = false;
public void RegisterEvent(IDomainEvent domainEvent)
{
if (_publishing)
{
throw new ApplicationException("当前事件一致性队列已被发布,无法添加新的事件!");
}
if (_domainEvents.Any(ent => ent == domainEvent)) //防止相同事件被重复添加
return;
_domainEvents.Add(domainEvent);
}
public void Clear()
{
_domainEvents.Clear();
_publishing = false;
}
public void PublishEvents()
{
if (_publishing)
{
return;
}
if (_domainEvents == null)
return;
try
{
_publishing = true;
foreach (var domainEvent in _domainEvents)
{
DomainEventBus.Instance().Publish(domainEvent);
}
}
finally
{
Clear();
}
}
public void Dispose()
{
Clear();
}
}
使用方式:
var aggregateA = new AggregateRootA();
var aggregateB = new AggregateRootB();
using (var queue = new DomainEventConsistentQueue())
{
using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString))
{
aggregateA.Event(queue);
aggregateB.Event(queue);
var isSuccess = unitwork.Commit();
if (isSuccess)
queue.PublishEvents();
}
}
public class AggregateRootA : AggregateRoot
{
public void Event(DomainEventConsistentQueue queue)
{
queue.RegisterEvent(new DomainEventA());
}
}
public class AggregateRootB : AggregateRoot
{
public void Event(DomainEventConsistentQueue queue)
{
queue.RegisterEvent(new DomainEventB());
}
}
public class DomainEventA : IDomainEvent
{
public DateTime OccurredOn()
{
throw new NotImplementedException();
}
public void Read()
{
throw new NotImplementedException();
}
public bool IsRead
{
get { throw new NotImplementedException(); }
}
}
public class DomainEventB : IDomainEvent
{
public DateTime OccurredOn()
{
throw new NotImplementedException();
}
public void Read()
{
throw new NotImplementedException();
}
public bool IsRead
{
get { throw new NotImplementedException(); }
}
}
问题是解决了,但是第8、9行代码看着特别变扭,在产生领域事件的领域对象方法上需要增加一个与表达的业务无关的参数,这个大大破坏了DDD设计的初衷——统一语言(Ubiquitous Language),简洁明了的表达出每个业务行为,业务交流应与代码保持一致。
像这2行表达起来如“AggregateRootA Event DomainEventConsistentQueue”这个 DomainEventConsistentQueue其实并不是领域对象,所以其并不是领域的一部分。
五、陷入思考
这里突然想到,如果在运行中的每个线程的共享区域存储待发布的领域事件集合,那么不就可以随时随地的管理当前操作上下文中的领域事件了吗?这里需要引入ThreadLocal<T> 类。MSDN的解释参见https://msdn.microsoft.com/zh-cn/library/dd642243(v=vs.110).aspx。该泛型类可以提供仅针对当前线程的全局存储空间,正好能够恰到好处的解决我们现在遇到的问题。
六、说改就改
实现类:
public class DomainEventConsistentQueue : IDisposable
{
private static readonly ThreadLocal<List<IDomainEvent>> _domainEvents = new ThreadLocal<List<IDomainEvent>>();
private static readonly ThreadLocal<bool> _publishing = new ThreadLocal<bool> { Value = false };
private static DomainEventConsistentQueue _current;
/// <summary>
/// 获取当前的领域事件一致性队列。
/// 由于使用了线程本地存储变量,此处为单例模式。
/// </summary>
/// <returns></returns>
public static DomainEventConsistentQueue Current()
{
if (_current != null)
return _current;
var temp = new DomainEventConsistentQueue();
Interlocked.CompareExchange(ref _current, temp, null);
return temp;
}
public void RegisterEvent(IDomainEvent domainEvent)
{
if (_publishing.Value)
{
throw new ApplicationException("当前事件一致性队列已被发布,无法添加新的事件!");
}
var domainEvents = _domainEvents.Value;
if (domainEvents == null)
{
domainEvents = new List<IDomainEvent>();
_domainEvents.Value = domainEvents;
}
if (domainEvents.Any(ent => ent == domainEvent)) //防止相同事件被重复添加
return;
domainEvents.Add(domainEvent);
}
public void Clear()
{
_domainEvents.Value = null;
_publishing.Value = false;
}
public void PublishEvents()
{
if (_publishing.Value)
{
return;
}
if (_domainEvents.Value == null)
return;
try
{
_publishing.Value = true;
foreach (var domainEvent in _domainEvents.Value)
{
DomainEventBus.Instance().Publish(domainEvent);
}
}
finally
{
Clear();
}
}
public void Dispose()
{
Clear();
}
}
使用方式:
var aggregateA = new AggregateRootA();
var aggregateB = new AggregateRootB();
using (var queue = DomainEventConsistentQueue.Current())
{
using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString))
{
aggregateA.Event();
aggregateB.Event();
var isSuccess = unitwork.Commit();
if (isSuccess)
queue.PublishEvents();
}
}
public class AggregateRootA : AggregateRoot
{
public void Event()
{
DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventA());
}
}
public class AggregateRootB : AggregateRoot
{
public void Event()
{
DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventB());
}
}
public class DomainEventA : IDomainEvent
{
public DateTime OccurredOn()
{
throw new NotImplementedException();
}
public void Read()
{
throw new NotImplementedException();
}
public bool IsRead
{
get { throw new NotImplementedException(); }
}
}
public class DomainEventB : IDomainEvent
{
public DateTime OccurredOn()
{
throw new NotImplementedException();
}
public void Read()
{
throw new NotImplementedException();
}
public bool IsRead
{
get { throw new NotImplementedException(); }
}
}
这样代码看起来比之前优雅多了。这里的 DomainEventConsistentQueue.Current() 中操作的变量针对同一个线程在哪都是共享的,所以我们只管往里丢数据就好了~
七、方案的局限性
对于执行上下文的要求较高,整个领域事件的发布必须要求在同一线程内操作。所以在使用的过程中尽量避免这种情况的发生。如果实在无法避免只能通过把DomainEventConsistentQueue 当作变量在多个线程之间传递了。
以上是个人的想法,可能有所考虑不周~ 不知道各位园子里的小伙伴们是否有处理过类似场景的经验,欢迎留言探讨,相互学习~
原创文章,转载请注明本文链接: https://zacharyfan.com/archives/61.html
关于作者:张帆(Zachary,个人微信号:Zachary-ZF)。坚持用心打磨每一篇高质量原创。欢迎扫描二维码~
定期发表原创内容:架构设计丨分布式系统丨产品丨运营丨一些思考。
如果你是初级程序员,想提升但不知道如何下手。又或者做程序员多年,陷入了一些瓶颈想拓宽一下视野。欢迎关注我的公众号「跨界架构师」,回复「技术」,送你一份我长期收集和整理的思维导图。
如果你是运营,面对不断变化的市场束手无策。又或者想了解主流的运营策略,以丰富自己的“仓库”。欢迎关注我的公众号「跨界架构师」,回复「运营」,送你一份我长期收集和整理的思维导图。