事件溯源
将应用程序状态的所有更改捕获为一系列事件。
2005 年 12 月 12 日
这是我在 2000 年代中期进行的 企业应用程序架构进一步发展 编写的一部分。不幸的是,此后太多其他事情吸引了我的注意力,因此我没有时间进一步研究它们,而且在可预见的未来我也没有看到太多时间。因此,这些材料非常草稿形式,我不会进行任何更正或更新,直到我能找到时间再次处理它们。
我们可以查询应用程序的状态以了解当前的世界状态,这可以回答许多问题。但是,有时我们不仅想知道我们在哪里,还想知道我们是如何到达那里的。
事件溯源 确保应用程序状态的所有更改都存储为一系列事件。我们不仅可以查询这些事件,还可以使用事件日志来重建过去的状态,并作为自动调整状态以应对追溯性更改的基础。
工作原理
事件溯源 的基本思想是确保对应用程序状态的每一次更改都捕获在一个事件对象中,并且这些事件对象本身按应用的顺序存储,与应用程序状态本身的生存期相同。
让我们考虑一个关于航运通知的简单示例。在这个例子中,我们有很多船只在公海上航行,我们需要知道它们在哪里。一个简单的方法是使用一个跟踪应用程序,该应用程序具有方法允许我们知道船只何时到达或离开港口。
图 1:一个简单的跟踪航运移动的接口。
在这种情况下,当调用服务时,它会找到相关的船只并更新其位置。船只对象记录船只的当前已知状态。
引入 事件溯源 在此过程中添加了一个步骤。现在服务创建了一个事件对象来记录更改并处理它以更新船只。
图 2:使用事件来捕获更改。
仅从处理的角度来看,这只是一个不必要的间接级别。有趣的是,当我们查看在几次更改后应用程序中持久化的内容时。让我们想象一些简单的更改
- 船只“国王罗伊”离开旧金山
- 船只“王子特雷弗”抵达洛杉矶
- 船只“国王罗伊”抵达香港
使用基本服务,我们只看到船只对象捕获的最终状态。我将此称为应用程序状态。
图 3:简单跟踪器跟踪的几次移动后的状态。
使用 事件溯源,我们还捕获每个事件。如果我们使用持久存储,则事件将与船只对象一样持久化。我发现说我们正在持久化两件不同的事情很有用:应用程序状态和事件日志。
图 4:事件溯源跟踪器跟踪的几次移动后的状态。
使用 事件溯源 获得的最明显的好处是我们现在拥有所有更改的日志。我们不仅可以看到每艘船在哪里,还可以看到它去过哪里。但是,这是一个很小的收获。我们也可以通过在船只对象中保留过去港口的历史记录,或者在船只移动时写入日志文件来做到这一点。这两种方法都可以为我们提供足够的历史记录。
事件溯源 的关键是我们保证所有对域对象的更改都是由事件对象发起的。这会导致许多可以在事件日志之上构建的功能
- 完全重建:我们可以完全丢弃应用程序状态,并通过从事件日志中重新运行事件到一个空的应用程序来重建它。
- 时间查询:我们可以确定应用程序在任何时间点的状态。名义上,我们通过从空白状态开始并重新运行事件直到特定时间或事件来做到这一点。我们可以通过考虑多个时间线(类似于版本控制系统中的分支)来进一步扩展这一点。
- 事件重播:如果我们发现过去的事件不正确,我们可以通过反转它和后面的事件,然后重播新的事件和后面的事件来计算后果。(或者,通过丢弃应用程序状态并按顺序重播所有事件,包括正确的事件。)相同的技术可以处理以错误顺序接收的事件 - 异步消息传递系统中的常见问题。
使用 事件溯源 的应用程序的常见示例是版本控制系统。这样的系统经常使用时间查询。Subversion 在您使用 dump 和 restore 将内容从存储库文件之间移动时使用完全重建。我不知道有任何系统进行事件重播,因为它们对这些信息并不特别感兴趣。使用 事件溯源 的企业应用程序比较少见,但我已经看到了一些使用它的应用程序(或应用程序的一部分)。
应用程序状态存储
使用 事件溯源 的最简单方法是从空白应用程序状态开始,然后应用事件以达到所需状态,从而计算请求的应用程序状态。同样简单地可以看出为什么这是一个缓慢的过程,尤其是在事件很多的情况下。
在许多应用程序中,更常见的是请求最近的应用程序状态,如果是这样,更快的替代方法是存储当前的应用程序状态,如果有人想要 事件溯源 提供的特殊功能,那么该附加功能将在其之上构建。
应用程序状态可以存储在内存中或磁盘上。由于应用程序状态完全可以从事件日志中推导出,因此您可以在任何地方缓存它。在工作日使用的系统可以在一天开始时从隔夜快照启动,并将当前应用程序状态保存在内存中。如果它崩溃,它会从隔夜存储中重播事件。在工作日结束时,可以创建一个新的状态快照。可以在任何时间并行创建新的快照,而不会使正在运行的应用程序停止运行。
官方记录系统可以是事件日志或当前应用程序状态。如果当前应用程序状态保存在数据库中,那么事件日志可能只用于审计和特殊处理。或者,事件日志可以是官方记录,并且可以在需要时从它们构建数据库。
构建事件处理程序逻辑
关于将事件处理逻辑放在哪里有很多选择。主要的选择是将逻辑放在 事务脚本 中还是 域模型 中。与往常一样,事务脚本 更适合简单的逻辑,而 域模型 更适合事情变得更复杂的情况。
总的来说,我注意到通过事件或命令驱动更改的应用程序倾向于使用 事务脚本。事实上,有些人认为这是构建以这种方式驱动的系统的必要方法。然而,这是一种错觉。
一个好的思考方式是,这里涉及两个责任。处理域逻辑是操作应用程序的业务逻辑。处理选择逻辑是根据传入事件选择哪个处理域逻辑块应该运行的逻辑。您可以将它们组合在一起,本质上这就是 事务脚本 方法,但您也可以通过将处理选择逻辑放在事件处理系统中来将它们分开,它会调用包含处理域逻辑的域模型中的方法。
做出这个决定后,下一个决定是将处理选择逻辑放在事件对象本身中,还是拥有一个单独的事件处理器对象。处理器的问题是,它必然会根据事件类型运行不同的逻辑,这是一种任何优秀的 OOer 都厌恶的类型切换。在所有条件都相同的情况下,您希望处理选择逻辑在事件本身中,因为这是随事件类型而变化的东西。
当然,并非所有条件都总是相同。在某些情况下,拥有一个单独的处理器是有意义的,例如,当事件对象是一个 DTO 时,它被一些自动手段序列化和反序列化,这些手段禁止在事件中放置代码。在这种情况下,您需要为事件找到选择逻辑。我的倾向是尽可能避免这种情况,如果不能,则将 DTO 视为事件的隐藏数据持有者,并将事件视为常规的多态对象。在这种情况下,值得做一些适度聪明的事情,使用配置文件或(更好)命名约定将序列化事件 DTO 与实际事件匹配。
如果没有必要反转事件,那么让 域模型 不了解事件日志很容易。反转逻辑会使这变得更加棘手,因为 域模型 需要存储和检索先前状态,这使得 域模型 了解事件日志变得更加方便。
反转事件
除了事件向前播放之外,它们也经常能够反转自身,这很有用。
当事件以差异的形式呈现时,反转是最直接的。一个例子是“将马丁的账户增加 10 美元”,而不是“将马丁的账户设置为 110 美元”。在前一种情况下,我可以通过简单地减去 10 美元来反转,但在后一种情况下,我没有足够的信息来重新创建账户的过去值。
如果输入事件不遵循差异方法,那么事件应该确保在处理过程中存储反转所需的一切。您可以通过存储任何更改值的先前值,或通过计算和存储事件上的差异来做到这一点。
当处理逻辑位于域模型内部时,这种存储要求会产生重大影响,因为域模型可能会以事件对象处理不可见的方式改变其内部状态。在这种情况下,最好设计域模型以了解事件并能够使用它们来存储先前值。
值得记住的是,所有反转事件的功能都可以通过恢复到过去的快照并重播事件流来完成。因此,反转对于功能来说永远不是绝对必要的。但是,它可能会对效率产生重大影响,因为您可能经常处于反转几个事件比对大量事件使用正向播放效率更高的位置。
外部更新
事件溯源 中的一个棘手因素是如何处理不遵循这种方法的外部系统(大多数系统都不遵循)。当您向外部系统发送修改器消息以及当您从其他系统接收查询时,您会遇到问题。
事件溯源 的许多优势源于能够随意重播事件,但是如果这些事件导致更新消息被发送到外部系统,那么事情就会出错,因为这些外部系统不知道真实处理和重播之间的区别。
为了处理这种情况,你需要用一个网关包装任何外部系统。这本身并不太麻烦,因为无论如何这都是一个好主意。网关必须更复杂一些,以便它可以处理事件溯源系统正在进行的任何重放处理。
对于重建和时间查询,网关通常可以在重放处理期间被禁用就足够了。你希望以一种对领域逻辑不可见的方式做到这一点。如果领域逻辑调用 PaymentGateway.send,它应该这样做,无论你是否处于重放模式。网关应该通过引用事件处理器并检查它是否处于重放模式来处理这种区别,然后再将外部调用传递到外部世界。
如果你使用的是追溯事件,外部更新会变得更加复杂,请参阅那里的讨论以了解详细信息。
你可能会在外部系统中看到的另一种策略是按时间缓冲外部通知。我们可能不需要立即发出外部通知,而只需要在月底发出。在这种情况下,我们可以更自由地重新处理,直到那个时间出现。我们可以通过让网关存储外部消息直到发布日期,或者通过通知领域事件触发外部消息来处理这个问题,而不是立即进行通知。
外部查询
外部查询的主要问题是,它们返回的数据会影响处理事件的结果。如果我在 12 月 5 日询问汇率,并在 12 月 20 日重放该事件,我需要的是 12 月 5 日的汇率,而不是之后的汇率。
外部系统可能可以通过询问某个日期的值来提供过去的数据。如果可以,并且我们相信它是可靠的,那么我们可以用它来确保一致的重放。我们也可能使用事件协作,在这种情况下,我们只需要确保保留更改的历史记录。
如果我们不能使用这些简单的计划,那么我们必须做一些更复杂的事情。一种方法是将外部系统的网关设计为记住其查询的响应,并在重放期间使用它们。为了完整起见,这意味着需要记住对每个外部查询的响应。如果外部数据变化缓慢,那么只在值发生变化时记住变化可能是合理的。
外部交互
对外部系统的查询和更新都会给事件溯源带来很多复杂性。当你进行涉及两者交互时,你会得到最糟糕的结果。这种交互可能是外部调用,它既返回结果(查询),又会导致外部系统发生状态变化,例如提交一个送货订单,该订单返回该订单的送货信息。
代码更改
因此,本讨论假设处理事件的应用程序保持不变。显然情况不会是这样。事件处理数据更改,那么代码更改怎么办?
我们可以将代码更改分为三种主要类型:新功能、缺陷修复和时间逻辑。
新功能本质上为系统添加了新功能,但不会使之前发生的事情失效。这些功能可以在任何时候自由添加。如果你想利用新功能处理旧事件,你只需重新处理这些事件,新的结果就会出现。
在使用新功能重新处理时,你通常希望外部网关关闭,这是正常情况。例外情况是当新功能涉及这些网关时。即使那样,你可能也不希望对过去的事件发出通知,如果你这样做,你需要对旧事件的第一次重新处理进行一些特殊处理。这会很麻烦,但你只需要做一次。
当你看过去处理并意识到它是错误的时候,就会出现错误修复。对于内部内容,这很容易修复,你只需要进行修复并重新处理事件。你的应用程序状态现在已修复为它应该具有的状态。对于许多情况来说,这确实相当不错。
同样,外部网关会带来复杂性。本质上,网关需要跟踪错误发生时的情况和没有错误发生时的情况之间的差异。这个想法类似于追溯事件需要发生的事情。事实上,如果有很多重新处理需要考虑,那么使用追溯事件机制用事件本身替换事件将是值得的,尽管要做到这一点,你需要确保事件能够正确地撤销错误事件以及正确的事件。
第三种情况是逻辑本身随着时间的推移而发生变化,例如“在 11 月 18 日之前收取 10 美元,之后收取 15 美元”。这种东西实际上需要进入领域模型本身。领域模型应该能够在任何时候运行事件,并使用正确的规则进行事件处理。你可以使用条件逻辑来做到这一点,但如果你有很多时间逻辑,这会变得很混乱。更好的方法是将策略对象挂钩到时间属性:类似于chargingRules.get(aDate).process(anEvent)
。查看协议调度器以了解这种风格。
在处理错误和时间逻辑时,可能存在重叠,因为旧事件需要使用错误代码进行处理。这可能会导致双时间行为:“根据我们在 10 月 1 日的 8 月 1 日规则撤销此事件,并根据我们现在的 8 月 1 日规则替换它”。显然,这些东西会变得非常混乱,除非你真的需要,否则不要走这条路。
其中一些问题可以通过将代码放入数据中来解决。使用自适应对象模型来确定使用对象配置进行处理是一种方法。另一种方法可能是使用某种直接可执行的语言(不需要编译)将脚本嵌入到数据中——例如,将 JRuby 嵌入到 Java 应用程序中。当然,这里的危险是保持适当的配置控制。我会倾向于通过确保对处理脚本的任何更改都以与任何其他更新相同的方式进行处理来做到这一点——通过事件。(尽管到目前为止,我肯定已经从观察转向推测了。)
事件和账户
我在会计系统的背景下看到了一些事件溯源(以及随之而来的模式)的特别强烈的例子。两者在需求(审计对会计系统非常重要)和实现方面都有很好的协同作用。这里的一个关键因素是,你可以安排事情,以便领域事件的所有会计后果都是创建会计分录,并将这些分录链接到原始事件。这为你提供了跟踪更改、撤销等非常好的基础。特别是,它简化了各种调整技术。
何时使用它
将应用程序的每个更改都打包成一个事件是一种接口风格,并非每个人都习惯,许多人发现它很笨拙。因此,它不是一个自然的选择,使用它意味着你期望获得某种形式的回报。
一种明显的回报形式是,很容易序列化事件以创建审计日志。这种审计跟踪对审计很有用,这并不奇怪,但也有其他用途。我和一个将自己的在线账户弄到尴尬状态并打电话寻求帮助的人聊天。他印象深刻的是,帮助者能够准确地告诉他做了什么,因此能够弄清楚如何解决它。要提供这种功能,意味着将审计跟踪暴露给支持组,以便他们能够浏览用户的交互。虽然事件溯源是一种很好的方法,但你也可以使用更常规的日志记录机制来做到这一点,这样就不必处理奇怪的接口。
这种完整审计日志的另一个用途是帮助调试。当然,使用日志文件来调试生产问题已经不是什么新鲜事了。但是事件溯源可以走得更远,允许你创建一个测试环境,并将事件重放到测试环境中,以查看究竟发生了什么,并能够像在调试器中执行测试一样停止、倒带和重放。这对于在将升级投入生产之前进行并行测试特别有价值。你可以在你的测试系统中重放实际事件,并测试你是否得到了你期望的答案。
事件溯源是并行模型或追溯事件的基础。如果你想使用其中任何一种模式,你将需要首先使用事件溯源。事实上,这已经到了这样的程度,很难将这些模式移植到没有使用事件溯源构建的系统上。因此,如果你认为系统将来有合理的可能性需要这些模式,那么现在构建事件溯源是明智的。这似乎是那些不应将此决定留待以后重构的案例之一。
事件溯源也为你的整体架构提出了一些可能性,特别是如果你正在寻找可扩展性很强的架构。如今,人们对“事件驱动架构”非常感兴趣。这个术语涵盖了相当广泛的想法,但大多数都集中在通过事件消息进行通信的系统上。这种系统可以以非常松散耦合的并行方式运行,这提供了出色的水平可扩展性和对系统故障的弹性。
一个例子是具有大量读取器和少量写入器的系统。使用事件溯源,这可以作为一个具有内存数据库的系统集群来实现,通过事件流保持彼此同步。如果需要更新,它们可以路由到单个主系统(或围绕单个数据库或消息队列的更紧密的服务器集群),该系统将更新应用于记录系统,然后将生成的事件广播到更广泛的读取器集群。即使记录系统是数据库中的应用程序状态,这也可能是一个非常有吸引力的结构。如果记录系统是事件日志,那么有很多选项可以实现非常高的性能,因为事件日志是一个纯粹的增量结构,需要最少的锁定。
当然,这种架构并非完美无缺。由于事件传播的时序差异,读取器系统可能与主系统(以及彼此)不同步。但是,这种广泛的架构风格正在使用,我听到的几乎都是关于它的正面评价。
使用这种事件流还可以通过利用事件流并填充自己的模型轻松添加新应用程序,这些模型不需要对所有系统都相同。这是一种非常适合消息传递集成方法的方法。
示例:跟踪船舶(C#)
这是一个非常简单的事件溯源示例,用于了解基本思想。在这个例子中,我故意保持极度简单,作为起点——然后我将使用其他例子来探索一些更复杂的问题。
领域模型是一个简单的模型,它包含运载货物并在港口之间移动的船舶。
有四种类型的事件会影响模型
- 到达:船舶到达港口
- 离开:船舶离开港口
- 装载:货物装载到船上
- 卸载:货物从船上卸载
让我们举一个简单的例子,将船舶移动到周围。
class Tester...
Ship kr; Port sfo, la, yyv; Cargo refact; EventProcessor eProc; [SetUp] public void SetUp() { eProc = new EventProcessor(); refact = new Cargo ("Refactoring"); kr = new Ship("King Roy"); sfo = new Port("San Francisco", Country.US); la = new Port("Los Angeles", Country.US); yyv = new Port("Vancouver", Country.CANADA) ; }
[Test] public void ArrivalSetsShipsLocation() { ArrivalEvent ev = new ArrivalEvent(new DateTime(2005,11,1), sfo, kr); eProc.Process(ev); Assert.AreEqual(sfo, kr.Port); } [Test] public void DeparturePutsShipOutToSea() { eProc.Process(new ArrivalEvent(new DateTime(2005,10,1), la, kr)); eProc.Process(new ArrivalEvent(new DateTime(2005,11,1), sfo, kr)); eProc.Process(new DepartureEvent(new DateTime(2005,11,1), sfo, kr)); Assert.AreEqual(Port.AT_SEA, kr.Port); }
为了使这些测试能够正常工作,我们只需要到达和离开事件。事件处理器非常简单。
class EventProcessor...
IList log = new ArrayList(); public void Process(DomainEvent e) { e.Process(); log.Add(e); }
每个事件都有一个处理方法。
class DomainEvent...
DateTime _recorded, _occurred; internal DomainEvent (DateTime occurred) { this._occurred = occurred; this._recorded = DateTime.Now; } abstract internal void Process();
到达事件只是捕获数据,并有一个处理方法,该方法只是将事件转发到相应的域对象。
class DepartureEvent...
Port _port; Ship _ship; internal Port Port {get { return _port; }} internal Ship Ship {get { return _ship; }} internal DepartureEvent(DateTime time, Port port, Ship ship) : base (time) { this._port = port; this._ship = ship; } internal override void Process() { Ship.HandleDeparture(this); }
因此,在这里事件只执行处理选择逻辑。处理域逻辑由船舶完成。
class Ship...
public Port Port; public void HandleDeparture(DepartureEvent ev) { Port = Port.AT_SEA; }
离开事件只是将船舶的港口设置为特殊情况。您会注意到我将事件传递到域对象中。这里有一个选择,即事件是否应该只传递域对象处理所需的數據,还是传递事件本身。通过传递事件,事件不需要知道域逻辑需要哪些数据。如果事件后来获得了更多数据,则无需更新签名。传递事件的缺点是域逻辑现在意识到事件本身。
这个简单的测试只是展示了基本的事件处理是如何工作的。现在我将展示一些域逻辑,看看它是如何工作的。我们的船只运输书籍,以满足我拥有整个航运公司将我的书籍运往世界各地的幻想。正如您所知,通过加拿大运送书籍非常危险,因为这样做会使书籍有被大量“eh”污染的风险。我见过一些书,几乎每句话都有一个 eh(较长的句子可以得到两个或三个)。
因此,我的完美书籍运输系统能够检测货物是否经过加拿大。
class Tester...
[Test] public void VisitingCanadaMarksCargo() { eProc.Process(new LoadEvent(new DateTime(2005,11,1), refact, kr)); eProc.Process(new ArrivalEvent(new DateTime(2005,11,2), yyv, kr)); eProc.Process(new DepartureEvent(new DateTime(2005,11,3), yyv, kr )); eProc.Process(new ArrivalEvent(new DateTime(2005,11,4), sfo, kr)); eProc.Process(new UnloadEvent(new DateTime(2005,11,5), refact, kr)); Assert.IsTrue(refact.HasBeenInCanada); }
由于货物可能在船舶之间移动和卸载,因此货物有责任知道它是否暴露于这些北方的危险之中。幸运的是,风险只发生在实际停泊在港口时,只是在水域中是安全的。因此,我们的到达事件必须跟踪这一点。
class ArrivalEvent...
Port _port; Ship _ship; internal ArrivalEvent (DateTime occurred, Port port, Ship ship) : base (occurred) { this._port = port; this._ship = ship; } internal Port Port {get {return _port;}} internal Ship Ship {get{return _ship;}} internal override void Process() { Ship.HandleArrival(this); }
处理程序再次是 Ship 对象。
class Ship...
IList cargo; public void HandleArrival (ArrivalEvent ev) { Port = ev.Port; foreach (Cargo c in cargo) c.HandleArrival(ev); }
船舶不负责跟踪加拿大的访问,因此它将到达通知传递给货物。
class Cargo...
public bool HasBeenInCanada = false; public void HandleArrival(ArrivalEvent ev) { if (Country.CANADA == ev.Port.Country) HasBeenInCanada = true; }
考虑事件处理方法包含域逻辑的另一种情况 - 当域逻辑变得更加复杂时,它需要了解大量域模型。在这种方法中,域对象将事件传递给相关对象,以便它们可以处理事件以执行它们需要做的事情。
示例:更新外部系统 (C#)
事件溯源的一个重要功能是您可以根据需要重新处理事件。但是,如果处理事件会导致与外部系统的交互,那么这是一件坏事。最好的结果是他们会厌倦你所有的垃圾邮件事件。
一种简单的解决方法是确保您的系统通过网关调用外部系统,这些网关可以配置为确保除非您正在“真正”处理事件,否则不会发送任何消息。
我将用一个简单的例子来说明这一点,使用船舶和港口(这次没有货物)。假设每当船舶进入港口时,它必须通知当地海关当局。我们可以在事件处理域逻辑中实现这一点。
class Port...
public void HandleArrival (ArrivalEvent ev) { ev.Ship.Port = this; Registry.CustomsNotificationGateway.Notify(ev.Occurred, ev.Ship, ev.Port); }
请注意,此代码只是在网关对象上调用通知,它不关心这是真正的处理还是某种重放。这里的一般原则是域逻辑不应该关心事件运行的上下文。
网关负责确定是否实际发送消息。由于这种情况非常简单,它只是通过链接到事件处理器并检查处理器是否处于活动状态来实现这一点。
class CustomsEventGateway...
EventProcessor processor; public void Notify (DateTime arrivalDate, Ship ship, Port port) { if (processor.isActive) SendToCustoms(BuildArrivalMessage(arrivalDate, ship, port)); }
事件处理器在进行常规处理时只是使自己处于活动状态。
class EventProcessor...
public void Process(DomainEvent e) { isActive = true; e.Process(); isActive = false; log.Add(e); }
虽然这种情况非常简单,但基本原理是相同的。网关决定是否发送外部消息,而不是域逻辑。网关根据它们收集的有关处理上下文的的信息来决定这一点。在这种情况下,来自处理器的简单布尔状态就足够了。
示例:反转事件 (C#)
在这里,我们将以运输示例为例,看看如何反转事件。我们需要反转的关键是确保我们可以准确地计算出由于事件而发生状态更改的任何对象的先前状态。
存储此先前数据的最佳位置是在事件本身,这与示例中将事件传递给域对象的做法非常吻合。由于域对象拥有事件,因此它们可以轻松地在事件上存储信息。
加载事件提供了一个简单的示例。事件包含以下源数据。
class LoadEvent...
int _shipCode; string _cargoCode; internal LoadEvent(DateTime occurred, string cargo, int ship) : base(occurred){ this._shipCode = ship; this._cargoCode = cargo; } internal Ship Ship {get { return Ship.Find(_shipCode); }} internal Cargo Cargo {get { return Cargo.Find(_cargoCode); }}
处理被传递给货物对象,货物对象需要存储货物的先前位置的港口。
class LoadEvent...
internal override void Process() { Cargo.HandleLoad(this); }
internal Port priorPort;
class Cargo...
internal void HandleLoad(LoadEvent ev) { ev.priorPort = _port; _port = null; _ship = ev.Ship; _ship.HandleLoad(ev); }
为了反转事件,我们添加了一个反转方法,该方法镜像处理方法,在域对象上调用反转方法。
class LoadEvent...
internal override void Reverse() { Cargo.ReverseLoad(this); }
class Cargo...
public void ReverseLoad(LoadEvent ev) { _ship.ReverseLoad(ev); _ship = null; _port = ev.priorPort; }
在这种情况下,事件承担了一堆可变的先前数据,这些数据是其处理数据的一部分。在这种情况下,简单的字段就足够了。其他情况可能需要更复杂的数据结构。当货物处理到达事件时,它会跟踪它是否在加拿大 eh。它可以使用一个简单的布尔字段来做到这一点。为了在事件上存储先前值,需要比简单字段更多的东西,因为许多货物可能会受到到达的影响。因此,在这种情况下,我使用一个以货物为索引的映射。
class Cargo...
public void HandleArrival(ArrivalEvent ev) { ev.priorCargoInCanada[this] = _hasBeenInCanada; if ("CA" == ev.Port.Country) _hasBeenInCanada = true; } private bool _hasBeenInCanada = false; public bool HasBeenInCanada {get { return _hasBeenInCanada;}}
class ArrivalEvent...
internal Port priorPort; internal IDictionary priorCargoInCanada = new Hashtable();
然后反转
class Cargo...
public void ReverseArrival(ArrivalEvent ev) { _hasBeenInCanada = (bool) ev.priorCargoInCanada[this]; }
这个例子很好地说明了事件上的源数据和错误处理如何影响我们进行反转的方式。对于加载事件,我们需要存储货物加载时的港口。如果事件在其源数据中包含了这一点,我们就不需要这样做。一些额外的源数据消除了添加先前数据的必要性。但这并不适用于所有地方:到达事件无法获取其货物的先前加拿大状态。
被认为是正确处理的内容也会产生影响。在这个系统中,我们有船舶的到达事件和离开事件。假设一切正常,我们应该始终获得这些交错的事件。因此,船舶可以通过将其港口字段设置为 Port.OUT_TO_SEA
来反转到达事件。当我们连续获得两个到达事件时会发生什么?为了反转这一点,我们需要在船舶上存储先前的港口。我们的另一种选择是将没有离开的第二个到达声明为错误,我们不需要进行此存储。
示例:外部查询 (C#)
即使对于基本的事件溯源,外部查询也很尴尬,因为如果您想重建应用程序状态,您需要使用过去进行的外部查询响应来完成此操作。
让我们想象一下,船舶必须在进入港口时确定其货物的价值。此估值由外部服务完成。如果船舶在 11 月 3 日进入港口,我立即处理事件,我将获得该货物在 11 月 3 日的价值。如果我在 12 月 5 日重建我的应用程序状态,我将希望获得相同的货物价值,即使它的价值在此期间发生了变化。
对于这个例子,我将展示一种处理方法,通过记录外部查询并使用它们在重新处理事件时提供值。在许多方面,它与您在外部更新中使用的方法类似 - 将交互转换为系统边界的事件,并使用事件记录来记住发生了什么。
使用我在这些示例中其他地方使用的类似于事件处理的风格,我们将事件传递给域对象进行处理。在这种情况下,货物启动对外部系统的调用并保存值。(我们假设我们将对这个值做一些有用的事情,但这与这个例子无关。)
class Cargo...
public void HandleArrival(ArrivalEvent ev) { declaredValue = Registry.PricingGateway.GetPrice(this); } private Money declaredValue;
按照我的习惯,我通过一个我控制的网关封装所有外部系统访问。基本网关只会将调用转换为外部交互所需的任何内容。为了支持重放事件,我只是用一个类来包装它,该类记录这些查询。
图 6:使用日志记录网关包装网关以支持从事件正确重建。
日志记录网关检查每个调用以查看它是否有一个与之匹配的旧请求,如果没有,它将发出一个新请求。
class LoggedPricingGateway...
public Money GetPrice(Cargo cargo) { GetPriceRequest oldReq = oldRequest(cargo); if (null != oldReq) return (Money) oldReq.Result; else return newRequest(cargo); }
如果请求是新的,它会将请求转换为事件对象,调用外部查询,并将其存储在日志中。
class LoggedPricingGateway...
private Money newRequest(Cargo cargo) { GetPriceRequest request = new GetPriceRequest(cargo); request.Result = gateway.GetPrice(cargo); log.Store(request); return (Money) request.Result; }
private class GetPriceRequest : QueryEvent { private Cargo cargo; public GetPriceRequest(Cargo cargo) : base() { this.cargo = cargo; }
class QueryEvent...
DomainEvent _eventBeingProcessed; Object result; public QueryEvent() { _eventBeingProcessed = Registry.EventProcessor.CurrentEvent; } public object Result { get { return result; } set { result = value; } } public DomainEvent EventBeingProcessed { get { return _eventBeingProcessed; } } }
因此,为了找到旧的请求,它会搜索其日志。
class LoggedPricingGateway...
private GetPriceRequest oldRequest(Cargo cargo) { IList candidates = log.FindBy(EventProcessor.CurrentEvent, typeof (GetPriceRequest)); foreach (GetPriceRequest request in candidates) { if (request.Cargo.RegistrationCode == cargo.RegistrationCode) return request; } return null; }
查询日志是通用的,因此我们可以使用已处理的域事件和请求类型发出查询以获取一些项目。这给了我们一个需要在网关特定方式进行进一步检查的小集合。
请求日志需要以与域事件日志相同的方式持久化,因为它需要重建应用程序状态。