协议调度器

构建一个围绕业务协议处理领域事件的处理器。

2005年3月11日

这是我在2000年代中期进行的企业应用架构进一步开发写作的一部分。遗憾的是,此后太多其他事情吸引了我的注意力,所以我没有时间进一步研究它们,而且在可预见的未来我也没有看到太多时间。因此,这些材料还处于草稿阶段,在我能够抽出时间再次进行处理之前,我不会进行任何更正或更新。

在一个由领域事件驱动的系统中,能够轻松地查找和更改对事件做出反应的处理规则非常重要。由于事件通常比发生时间晚出现,因此您还必须保留使用旧规则处理旧事件的能力。

一个协议调度器主要围绕业务协议构建事件处理器,因为协议是组织对事件响应变化的常见形式。通过使用时间属性按事件类型组织处理器,您可以以受控的方式处理更改的事件规则。

工作原理

一个协议调度器是一种模块化领域事件处理器的机制,使主模块与业务协议保持一致。我使用“协议”而不是“合同”这个词,部分原因是协议通常不是正式的合同,部分原因是为了避免与设计契约中的软件合同概念发生冲突。

我对协议调度器的讨论遵循委托调度原则。您将每个协议建模为一个对象,该对象具有处理事件的方法。实际的处理代码放置在单独的处理器对象中,这些对象以结构化的方式加载到协议中。协议的基本行为是找到正确的处理器,然后使用它来处理事件。

这导致了关于正确结构的问题。当然,这在一定程度上取决于您的问题域,但我已经多次看到一种结构,它工作得很好,那就是按事件类型和时间结构处理器。

(如果您不习惯时间属性,这可能看起来很棘手。本质上,将其视为协议包含一个哈希映射,其键是事件类型,整个值是处理器的时态集合。时态集合是一个特殊的集合,它是时间属性的实现。)

在这里使用时间属性的价值在于它允许您存储随时间变化的处理规则,并且仍然能够使用旧规则处理事件。

一家航运公司决定在3月14日将其收费从10美元提高到15美元。在3月22日,它收到一个事件,说一批货物是在3月7日运出的。重要的是,它使用在3月7日有效的规则(10美元的费用)来处理此事件,而不是在处理事件当天有效的规则。

继承是构建协议的自然概念。不要在编程语言中使用此继承,而是为协议提供一个父属性。设置处理代码,以便如果子协议上没有规则,它会查找父协议上的规则。

何时使用它

协议调度器有两个主要优势:以协议为中心的性质以及很好地处理时间规则的能力。

使用协议作为调度委托的第一步,可以更容易地处理处理事件的逻辑根据生效的业务协议而变化的情况——这是一种常见的情况。

调度到组织在时间属性中的规则很有用,因为它提供了一个良好的结构来应对规则的变化,同时允许使用旧规则处理旧事件。这是在缺乏该功能的系统中导致混乱的条件日期逻辑的常见原因。

我在这里描述了这种模式,因为我已经看到它在许多情况下都运行良好。但是,让我感到不舒服的一件事是,我没有看到太多关于变体或替代方案的信息。执行此类事件处理器的最常见方法是几乎没有结构,我还没有看到有良好替代结构的案例。

但是,执行此操作的两个主要原因表明了替代变体。如果您的业务处理的主要变化点不是协议,那么将该替代方案用作调度器的中心点。将处理器放在时间集合中以允许在不同时间执行规则的想法可以在其他上下文中使用。

协议调度器自适应对象模型的一个例子,因此带来了它的优点和缺点。它确实很好地处理了复杂性(至少在其可变性的粒度内)。它可以使熟悉该模型的人员变得非常高效。但是,它也很难学习,对于新手来说可能非常令人生畏。

示例:公用事业计费(Java)

我多次遇到的一种特定模式组合是领域事件协议调度器会计分录。在这种组合中,领域事件由一个协议调度器处理,该调度器创建会计分录。这种组合特别有效,因为协议调度器创建会计分录很容易调整——允许您使用通用调整策略来处理错误信息的调整。

为了说明这一点,我将使用一个公用事业计费示例。计费系统接收各种领域事件,并将它们处理成客户帐户上的会计分录。该模型描述起来相当复杂,因此我将分阶段进行。

  • 首先,我将展示框架类的结构
  • 然后,我将展示如何将框架类连接起来

