追溯事件
自动修正已经处理过的错误事件的后果。
2005年12月12日
这是我在2000年代中期进行的企业应用架构进一步发展写作的一部分。遗憾的是,此后太多其他事情吸引了我的注意力,所以我没有时间进一步研究它们,在可预见的未来我也没有看到太多时间。因此,这些材料非常草稿形式,我不会进行任何修正或更新,直到我有时间再次处理它。
企业应用程序是关于从世界中获取信息,对这些信息进行各种计算,并在该世界中启动进一步的操作。它们执行的计算和启动的操作只能与它们接收的信息一样准确。如果你在输入中得到错误,你就会在输出中得到错误。
我们已经习惯了这种现象,但是一旦我们在输入中发现错误,就很难修复。在大多数情况下,人类必须弄清楚系统对错误输入做了什么,系统如何反应,系统应该如何反应(如果它获得了正确的输入)以及如何纠正问题。
事件溯源的吸引力之一是它为使这项繁重的任务变得容易得多提供了基础。事实上,对事件及其后果的仔细记录使人类更容易进行修正。追溯事件更进一步,允许系统自动修正许多错误事件的结果。
追溯事件建立在事件溯源的基础之上,因此你需要熟悉事件溯源才能理解追溯事件。特别是,我将使用我在事件溯源中定义的许多术语。
工作原理
处理追溯事件意味着我们的当前应用程序状态在某种程度上是不正确的。如果我们收到了追溯事件,我们现在将处于与现在不同的应用程序状态。你可以把它想象成三个并行模型
- 错误现实:存在当前的实时应用程序状态,它没有考虑追溯事件。
- 正确分支:如果追溯事件被处理,我们应该得到的应用程序状态。
- 修正现实:我们想要最终得到的实时应用程序状态。
在许多情况下,修正现实将与正确分支相同 - 也就是说,我们将重新处理事件日志以考虑追溯事件。有时我们无法完全做到这一点。
我们的第一步是找出错误现实和正确分支在哪里分歧。这本质上是事件日志中应该处理追溯事件的点。我称之为分支点 - 再次使用来自源代码控制世界的术语。分支点是事件日志中追溯事件应该插入之前的那一点。
我们可以通过两种方式构建分支点:重建和回溯。
- 在重建中,我们将应用程序状态恢复到追溯事件之前的最后一个快照。然后,我们向前处理所有事件,直到到达分支点。
- 在回溯/重放中,我们从最新事件向后反转事件,直到到达分支点。
我们可以在并行模型中构建这个分支点,或者我们可以将我们的实时应用程序状态恢复到分支点。如果我们恢复了实时应用程序状态,那么当我们向前移动时,我们将自动创建我们的正确分支,并且修正现实将是相同的。如果我们使用并行模型,我们将在并行模型中拥有正确分支,并且需要对实时应用程序状态进行更改以将其转换为修正现实,本质上是将差异合并到实时状态中。
追溯事件主要有三种类型:乱序事件、拒绝事件和错误事件。乱序事件是指接收较晚的事件,晚到足以让你已经处理了应该在接收乱序事件后处理的事件。拒绝事件是指你意识到现在是错误的并且不应该处理的事件。错误事件是指你收到了关于事件的错误信息。
这三种事件都需要你在分支点之后对事件流进行不同的操作。
- 对于乱序事件,我们在分支点插入并处理追溯事件。
- 对于拒绝事件,我们反转事件并将其标记为拒绝。实际上是对事件日志的删除。
- 对于错误事件,我们反转原始事件并将其标记为拒绝。我们插入追溯事件并处理它。你可以把它想象成一个被拒绝的事件和一个乱序事件被处理为一个。
标记为拒绝的事件被所有后续处理忽略,因此它们不会在重放期间重新处理,也不会在将来的回溯中反转。它们保留在事件日志中以维护历史记录,但除此之外被忽略。我们不需要这样做,作为替代方案,我们可以在原始事件之后立即添加一个反转,但显然,像这样彼此紧随其后地处理和反转事件效率低下。
如果事件的错误信息改变了处理顺序,错误事件可能会带来进一步的复杂性。因此,我们得到了一艘在 4 月 1 日到达的船,但我们的修正表明它在 3 月 1 日到达。对于这些情况,我们的分支点是拒绝事件和正确事件中较早的事件。我们接下来的处理取决于哪个事件更早。如果,如本例所示,正确事件更早,我们处理它并将旧事件标记为拒绝。然后,我们可以向前重放,旧事件将被跳过。如果旧事件是第一个,那么我们反转它,重放到新事件,处理它并继续向前处理。
当我们讨论这个问题时,请记住,追溯事件本身始终是一个事件。这对乱序事件没有影响,但它确实会影响其他两种情况。考虑拒绝事件的情况 - 实际上,这是对事件日志中事件的删除。但是,事件日志的全部意义在于我们永远不会删除事件。因此,我们可以做的是在日志中插入一个拒绝事件,处理该拒绝事件会进行我描述的更改。拒绝事件本身总是被拒绝,因此它永远不会在重建中处理。错误事件可以用相同的方式处理,使用一个包含旧事件和新事件的替换事件。实际上,你可以将拒绝事件视为没有修正事件的替换事件。
我刚刚说过,拒绝事件和替换事件总是被拒绝,因此它们不会重放。如果你总是希望你的并行模型使用当前最佳知识构建,那么这是正确的。但是,如果你正在构建一个双时间并行模型,即一个关于我们过去某个日期的知识的模型,那么你将进行更复杂的处理,这将考虑一些拒绝事件。
从并行模型中合并
如果你遵循在并行模型中构建正确分支的风格,那么你将不得不找出正确分支和当前现实之间的差异,然后将这些差异合并以形成修正现实。
要发现更改,你需要检查两个模型中的所有对象,以查看哪些对象已更改,并检查仅存在于一个模型中的任何对象。显然,这可能是一项巨大的任务,你可以通过跟踪自分支点以来哪些对象受处理的事件影响来减少这项任务。你还可以进行一些类似于选择性重放的分析,以确定哪些对象的子集将被追溯事件更改。
找到更改后,现在需要将这些更改应用于实时模型。这里值得考虑的是是否将这些更改作为事件本身应用。这不是严格必要的,因为所有更改都应该完全可以从初始事件的分析中计算出来,但由于这种分析相当复杂,因此将合并更改捕获在它们自己的事件中可能很有用。
显然,这种合并处理可能会变得非常复杂,合并处理的复杂性是你在决定如何处理追溯事件时需要牢记的重要因素。
优化
由于处理追溯事件与创建并行模型非常相似,因此那里的许多优化讨论也适用于这里。反转、快照和选择性重放的技术都可以帮助你以更少的事件处理量到达分支点。
如果你使用选择性重放到达分支点,你可以使用相同的选择性重放来处理分支点之后的事件。
测试想法
与反转一起使用的特别有用的测试技术是始终确保你添加的任何行为在重放下都能正常工作,方法是在测试用例的事件中添加一个追溯事件,这将强制整个测试事件序列被重放。
另一个测试想法(虽然我还没有直接看到任何案例)是随机生成事件序列,然后以随机不同的顺序处理它们,以确保它们始终导致相同的应用程序状态。
更新外部系统
事件溯源总是会导致与未以相同方式构建的外部系统不匹配。对于常规的事件溯源,我们必须确保在重建期间不会向外部系统发送更新消息。这相对容易,只需关闭网关即可。但是,对于追溯事件,我们必须更进一步。我们需要弄清楚追溯事件是否会导致我们应该进行的更新发生任何变化,然后处理这些变化的通知。
这个问题有两个部分:检测和修正。
基本的检测方案是确定在错误现实中发送了哪些更新,在正确分支中应该发送了哪些更新,然后找到两个更新之间的任何差异。
一个好方法是通过将每个外部更新转换为事件,在网关本身使用事件溯源。如果我们这样做,我们可以在不同的模型中捕获这些事件并进行比较。为此,我们确保我们有一个机制来捕获所有外部更新事件并构建我们的两个列表。我们对自分支点以来发送的所有更新事件感兴趣。如果我们使用回溯,我们可以捕获错误现实更新,如果我们确保事件反转发送我们可以捕获的更新事件。
一旦我们有了错误现实和正确分支的两个更新事件列表,我们就可以比较它们以查找不匹配。在两个列表中相同的任何更新都可以忽略,重要的是要发现一个列表中存在但另一个列表中不存在的事件。然后,我们可以构建两个新的不同事件列表。这些更新事件是我们需要修正的事情。
我在上面定义的所有内容都可以通用地完成,一个通用组件可以跟踪事件并找到两个不匹配的事件列表。然而,处理不匹配需要针对每个外部系统单独编程,具体取决于外部系统需要采取的补偿措施。很可能无法自动纠正,而是需要人工干预来解决问题。但至少系统可以向用户提供一个很好的描述,说明发生了什么,以及应该发生什么。
何时使用它
使用追溯事件的主要原因是当自动执行对过去输入的修正时很有用。与任何自动化一样,您需要查看修正的频率和难度。如果大量的人工时间花在进行修正上,那么值得考虑使用类似追溯事件的东西来实现自动化。使用追溯事件进行自动化的最大优势在于它是一个完全通用的过程,一旦您为任何一个事件实现了追溯事件,将其扩展到其他事件就相对容易了。
我很少看到追溯事件,这有充分的理由。为了实现追溯事件,您需要准备一些重要的前提条件。您需要事件溯源,如果这还不够,您还需要添加可逆性或并行模型。这些不是小事。因此,构建一个支持追溯事件的应用程序是一个重大的决定,会影响整个系统。对现有系统进行必要的重构也是非同小可的。这种自动错误修正的需求通常不是早期需求,因此很容易最终得到一个需要大量工作才能实现追溯事件的设计。
如果一个系统与外部系统有许多链接,那么这会给使用追溯事件带来相当大的复杂性。完整的追溯处理需要完全访问信息才能执行,并将生成外部更新的每一个细微变化。如果您与外部系统有大量的集成,这是一种常见情况,您需要仔细研究如何处理与它们的追溯性,以确定追溯事件是否是一种可行的使用方式。
请记住,追溯事件不是一个非此即彼的选择。您可以通过几种方法来限制追溯事件的范围,以减少其范围,但仍然保留一些实用性。一种减少是只允许追溯事件应用于系统的子集 - 特别是外部影响较小的区域。事实上,这就是我看到它的地方 - 用于帐户。另一个范围缩减是时间,许多业务运营适合固定的周期,每周、每月、每年。在这种情况下,您可能只对当前处理周期内的事件(例如,过去一周)使用追溯事件,然后再进行结算。您也可以在周期内使用更积极的追溯事件形式,在封闭周期内使用更被动的形式。
我最后提到的追溯事件很少见的原因是我认为人们并不清楚要实现它需要做什么。我希望这种模式能够在一定程度上消除这种障碍,从而帮助我们找到其他人。
示例:使用回溯的追溯货物(C#)
我一直用一个基于在港口之间运输货物的示例来说明围绕事件溯源的大多数示例。我将继续使用该示例,这次看看如何使用追溯事件。
首先,我们需要对如何进行追溯事件做出一些战略决策。对于这个例子,我将使用通过回溯实时模型来达到分支点的方法。由于示例很简单,我不会使用任何选择性重播,而是直接回溯完整的事件列表。
由于我们使用的是回溯,因此所有事件都必须是可逆的。我不会详细介绍事件处理或逆转,请参阅事件溯源中的示例,了解如何实现。
追溯事件行为由一种特殊类型的事件触发。
class ReplacementEvent...
private DomainEvent original, replacement; public DomainEvent Original {get { return original; }} public DomainEvent Replacement {get { return replacement; }} public ReplacementEvent(DomainEvent oldEvent, DomainEvent replacement) : base(oldEvent.Occurred) { this.original = oldEvent; this.replacement = replacement; } internal override void Process() { throw new Exception("Replacements should not be processed directly"); } internal override void Reverse() { throw new Exception("Cannot reverse replacements"); }
您会注意到,在这些情况下,我已经阻止了基本过程和逆转方法。在这些示例中,我大多数时候都更喜欢让事件处理自己的处理逻辑。替换不同:它们不包含任何领域逻辑,它们的处理涉及对事件队列的了解以及对该事件队列的操作。我更喜欢让事件处理器单独处理与事件队列的密切交互,因此实际上我在事件队列中构建了所有替换行为。
class EventProcessor...
public void Process(DomainEvent ev) { try { if (ev is ReplacementEvent) ProcessReplacement((ReplacementEvent) ev); else if (OutOfOrder(ev)) ProcessOutOfOrder(ev); else BasicProcessEvent(ev); } catch (Exception ex) { ev.ProcessingError = ex; if (ShouldRethrowExceptions) throw ex; } InsertToLog(ev); }
您会注意到,我在这里犯了面向对象编程的致命错误:基于方法参数类型的显式条件行为。我并不经常这样做,但在这里我这样做是因为我希望事件处理器将对队列的了解保留给自己。(我也可以使用双重分派,但这里的情况似乎并不复杂到需要它。)
您可能还会注意到,我这里没有针对拒绝事件的案例。正如我们将会看到的,我通过将这些事件视为替换事件来处理它们,其中替换事件为 null。
我将从最简单的案例开始。基本 Process(和 reverse)方法只是一个简单的包装器,允许我在需要时添加一些跟踪行为。
class EventProcessor...
private void BasicProcessEvent(DomainEvent e) { traceProcess(e); e.Process(); } private void BasicReverseEvent(DomainEvent e) { traceReverse(e); e.Reverse(); }
乱序事件是最容易描述的。这些只是我们收到的顺序错误的常规事件,我们不使用替换事件来模拟它们。本质上,这些事件将一个新事件插入流中。处理器通过简单地将其与最后一个事件进行比较来测试乱序事件。
class EventProcessor...
private bool OutOfOrder(DomainEvent e) { if (LogIsEmpty()) return false; return (LastEvent().after(e)); } private DomainEvent LastEvent() { if (LogIsEmpty()) return null; return (DomainEvent) log[log.Count - 1]; } private bool LogIsEmpty() { return 0 == log.Count; }
为了使这个例子变得非常简单,我只是根据发生的日期对事件进行排序。要真正做到这一点,可能只需将时间点的分辨率提高到更精细的程度。有时,其他因素也会影响排序。
处理乱序案例的概要很简单。
class EventProcessor...
private void ProcessOutOfOrder(DomainEvent e) { RewindTo(e); BasicProcessEvent(e); ReplayAfter(e); }
要回溯,我只需从日志中选择所有晚于乱序事件的事件。同样为了简单起见,我将日志(或至少是它的缓存)保存在内存中。
class EventProcessor...
private void RewindTo(DomainEvent priorEvent) { IList consequences = Consequences(priorEvent); for (int i = consequences.Count - 1; i >= 0; i--) BasicReverseEvent(((DomainEvent) consequences[i])); } private IList Consequences(DomainEvent baseEvent) { IList result = new ArrayList(); foreach (DomainEvent candidate in log) if (candidate.IsConsequenceOf(baseEvent)) result.Add(candidate); return result; }
class DomainEvent...
public bool IsConsequenceOf(DomainEvent other) { return (!ShouldIgnoreOnReplay && this.after(other)); }
如您所见,并非所有事件在回溯期间都被选中进行重新处理,即使我没有使用任何选择性重播。本质上,我不会重新处理错误事件或已被拒绝的事件。
再次向前重播事件很简单
class EventProcessor...
private void ReplayAfter(DomainEvent ev) { foreach (DomainEvent e in Consequences(ev)) BasicProcessEvent(e); }
现在让我们继续讨论替换。
class EventProcessor...
private void ProcessReplacement(ReplacementEvent e) { if (e.Original.ShouldIgnoreOnReplay) throw new ProcessingException("Cannot replace event twice"); else if (null == e.Replacement) ProcessRejection(e); else if (e.HasPriorReplacement) ProcessPriorReplacement(e); else ProcessPriorOriginal(e); }
class ReplacementEvent...
public bool HasPriorReplacement { get { if (null == replacement) return false; else return original.after(replacement); } }
这里有几个案例需要处理。如果原始事件已经被标记为忽略,这意味着存在处理错误,因为我们不应该拒绝已经被拒绝的事件。完成此操作后,我们有三个主要案例:替换是拒绝,替换事件早于原始事件,以及原始事件早于替换事件。
让我们从拒绝案例开始,它由一个 null 替换事件指示。在这里,我们回溯到被拒绝的事件,拒绝它,逆转它,然后向前重播。
class EventProcessor...
private void ProcessRejection(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); e.Original.Reject(); ReplayAfter(e.Original); }
class DomainEvent...
public bool after (DomainEvent other) { return this.CompareTo(other) > 0; } public void Reject() { _isRejected = true; } private bool _isRejected; public virtual bool ShouldIgnoreOnReplay { get { if (WasProcessingError) return true; return _isRejected; } }
拒绝会标记事件,使其不再被处理或回溯。
如果我们有一个先前的替换,我们回溯到替换,拒绝原始事件,处理替换,然后向前重播。
class EventProcessor...
private void ProcessPriorReplacement(ReplacementEvent e) { RewindTo(e.Replacement); e.Original.Reject(); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); }
对于先前的原始事件,我们回溯到原始事件,逆转并拒绝它,重播到替换,处理它,然后继续向前重播。
class EventProcessor...
private void ProcessPriorOriginal(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); e.Original.Reject(); ReplayBetween(e.Original, e.Replacement); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); } private void ReplayBetween(DomainEvent first, DomainEvent last) { IList eventsToReplay = new ArrayList(); foreach (DomainEvent e in log) { if (e.IsConsequenceOf(first) && last.after(e)) eventsToReplay.Add(e); } foreach (DomainEvent e in eventsToReplay) BasicProcessEvent(e); }
我在这里遵循一个约定,即在事件成功处理之前不会将事件添加到日志中。如果日志不在与应用程序状态相同的交易中,这很有用。如果它们都在一个交易中,我可以在到达分支点并向前重播后将事件添加到日志中(这将拾取新事件)。
示例:更新外部系统(C#)
对于简单的事件溯源,在重播期间关闭外部通知就足够了。然而,对于追溯事件,我们需要更进一步,进行检测和修正。为了在回溯/重播期间做到这一点,我们将保存所有生成的事件并进行比较,以确定我们需要修正什么。然后,我们将假设一个简单的自动修正案例,即发送取消消息和新消息以进行更改。
我将继续使用航运示例,这里假设美国海关当局需要在任何货物进入经过加拿大的美国港口时收到通知。
外部通知由 Cargo 对象在处理到达事件时发出。
class Cargo...
public void HandleArrival(ArrivalEvent ev) { ev.priorCargoInCanada[this] = _hasBeenInCanada; if ("CA" == ev.Port.Country) _hasBeenInCanada = true; if (HasBeenInCanada && "US" == ev.Port.Country) { Registry.CustomsGateway.Notify(ev.Occurred, ev.Ship, ev.Port); ev.WasNotificationSent = true; } } private bool _hasBeenInCanada = false; public bool HasBeenInCanada {get { return _hasBeenInCanada;}}
为了逆转此事件,我们在正向播放中保存了是否发送通知的记录,并在发送时再次发送它。
class Cargo...
public void ReverseArrival(ArrivalEvent ev) { _hasBeenInCanada = (bool) ev.priorCargoInCanada[this]; if (ev.WasNotificationSent) Registry.CustomsGateway.Notify(ev.Occurred, ev.Ship, ev.Port); }
我在这里遵循的原则是,领域模型应该不知道事件处理中的重播逻辑。它知道如何逆转每个事件的自身状态,但并不关心与外部系统通信的复杂性。
与外部系统通信的逻辑由网关处理,在本例中是一个对象集群。
图 1:一个对象集群构成了处理追溯性的网关。
在这个例子中,CustomsGatewayFront 是一个普通的网关,它将面向领域的接口(由 ICustomsGateway 定义)转换为实际的消息传递基础设施。我们可以安全地忽略它做了什么 - 我们只是假设如果我们调用它上的方法,它会确保海关办公室收到消息。
有趣的事情发生在前面,由 CustomsGatewayBuffer 驱动,它包装了实际的 CustomsGatewayFront,实现了相同的接口,但增加了处理追溯回溯的能力。
由于大部分追溯回溯是通用的,我可以将通用行为放到另一个类 ReplayBuffer 中,这样 Customs Gateway Buffer 只处理与海关案例相关的部分。回溯缓冲区确实需要与事件处理器进行一些通信,以处理回溯/重播过程中的各个阶段。它还需要某种方法来调整最终不匹配的事件,在本例中,我将此任务分配给海关网关缓冲区。在这两种情况下,回溯缓冲区都通过协作接口进行通信,以保持其通用性。
所以这就是主要人物,现在让我们进入行动。我将从我们正在实时运行的正常情况开始。海关网关缓冲区通过创建一个更新事件并将其发送到回溯缓冲区来实现通知操作。
class CustomsGatewayBuffer...
public void Notify (DateTime arrivalDate, Ship ship, Port port) { CustomsNotificationEvent ev = new CustomsNotificationEvent(gateway, arrivalDate, ship, port); buffer.Send(ev); } ICustomsGateway gateway; ReplayBuffer buffer;
class CustomsNotificationEvent...
public CustomsNotificationEvent( ICustomsGateway gateway, DateTime arrivalDate, Ship ship, Port port) { this.gateway = gateway; this.arrivalDate = arrivalDate; this.ship = ship; this.port = port; }
如果回溯缓冲区处于活动状态,它将处理该事件,这将导致对实际网关前端的调用。
class ReplayBuffer...
internal void Send(IGatewayEvent ev) { current.Add(ev); if (isActive) ev.Process(); }
class CustomsNotificationEvent...
public virtual void Process() { gateway.Notify(arrivalDate, ship, port) ; }
(我将在稍后讨论回溯缓冲区如何变得活跃以及当前列表是什么。)
所以如您所见,对于领域事件的活动处理,所有发生的只是网关缓冲区创建一个网关事件,将其发送到回溯缓冲区,回溯缓冲区处理它,这会导致将原始调用应用于实际网关。所有这些都是一种极其复杂的简单委托形式。
这种怪异的回报在于处理追溯事件。回溯缓冲区通过可回溯接口连接到事件处理器。
class ReplayBuffer...
IRewindable eventProcessor;
internal interface IRewindable { event EventProcessor.EventHandler RewindStarted, RewindFinished, ReplayFinished; }
此接口定义了三个重要的事件。事件处理器在其处理各种案例的过程中会发出这些事件。以下是乱序案例
class EventProcessor...
private void ProcessOutOfOrder(DomainEvent e) { RewindStarted(); RewindTo(e); RewindFinished(); BasicProcessEvent(e); ReplayAfter(e); ReplayFinished(); }
对于替换,总体开始和结束是通用的,但每个案例都需要特定的放置来指示回溯何时结束。
class EventProcessor...
private void ProcessReplacement(ReplacementEvent e) { RewindStarted(); if (e.Original.ShouldIgnoreOnReplay) throw new ProcessingException("Cannot replace event twice"); else if (null == e.Replacement) ProcessRejection(e); else if (e.HasPriorReplacement) ProcessPriorReplacement(e); else ProcessPriorOriginal(e); ReplayFinished(); }
private void ProcessRejection(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); RewindFinished(); e.Original.Reject(); ReplayAfter(e.Original); }
private void ProcessPriorOriginal(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); RewindFinished(); e.Original.Reject(); ReplayBetween(e.Original, e.Replacement); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); }
private void ProcessPriorReplacement(ReplacementEvent e) { RewindTo(e.Replacement); RewindFinished(); e.Original.Reject(); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); }
这些事件为回溯缓冲区提供了正确的时间信息,以正确处理追溯处理。我在这里使用事件,因为任何事件处理器都可能具有任意数量的回溯缓冲区,回溯缓冲区不需要了解这些对象,除了它们可以理解这三个事件之外。
所以现在让我们转到回溯缓冲区,看看它如何处理这些事件。回溯缓冲区是用一堆列表创建的。
class ReplayBuffer...
internal ReplayBuffer (IRewindable processor, IAdjustable adjuster) { this.eventProcessor = processor; this.adjuster = adjuster; SubscribeToProcessorEvents(); sent = new ArrayList(); isActive = true; current = sent; } IList sent,rewound, replayed, current; IAdjustable adjuster; private bool isActive; private void SubscribeToProcessorEvents() { eventProcessor.RewindStarted += new EventProcessor.EventHandler(processor_RewindStarted); eventProcessor.RewindFinished += new EventProcessor.EventHandler(processor_RewindFinished); eventProcessor.ReplayFinished += new EventProcessor.EventHandler(processor_ReplayFinished); }
缓冲区创建后处于激活状态,这意味着它将处理发送到它的任何事件。当前列表变量将设置为三个核心列表之一,具体取决于我们处于回溯的哪个阶段 - 激活时它指向已发送列表。这让我们记录了所有实际发送的事件。它还订阅了事件处理器上的所有相关事件。(暂时不用担心调整器,我们将在校正中讨论它。)
在回溯开始时,缓冲区使自身处于非活动状态以避免将事件传递到真实网关,它还交换当前列表变量以指向一个新的列表,以便它可以捕获所有回溯的事件。
class ReplayBuffer...
private void processor_RewindStarted() { isActive = false; rewound = new ArrayList(); current = rewound; }
通过这个例子,你可能已经猜到回溯停止时会发生什么,但以防万一……
class ReplayBuffer...
private void processor_RewindFinished() { replayed = new ArrayList(); current = replayed; }
所以现在很清楚,最后一个事件将是最有趣的,因为我们必须在这里计算不匹配列表。
class ReplayBuffer...
private void processor_ReplayFinished() { current = sent; DetermineChange(); isActive = true; rewound = null; replayed = null; } private void DetermineChange() { IList matchingEvents = new ArrayList(); foreach (IGatewayEvent ev in replayed) { if (rewound.Contains(ev)) matchingEvents.Add(ev); } foreach (IGatewayEvent ev in matchingEvents) { replayed.Remove(ev); rewound.Remove(ev); } adjuster.Adjust(rewound, replayed); }
算法实际上很简单,我们只需要找到匹配的事件并将它们从两个列表中删除。完成后,我们有两个不匹配事件列表,我们将它们传递给调整器处理。
类 CustomsGatewayBuffer…
public void Adjust(IList oldEvents, IList newEvents) { foreach (CustomsNotificationEvent e in oldEvents) new CustomsCancellationEvent(e).Process(); foreach (CustomsNotificationEvent e in newEvents) e.Process(); } class CustomsCancellationEvent { private CustomsNotificationEvent original; public CustomsCancellationEvent(CustomsNotificationEvent original) { this.original = original; } public void Process() { original.Gateway.Cancel(original.ArrivalDate, original.Ship, original.Port); } }
在这种情况下,我让海关网关缓冲区处理调整,方法是简单地为所有最终回溯的事件发送取消通知,并为新事件发送新的通知。在我的幻想世界中,政府机构非常乐于助人。在实践中,每个调整都需要单独考虑,它们很容易变得非常复杂,需要人工干预。但是,事件列表应该极大地帮助理清外部系统需要做什么。