并行模型
允许应用程序状态的另一种表示,无论是在不同的时间还是在假设状态下。
2005 年 12 月 12 日
这是我在 2000 年代中期进行的《进一步的企业应用程序架构开发》写作的一部分。不幸的是,此后太多其他事情吸引了我的注意力,所以我没有时间进一步研究它们,而且在可预见的未来我也没有看到太多时间。因此,这些材料非常草稿形式,我不会进行任何更正或更新,直到我有时间再次处理它们。
企业应用程序是捕获有关世界状态数据的 信息系统。在许多情况下,当我们使用信息系统时,我们希望它们反映我们对该世界的当前最佳理解。有时我们也可能想知道过去的世界,或者探索未来可能发生的事情的后果。
大多数情况下,我们通过在当前世界状态中构建设施来捕获有关这些替代状态的数据来处理这些替代世界状态。客户过去地址的 时间属性 就是一个很好的例子。
但是,有时我们想要更全面的东西,一些真正能捕捉到过去或想象状态下世界所有细微差别的东西。一个 并行模型 通过允许我们轻松地获取整个信息存储并将其表示为过去或在某些替代过去、现在或未来中的状态来实现这一点。
工作原理
当我们谈论一个能够查询和操作任何过去状态(或任何假设的过去、现在或未来状态)的信息系统时,它听起来很像科幻小说。然而,任何半严肃的软件开发项目在使用版本控制系统时都会一直这样做。我们可以重新创建代码的任何过去状态,我们可以创建替代现实(通过分支),事实上,我们可以同时保留多个替代现实(例如,多个活动代码行)。
版本控制系统处理这种超级精神分裂思维能力的核心是它们使用 事件溯源。对代码的每一次更改都会被捕获并存储。如果您在星期五要求查看上周三的代码状态,版本控制系统从概念上讲将从无开始,并应用发生在周三之前的所有代码更改事件,然后为您提供结果。我说“从概念上讲”,因为它可能不会完全那样做,但它确实做出了与之可观察地等效的事情。
思考这个问题的方式是,应用程序的当前状态是将空白初始状态应用事件的结果。每个事件序列都会导致不同的状态。周三早上的状态是应用周三之前发生的所有事件的结果,现在状态是应用所有曾经发生过的事件的结果。(如果您在周三早上阅读本文,您将不得不放下书几个小时,以便最后一句话有意义。)
将状态视为应用事件的结果为处理假设现实的方法奠定了基础。你想知道如果德里克上周三没有进行那次愚蠢的检查会发生什么,你可以通过只播放除那次检查之外的所有事件来做到这一点(在某种程度上)。这为您提供了一个替代现实(或者可能是真实的现实,当我们思考这些事情时,我们离《银河系漫游指南》危险地近了)。
我们很正常地认为将 并行模型 应用于我们的代码,我们也可以将其应用于我们用代码编写的东西。如果我们在管理供应链的应用程序上使用 事件溯源,我们可以通过创建一大堆采购事件并将它们引入系统的安全副本,看看会发生什么,来探索疯狂的圣诞节抢购对不粘胶的影响。我们还可以通过为上周创建一个 并行模型 来查看上周那批无声唱片在哪里。
要构建一个 并行模型,首先要检查的是您是否拥有一个使用 事件溯源 的系统。如果该系统是在考虑 事件溯源 的情况下设计的,那是最好的。如果没有,可能有一些方法可以从日志中重建事件。
接下来,您需要一种方法来处理与通常现实分离的模型中的事件。大体上来说,有两种方法可以做到这一点。一种是构建一个 **并行系统**,它是一个具有自己的数据库和环境的应用程序的单独副本。另一种是允许单个系统能够在 **嵌入式并行模型** 之间切换。
并行系统
并行系统的一大优势是,您无需对应用程序本身进行任何操作即可构建并行模型。您获取应用程序的副本,将其对资源(例如持久数据存储)的使用切换为使用副本。然后,您向它提供需要提供的事件并运行它。
并行系统的一个问题是您如何处理结果。由于应用程序不知道并行性,因此它无法操作或显示来自多个并行模型的结果。要将并行模型处理的结果绑定到更广泛的系统中,您需要使用集成技术,有效地将并行系统视为一个完全独立的应用程序。
使用并行系统的好处之一是,您还可以更改并行系统的源代码。对于嵌入到应用程序中的并行模型,任何行为更改都限于内置于正在并行化的模型中的行为。虽然如果您使用自适应对象模型,这可能是相当大的,但还是有限制的。使用并行系统,您可以对应用程序的源代码进行任何更改,并查看这对事件流的影响。
嵌入式并行模型
如果您想将多个 并行模型 嵌入到应用程序中,您需要确保任何持久存储可以轻松地在多个实际数据源之间切换。一个好方法是使用 存储库 并提供一种机制,使应用程序能够在运行时切换 存储库。临时 存储库 不需要持久化 - 由于 并行模型 往往是临时的,因此它们通常可以在内存中组装。一旦您处理完事件并从 并行模型 中获得了所需的信息,就可以丢弃存储库。
将系统切换为使用临时存储库意味着您必须首先创建此存储库,对其进行初始化(通常使用模式和任何不可变的参考数据),然后将实时存储库替换为临时存储库,在任何常规应用程序代码看到它的地方。这就是使用存储库的单个参考点如此重要的原因。
使用这种存储库的一个优点是它可以快得多,因为您正在内存中处理所有内容。
无论您以哪种方式进行并行处理,都会有一种效果,即并行的模型不知道替代现实的存在,但有一个外部系统知道这些平行世界。对于嵌入式模型,应用程序本身被划分为并行感知和不感知部分,对于并行系统,一些系统不感知,而其他一些系统可以感知。您也可以有多个级别的并行处理和合并。您可能需要一杯非常浓的茶来处理它们。
处理事件
要构建您的并行模型,您需要决定哪些事件需要处理,这取决于 并行模型 的性质。如果它是历史状态,那么您需要所有事件,直到那时为止。这可以通过查询主系统事件日志来完成。事件日志的好处是它们是一系列不可变的对象,因此复制很容易(尽管可能很慢)。
如果它是假设状态,那么您需要一种方法来定义并将替代事件注入到您想要处理的事件流中。
然后,您以与处理实时系统事件完全相同的方式处理选定的事件。如果您安排得当,此时系统不应该有任何更改 - 您只需运行常规内容。最后,您将获得应用程序状态,您可以使用常规机制再次查询它。
使用 并行模型 的一个难题是,很容易陷入身份危机。通常,当您拥有一个模型时,您在模型中的实体与持久存储和现实世界之间存在明确的对应关系。您模型中的 Gregor 对象对应于您现实中的对象,并且您只有一个。当您拥有一个 并行模型 时,您可能会遇到有多个 Gregor 的情况,每个 并行模型 都有一个:既可怕又令人困惑。因此,您必须非常小心地处理您在处理 并行模型 时持有的对象,特别是如果您有嵌入式 并行模型。最不令人困惑的做法是在切换存储库时丢弃任何对象。另一种选择是让每个实体都保留对它来自的存储库的引用。虽然并行系统在单个系统内避免了这个问题,但如果您将并行运行的结果组合在一起,您可能会遇到这个问题。
优化
从创建 并行模型 的概念概述中,您可以看到创建它们在计算上非常昂贵。处理自有史以来的所有事件可能需要一段时间,特别是如果有很多事件的话。版本控制系统已经处理了这个问题一段时间,并提出了一些方法来减轻这种负担。
一种方法是,您实际上不必从有史以来的开始开始。您可以在不同时间点对应用程序状态进行快照,并将最新的合理快照用作 并行模型 的起点。最新的合理快照是在您在事件流中执行不同操作之前的最后一个快照。对于历史 并行模型,这将是 并行模型 日期之前的最后一个快照。对于假设的 并行模型,这将是第一个变体事件之前的最后一个快照。
所以让我们举一些例子。我现在是 11 月 17 日,我想要一个 9 月 12 日的 并行模型,并且我在每个月的开始进行快照。我可以从 9 月 1 日的快照开始,处理从 9 月 1 日到 9 月 12 日发生的每个事件。
我现在是 11 月 17 日,我想探索下周的销售高峰。在这种情况下,我可以从我当前的应用程序状态创建一个快照,并添加我的假设事件。
我现在是 11 月 17 日,我想探索如果我在 10 月份大幅减少销售额会发生什么。我从 10 月 1 日的快照开始,处理我的变体事件。
另一种优化是向后和向前工作。为了使这能够工作,我需要我的事件是可逆的。但是有了这个,我可以通过从 10 月 1 日的快照开始,并将事件从 10 月 1 日反向到 9 月 27 日来创建一个 9 月 27 日的历史 并行模型。通常这会更快,因为事件更少。
当您开始考虑处理更少的事件时,自然会想到选择性重放。如果您想查看订单的过去状态,您可以忽略对其他人订单起作用的事件,前提是这些事件对您的订单没有影响。使用选择性重放可以显著减少您需要处理的事件数量,但困难的是确保事件之间没有微妙的交互。不处理另一个订单的履行可能意味着系统错误地认为运输上有空间,这会完全弄乱您的订单历史记录。系统越复杂,发现这些复杂交互就越难。您可以将选择性重放与正向和反向处理一起使用,两种情况下的优势和风险都相同。
了解最常见的查询是什么是个好主意。Subversion 版本控制系统知道大多数请求都是针对代码的最新版本,因此它将该版本存储为快照,并使用从该版本开始的反向事件来确定过去的状态(它也会不时使用其他快照)。
所有这些优化的优点是,它们应该完全对系统用户隐藏。您可以始终考虑从时间开始向前创建并行模型。您也可以从这个简单的实现开始,然后添加快照。(可逆事件在以后添加起来比较棘手,并且可能会影响模型。)
这提供了一种测试方法,您可以随机生成事件序列,并使用未优化的方式以及各种优化的方式构建并行模型。并行模型中应用程序状态的任何差异都表明存在故障,您可以进一步调查。
外部查询
事件溯源的一个问题是,我们必须记住外部查询的结果。对于并行模型,在考虑假设场景时,您会遇到另一个问题。如果我们正在处理一个没有发生的事件,我们将没有该事件的外部查询结果的记录。
在这种情况下,我们需要修改网关,以便它们可以返回我们认为合理的作为外部系统响应的数据。如果我们用假设的记住的查询对网关进行编程,我们可以将这些查询作为设置假设场景的一部分捕获。
何时使用它
并行模型是处理历史状态和替代状态的一种方法。另一种选择是在模型本身中嵌入此信息,使用诸如时间属性、时间对象和有效性之类的模式。
使用并行模型的一个巨大优势是,它消除了这些模式在模型中的复杂性。您可以专注于使模型成为一个简单的快照模型,并完全忽略这些多次和视角。另一个优势是,将这些结构放在模型中的每个地方都是一项艰苦的工作,并且每个部分都会增加复杂性。因此,您必须选择将它们放在哪里。如果您忽略了将它们放在某个地方,您就陷入了困境。您要么必须在以后添加它们,要么可能没有机会,因为数据永远丢失了。使用并行模型,您可以在任何地方获得时间行为。您只需支付一次使用并行模型的成本,整个模型就会获得好处。
这些优势也伴随着缺点。第一个是需要事件溯源,这会对您的模型施加自己的约束和复杂性。第二个问题是处理时间。每个并行模型的查询都需要事件处理。您只能对快照和其他优化做这么多。
与大多数模式一样,这不是一个完全非此即彼的情况,因为完全有可能使用并行模型作为您的通用机制,但在某些地方使用时间属性来处理某些常见请求。您也不需要从一开始就拥有并行模型,尽管您确实需要拥有事件溯源。但是,如果您使用事件溯源构建系统,您可以在以后需要的地方轻松添加并行模型。
示例:运输时间查询(C#)
我将此示例基于我为事件溯源开发的关于运输、货物和港口的简单示例。因此,在深入研究此示例之前,您应该熟悉该示例。域逻辑代码几乎相同。
图 1:我们运输示例的域对象。
由于并行模型的大部分复杂性都与处理临时并行模型和活动数据库有关,因此我在此示例中引入了数据库。我使用流行的 NHibernate 对象关系映射器来映射到数据库。我不会详细介绍映射,这并不十分有趣。相反,我将重点介绍它如何被存储库包装,以及如何在它和并行模型的存储库之间切换。
与基本示例一样,域模型的所有更改都由事件处理。当船舶抵达港口时,这将通过到达事件记录下来。
class ArrivalEvent...
string _port; int _ship; internal ArrivalEvent (DateTime occurred, string port, int ship) : base (occurred) { this._port = port; this._ship = ship; } internal Port Port {get {return Port.Find(_port);}} internal Ship Ship {get {return Ship.Find(_ship);}} internal override void Process() { Ship.HandleArrival(this); }
与基本示例不同的是,在这种情况下,港口和船舶由简单的值作为标识符表示 - 在这种情况下对应于数据库中的主键,尽管任何键都可以。但是,属性使用实际的域对象。为了确保我们始终获得正确的域对象,我们使用在域对象类上定义的查找方法从数据库中提取它们。
class Port...
public static Port Find(string s) { return (Port) Registry.DB.Find(typeof (Port), s); }
查找方法反过来委托给存储库。在我们的基本情况下,此存储库是使用 NHibernate 映射域对象的数据库。因此,存储库代码使用 NHibernate API 从数据库(或 NHibernate 的缓存)中提取对象。
class DB...
public object Find(Type type, object key) { return _hibernateSession.Load(type, key); }
NHibernate 与大多数基于数据映射器的数据源一样,使用工作单元来跟踪它操作的对象。因此,您永远不会告诉对象将自己保存到数据库,而是提交工作单元,它会找出哪些对象在内存中发生了更改以及如何将它们写入。
对于此示例,我将在事件处理器中进行此事务包装,事件处理器的处理器现在看起来像这样。
class EventProcessor...
public void Process(DomainEvent e) { Registry.DB.BeginTransaction(); try { e.Process(); InsertToLog(e); Registry.DB.Commit(); } catch (Exception ex) { Registry.DB.Rollback(); LogFailedEvent(e, ex); } }
这绝不是将工作单元包装的最佳位置,这将取决于应用程序的性质。但这对于我的示例来说已经足够了,并且也提出了问题 - 我们如何轻松地切换出持久存储库来处理临时查询?
让我们通过一个测试用例来了解这一点
class Tester...
[Test] public void TemporalQueryForShipsLocation() { eProc.Process(new ArrivalEvent(new DateTime(2005,11,2), la, kr)); eProc.Process(new DepartureEvent(new DateTime(2005,11,5), la, kr )); eProc.Process(new ArrivalEvent(new DateTime(2005,11,6), sfo, kr)); Assert.AreEqual(sfo, Ship.Find(kr).Port.Code); eProc.SetToEnd(new DateTime(2005,11,2)); Assert.AreEqual(la, Ship.Find(kr).Port.Code); }
这里至关重要的方法是SetToEnd
,它将我们的存储库更改为使用内存中存储库,并重新处理事件日志,以便事件播放到该天的最后一个事件。这为 11 月 2 日创建了我们的并行模型。
class EventProcessor...
IList log; public void SetToEnd(DateTime date) { SynchronizeLog(); IRepository temporalRepository = new MemoryDB(); Registry.enableAlternateRepository(temporalRepository); foreach (DomainEvent e in log) { if (e.Occurred > date) return; e.Process(); } }
为了运行时间查询,处理器完全脱离数据库。在这种情况下,处理器保留事件日志的副本。在脱离数据库之前,它会将其日志与持久日志同步,以便它拥有写入数据库的所有事件的完整记录。
日志同步后,我们创建一个完全在内存中的新存储库。这可以由嵌入式内存数据库支持,这将允许您继续使用 SQL。它也可以是满足存储库接口的手写内容。由于这是一个简单的示例,我只使用了一堆哈希表。
class MemoryDB...
public object Find(Type type, object key) { object result = this[type][key]; if (null == result) throw new ApplicationException ("unable to find: " + key.ToString()); return result; } private IDictionary store = new Hashtable(); private IDictionary this[Type index] {get {return (IDictionary)store[index];}}
内存中存储库需要初始化为与数据库在处理任何事件之前相同的初始状态 - 在这种情况下,保存港口和船舶的参考数据。
class MemoryDB...
public void Initialize() { store[typeof(Ship)] = new Hashtable(); store[typeof(Cargo)] = new Hashtable(); store[typeof(Port)] = new Hashtable(); Insert(new Port("SFO", "San Francisco", "US")); Insert(new Port("LAX", "Los Angeles", "US")); Insert(new Port("YVR", "Vancouver", "CA")); Insert (new Port("XXX", "out to sea", "XX")); Insert(new Ship("King Roy", 1)); }
设置好临时存储库后,我们告诉注册表开始使用它而不是真正的存储库。
class Registry...
internal static void enableAlternateRepository(IRepository arg) { instance._repository = arg; arg.Initialize(); }
现在,域模型中对注册表的任何调用都将使用内存中注册表。由于此设计将 NHibernate 放在常规数据库存储库中,这意味着我们根本不会在并行模型中使用 hibernate。对象出现在内存中,并由存储库保留在那里。
设置好内存中存储库后,我们从日志中按顺序处理发生在我们目标日期或之前的所有事件。完成后,我们拥有代表世界在请求日期结束时的状态的内存中存储库。我们现在对域对象执行的任何查询都将反映该日期。
要返回到正确的数据库,我们只需将临时内存中存储库换回常规存储库连接。
class Registry...
internal static void restoreBaseRepository() { instance._repository = instance._baseRepository; }
示例:将并行模型与基本模型进行比较(C#)
对于并行模型的许多用途,我们一次只使用一个模型。一般处理(包括更新)使用基本模型完成,我们构建并行模型用于历史和假设查询。在任何时候,我们都只有一个并行模型在起作用。这种方案相对简单,避免了困扰许多科幻情节的跨平行宇宙身份问题。
但是,有时混合使用两者是有意义的。如果您还记得运输示例中乏味的幽默,您就会知道图书发行商对加拿大的致命恐惧以及用“eh”污染文本的风险。让我们假设,我们不时会遇到特别恶性的“eh”传染病。任何在具有这种传染病的港口的船舶中的货物都应被标记为高风险。当然,我们不会在当天发现这些事情,因此我们必须弄清楚当天港口里有什么。
这是一个表达这个问题的测试用例。
class Tester...
[Test] public void HighRiskDayFlagsAllCargo() { eProc.Process(new RegisterCargoEvent(new DateTime(2005,1,1), "UML Distilled", "UML", "LAX" )); eProc.Process(new RegisterCargoEvent(new DateTime(2005,1,1), "Planning XP", "PXP", "LAX" )); eProc.Process(new RegisterCargoEvent(new DateTime(2005,1,1), "Analysis Patterns", "AP", "LAX" )); eProc.Process(new RegisterCargoEvent(new DateTime(2005,1,1), "P of EAA", "eaa", "LAX" )); eProc.Process(new ArrivalEvent(new DateTime(2005,11,2), la, kr)); eProc.Process(new LoadEvent(new DateTime(2005,5,11),"PXP", 1)); eProc.Process(new LoadEvent(new DateTime(2005,5,11),"AP", 1)); eProc.Process(new ArrivalEvent(new DateTime(2005,11,9), yvr, kr)); eProc.Process(new ArrivalEvent(new DateTime(2005,11,12), la, kr)); eProc.Process(new ContagionEvent(new DateTime(2005,11,10), yvr)); Assert.IsTrue(Cargo.Find("PXP").IsHighRisk, "PXP should be high risk"); Assert.IsTrue(Cargo.Find("AP").IsHighRisk, "AP should be high risk"); Assert.IsFalse(Cargo.Find("UML").IsHighRisk, "UML should NOT be high risk"); Assert.IsFalse(Cargo.Find("eaa").IsHighRisk, "UML should NOT be high risk"); Assert.IsFalse(Cargo.Find(refact).IsHighRisk, "UML should NOT be high risk"); }
我用一个新的事件来描述传染病。
class ContagionEvent...
internal class ContagionEvent : DomainEvent { string _portID; public ContagionEvent(DateTime occurred, string port) : base(occurred) { this._portID = port; } Port Port {get {return Port.Find(_portID);}}
internal override void Process() { Registry.EventProcessor.SetToEnd(Occurred); ArrayList infectedCargos = new ArrayList(); foreach (Ship s in Port.Ships) infectedCargos.AddRange(s.Cargos); Registry.restoreBaseRepository(); foreach (Cargo c in infectedCargos) { Cargo actualCargo = Cargo.Find(c.RegistrationCode); actualCargo.IsHighRisk = true; } }
您会注意到,在这种情况下,我在事件的 process 方法中实际上有相当多的行为。这样做的原因是我决定让域模型不知道并行模型。因此,事件必须为传染病的日期创建临时并行模型,运行查询以找出哪些货物受到影响,将世界恢复到基本模型,并将更新传递出去。域模型仍然在每个并行模型中执行逻辑,尽管它并不多。
另一种方法(我会认真考虑)是为将货物标记为高风险创建新的事件。在这种情况下,传染病事件将在临时并行模型中找到受影响的货物,然后创建一个事件将这些货物标记为高风险。第二个事件将在基本状态下运行。当我写下这些内容时,我承认不确定我更喜欢哪种方法。