框架结构

图 2显示了涉及的基本类。以下是此数据结构在代码中的外观。

图 2:公用事业示例的类

会计事件是领域事件的一种用法,在这里。它的数据很简单。

class AccountingEvent...

  private EventType type;
  private MfDate whenOccurred;
  private MfDate whenNoticed;
  private Subject subject; 

主题是一个接口,它定义了事件处理器所需的函数。我们唯一感兴趣的实现是 Customer。客户有两个我们感兴趣的东西,一个服务协议和一组帐户,这些帐户将保存我们事件处理的结果。

class Customer implements Subject

  private ServiceAgreement serviceAgreement;
private Map<AccountType, Account> accounts, savedRealAccounts;
public Customer(String name) {
    _name = name;
    setUpAccounts();
}
void setUpAccounts() {
    accounts = new HashMap<AccountType, Account>();
    for (AccountType type : AccountType.values())
        accounts.put(type, new Account(Currency.USD, type));
}
public Account accountFor(AccountType type) {
    assert accounts.containsKey(type);
    return accounts.get(type);
}
public void addEntry(Entry arg, AccountType type) {
    accountFor(type).post(arg);
}
public Money balanceFor(AccountType key) {
    return accountFor(key).balance();
}

帐户存储在 Map 中,每个帐户类型一个。

服务协议中的数据结构稍微复杂一些。本质上,它包含一个 Map,其中键是事件类型,值是时态集合的记账规则。我动态设置了这种结构。

class ServiceAgreement...

  private Map postingRules = new HashMap();
  public void addPostingRule(EventType eventType, PostingRule rule, MfDate date) {
      if (postingRules.get(eventType) == null)
          postingRules.put(eventType, new SingleTemporalCollection());
      getRulesTemporalCollectionFor(eventType).put(date, rule);
  }
  private TemporalCollection getRulesTemporalCollectionFor(EventType eventType) {
      TemporalCollection result = (TemporalCollection) postingRules.get(eventType);
      assert result != null;
      return result;
  }

时态集合类是我在那里讨论的时间属性的实现。

public interface TemporalCollection {
    //get and put at a supplied date
  Object get(MfDate when);
  void put(MfDate at, Object item);
  Object get(int year, int month, int date);
    //get and put at today's date
  Object get();
  void put(Object item);
}

记账规则是简单的处理器,它们了解将金额记账到某个帐户。我通过指示它们应该记账到哪个帐户以及它们是否应税来设置它们。

class PostingRule...

  private AccountType type;
  private boolean isTaxable;
  protected PostingRule(AccountType type, boolean isTaxable) {
      this.type = type;
      this.isTaxable = isTaxable;
  }

记账规则是专门的代码块,它们计算要收取的金额,然后将其记账到给定的帐户。

class PostingRule...

  public void process(AccountingEvent evt) {
      makeEntry(evt, calculateAmount(evt));
      if (isTaxable) generateTax(evt);
  }
  abstract protected Money calculateAmount(AccountingEvent evt);
  private void makeEntry(AccountingEvent evt, Money amount) {
      Entry newEntry = new Entry(amount, evt.getWhenNoticed());
      evt.getSubject().addEntry(newEntry, type);
      evt.addResultingEntry(newEntry);
  }

计算此金额的方法有很多,这些方法留给子类处理。(我还有一种处理税收的机制,但我将推迟讨论,直到以后。)

添加用于用量的记账规则

现在让我们看看如何将单个记账规则放入此结构中。

在标准协议中,当我们收到一个事件,表明客户使用了一些电力时,我们会收取一个费用,该费用是客户服务协议中定义的用量和费率的倍数。

为了实施这个简单的协议,我们需要创建一个服务协议,其中包含一个记账规则来处理提供有关电力用量信息的会计事件。

这是用量事件。

class Usage...

  private Quantity amount;
  public Usage(Quantity amount, MfDate whenOccurred, MfDate whenNoticed, Customer customer) {
      super(EventType.USAGE, whenOccurred, whenNoticed, customer);
      this.amount = amount;
  }
  public Quantity getAmount() {
      return amount;
  }

为了能够计算用量费用,我们需要一个记账规则,该规则可以将用量乘以费率。为此,我们将创建一个记账规则的子类。

class MultiplyByRatePR...

  public class MultiplyByRatePR extends PostingRule {
      public MultiplyByRatePR(AccountType type, boolean isTaxable) {
          super(type, isTaxable);
      }
      protected Money calculateAmount(AccountingEvent evt) {
          Usage usageEvent = (Usage) evt;
          return Money.dollars(usageEvent.getAmount().getAmount() * usageEvent.getRate());
      }
  }

class Usage...

  double getRate() {
      return ((Customer) getSubject()).getServiceAgreement().getRate(getWhenOccurred());
  }

为了完成练习,我们将新的记账规则添加到服务协议中

class ExampleTester…

  private ServiceAgreement simpleAgreement() {
      ServiceAgreement result = new ServiceAgreement();
      result.setRate(10, MfDate.PAST);
      result.addPostingRule(EventType.USAGE,
              new MultiplyByRatePR(AccountType.BASE_USAGE, false),
              new MfDate(1999, 10, 1));
      return result;
  }

图 3:类图,显示了我们简单示例所需的额外类。

图 4:对象图,显示了记账规则如何连接到协议。

框架如何执行

这就是一个简单协议的连接方式,现在我们将看看它是如何执行的。

在开始时,事件数据将来自某个外部来源,我将方便地忽略它。相反,我假设读者将为我实例化一个用量事件对象,并将其放在一个事件列表中。然后,我们可以处理事件列表以启动事件。我可以在一个 JUnit 测试用例中捕获这一点。

class ExampleTester…

  public void testSimpleRule() {
      Customer mycroft = new Customer ("Mycroft Homes");
      mycroft.setServiceAgreement(simpleAgreement());
      AccountingEvent usageEvent = new Usage(Unit.KWH.amount(50),
              new MfDate(1999, 10, 1),
              new MfDate(1999, 10, 15),
              mycroft);
      EventList eventList = new EventList();
      eventList.add(usageEvent);
      eventList.process();
      assertEquals(Money.dollars(500), mycroft.balanceFor(AccountType.BASE_USAGE));
      assertEquals(Money.dollars(0), mycroft.balanceFor(AccountType.SERVICE));
      assertEquals(Money.dollars(0), mycroft.balanceFor(AccountType.TAX));
  }

重述代码的内容:我创建一个示例客户,并为其分配我上面创建的标准协议。然后,我为 Mycroft 创建一个用量事件,将其添加到我的事件列表中,并处理事件列表。然后,产生的费用将在 Mycroft 的帐户中可见。

执行所有这些操作是一个委托练习;如 图 5 所示,这是一个相当长的链。我从事件列表开始,它所做的只是处理其未处理的事件。

class EventList...

  public void process() {
      for (AccountingEvent event : unprocessedEvents()) {
          try {
              event.process();
          } catch (Exception e) {
              if (shouldOnlyLogProcessingErrors) logProcessingError (event, e);
              else throw new RuntimeException(e);
          }
      }
  }

如果任何事件无法处理,我会记录并继续。

对于此示例,我让事件类成为事件处理器。虽然最初将事件处理器视为一个单独的类是有意义的,但从根本上说,处理器对事件进行了很多亲密的处理——因此,遵循信息专家原则,将处理行为放在事件本身是有意义的。

class AccountingEvent...

  public void process() {
      assert !isProcessed;
      if (adjustedEvent != null) adjustedEvent.reverse();
      subject.process(this);
      markProcessed();
  }

现在忽略关于调整的行,我将在后面详细讨论。

本质上,事件所做的只是委托给它的主题,该主题委托给它的服务协议,因为多个客户共享同一个服务协议。

class Customer...

  public void process(AccountingEvent e) {
      serviceAgreement.process(e);
  }

服务协议也希望只委托,这次委托给记账规则。但是,这次有点复杂,因为服务协议首先必须确定使用哪个规则。

class ServiceAgreement...

  public void process(AccountingEvent e) {
      getPostingRule(e).process(e);
  }
  private PostingRule getPostingRule(AccountingEvent event) {
      final TemporalCollection rules = getRulesTemporalCollectionFor(event.getEventType());
      if (rules == null) throw new MissingPostingRuleException(this, event);
      try {
          return (PostingRule) rules.get(event.getWhenOccurred());
      } catch(IllegalArgumentException e) {
          throw new MissingPostingRuleException(this, event);
      }
  }

服务协议根据事件类型和事件的实际时间查找记账规则。

处理更多情况

有了这个基本框架,我们就可以开始扩展它来处理更多类型的事件。

服务电话有基于费用的费用。在这个标准协议中,我们收取费用加上费用的 10% 以及 10 美元的处理费。

要添加此额外规则,我们只需在协议中添加另一个记账规则,生成的协议设置如下所示。

class ExampleTester…

  private ServiceAgreement simpleAgreement2() {
      ServiceAgreement result = new ServiceAgreement();
      result.setRate(10, MfDate.PAST);
      result.addPostingRule(EventType.USAGE,
              new MultiplyByRatePR(AccountType.BASE_USAGE, false),
              new MfDate(1999, 10, 1));
      result.addPostingRule(EventType.SERVICE_CALL,
              new AmountFormulaPR(1.1, Money.dollars(10), AccountType.SERVICE, false),
              new MfDate(1999, 10, 1));
      return result;
 }
public class AmountFormulaPR extends PostingRule {
    private double multiplier;
    private Money fixedFee;
    public AmountFormulaPR(double multiplier, Money fixedFee, AccountType type, boolean isTaxable) {
        super(type, isTaxable);
        this.multiplier = multiplier;
        this.fixedFee = fixedFee;
    }
    protected Money calculateAmount(AccountingEvent evt) {
        Money eventAmount = ((MonetaryEvent) evt).getAmount();
        return (Money) eventAmount.multiply(multiplier).add(fixedFee);
    }
}

我们需要一种新的记账规则来支持一个简单的公式——在本例中,只需将金额乘以一个乘数并添加一个常数。

另一个变化领域是时间。

12 月 1 日,服务电话的处理费上涨至 15 美元

要添加此内容,我们添加另一个记账规则,该规则具有相同的事件类型,但日期不同。因此,现在我们的协议定义如下所示。

class ExampleTester…

  private ServiceAgreement simpleAgreement3() {
      ServiceAgreement result = new ServiceAgreement();
      result.setRate(10, MfDate.PAST);
      result.addPostingRule(EventType.USAGE,
              new MultiplyByRatePR(AccountType.BASE_USAGE, false),
              new MfDate(1999, 10, 1));
      result.addPostingRule(EventType.SERVICE_CALL,
              new AmountFormulaPR(1.1, Money.dollars(10), AccountType.SERVICE, false),
              new MfDate(1999, 10, 1));
      result.addPostingRule(EventType.SERVICE_CALL,
              new AmountFormulaPR(1.1, Money.dollars(15), AccountType.SERVICE, false),
              new MfDate(1999, 12, 1));
      return result;
 }

有了这个,12 月之前的服务电话会产生较低的费用,而 12 月之后的电话会产生较高的费用。

class ExampleTester…

  public void testSimpleRule3() {
      Customer mycroft = new Customer("Mycroft Homes");
      mycroft.setServiceAgreement(simpleAgreement3());
      EventList eventList = new EventList();

      //service call before rule change
      AccountingEvent call1 = new MonetaryEvent(Money.dollars(100),
              EventType.SERVICE_CALL,
              new MfDate(1999, 10, 1),
              new MfDate(1999, 10, 15),
              mycroft);
      eventList.add(call1);
      eventList.process();
      assertEquals(Money.dollars(0), mycroft.balanceFor(AccountType.BASE_USAGE));
      assertEquals(Money.dollars(120), mycroft.balanceFor(AccountType.SERVICE));
      assertEquals(Money.dollars(0), mycroft.balanceFor(AccountType.TAX));

      //service call after rule change
      AccountingEvent call2 = new MonetaryEvent(Money.dollars(100),
              EventType.SERVICE_CALL,
              new MfDate(1999, 12, 1),
              new MfDate(1999, 12, 15),
              mycroft);
      eventList.add(call2);
      eventList.process();
      assertEquals(Money.dollars(0), mycroft.balanceFor(AccountType.BASE_USAGE));
      assertEquals(Money.dollars(245), mycroft.balanceFor(AccountType.SERVICE));
      assertEquals(Money.dollars(0), mycroft.balanceFor(AccountType.TAX));
  }