事件源是一种利用存储不变性的架构模式。事件源的基本思想如下——与其存储数据的当前状态,不如存储修改数据的事件?这个想法看似激进,但并不新鲜;事实上,您已经在使用基于这一原则的工具——像 Git 这样的源代码控制系统遵循这一架构。我们将更详细地探讨这个想法,包括讨论它的优点和缺点。
本章将涵盖以下主题:
- 不变性的概念如何应用于数据存储
- 事件源架构是什么样的
- 在决定是否使用事件源时要考虑什么
您将需要一个支持 C++ 17 的编译器。我用的是 GCC 7.4.0。
代码可以在的 GitHub 上找到。com/ PacktPublishing/动手-函数-用- Cpp 编程Chapter13
文件夹中的。它包含并使用了doctest
,这是一个单头开源单元测试库。你可以在它的 GitHub 资源库上找到它。com/ onqtam/ doctest 。
直到 2010 年左右,数据存储的选择还相当有限。无论您首选的是 Oracle、MySQL 还是 PostgreSQL,您都必须为数据使用关系模型。
然后,突然,大量新的数据库引擎不知从哪里冒出来,对关系数据的支持部分甚至没有。它们是如此的不同,以至于它们挑战了积极的分类,所以世界最终根据它们没有做的事情来命名它们——NoSQL 数据库。事实上,它们唯一的共同点是对 SQL 的支持少之又少。引擎的列表很长,而且在不断变化,但是在撰写本文时,有几个很流行——Redis、MongoDB、DynamoDb、Cassandra 和 Couchbase 等等。这些引擎中的每一个都有自己的优缺点,它们出现的原因是针对各种场景进行优化,通常是在云计算的背景下。例如,Cassandra 是高度分布式的,而 MongoDB 允许轻松存储多种类型的数据。
大约在我听说 NoSQL 的同时,我开始听说一种新的建筑模式,叫做事件源。与通常的用户界面-服务器-关系数据库管理系统模式相比,事件源采用了完全不同的数据存储方法。事件源模式没有存储系统的当前状态,而是说——为什么我们不将系统的增量变化存储为域事件?
机敏的读者会注意到关于这个想法的两点:
- 这听起来像是来自领域驱动设计(【DDD】)运动的东西,的确如此。领域事件可以是我们使用的另一种模式,作为我们的 DDD 架构方法的一部分,也是我们领域模型发展的一部分。
- 将增量更改存储在数据存储中的想法虽然对业务应用来说很激进,但在软件架构中并不新鲜。事实上,在本书的整个写作过程中,我一直在使用基于这种模式的工具。您可能还用它来获取代码示例。虽然使用比我们将讨论的事件源更复杂的历史模型,但 Git 将增量更改存储在代码的当前状态旁边。
Git 不是唯一使用这种模式的工具。多年来,我们一直在数据备份操作中使用此类工具。由于完整备份可能需要很长时间,因此一个好的策略是将频繁的增量备份与不频繁的完整备份混合在一起。但是,诀窍在于,当需要恢复时,我们可以一个接一个地应用增量备份,从而获得与完整备份相同的状态。一方面是用于备份的时间和存储,另一方面是恢复备份所需的时间,这是一个很好的权衡。
此时,您可能会想知道事件源除了与存储相关之外,还与 NoSQL 数据库有什么关系?虽然我无法证明,但我相信这两个想法来自 2010 年代围绕编程的同一思潮——通过消除技术障碍来优化开发速度,并为各种基于网络和云的架构优化系统。
让我们思考一下推特。在数据流方面,推特有两个主要功能——发布消息和查看其他人发布的消息。如果您没有立即看到另一个用户发布的消息,您甚至不会知道它,因此允许高延迟。然而,我们不想丢失数据,所以我们需要尽快存储用户消息。
实现类似这样的事情的标准方法是根据请求将消息直接保存到数据库中,并在响应时返回更新的提要。这使我们可以立即看到消息,但它有几个缺点。首先,它使数据库成为瓶颈,因为每个发布的消息都执行一个INSERT
和一个SELECT
语句。其次,它需要服务器上更多的资源,从而增加了基于云的服务器的成本。
如果我们有不同的想法呢?当您发布消息时,我们只需将事件保存到快速事件存储中并立即返回。将来请求更新提要时,会考虑该事件并返回更新的提要。数据存储不再是瓶颈,我们已经降低了服务器负载。然而,我们在系统中添加了一个新的元素,事件存储,这可能会花费更多一点,但事实证明,在高规模下,这可能比替代方案更便宜,响应速度也更快。这是事件源的一个例子。
另一种选择是在数据引擎级别解决这个问题,并如前所述将写入和读取分开;然而,我们使用的数据存储是为编写而优化的。不利的一面是,数据可以以比以前更高的延迟读取,但这没关系。在未来的某个时候,它会变为可用,并且消息源会更新。这是一个使用 NoSQL 数据库而不是关系数据库管理系统的例子。
2010 年代确实非常有趣,在将函数式编程引入主流编程语言的同时,在软件架构和设计方面产生了许多新想法。顺便说一句,他们还对从漫威电影宇宙 ( MCU )发布的超级英雄系列电影感兴趣。两者没有联系,我就是喜欢 MCU!然而,我不得不停止 fanboying(关于软件设计和 MCU 的历史),转到另一个奇怪的想法——将不变性带入数据存储。
我们已经看到不变性对代码结构有着深远的影响,因此对软件设计也有着深远的影响。我们也多次讨论过,输入/输出从根本上来说是可变的。我们将展示数据存储不一定是可变的,并且不可变的数据存储对架构也有深远的影响。
数据存储如何才能不变?毕竟,许多软件应用的全部原因是做 CRUD——创建、检索、更新和删除。唯一不改变数据的操作是检索,尽管在某些情况下,检索数据会有额外的副作用,如分析或记录。
然而,请记住,我们在数据结构方面面临着同样的问题。可变数据结构在添加或删除元素时会改变其结构。然而,纯函数式语言支持不可变的数据结构。
不可变数据结构具有以下属性—添加或删除项不会改变数据结构。相反,它返回初始数据结构的副本以及更改。为了优化内存,纯函数式编程语言实际上并不克隆数据,它们只是聪明地利用指针来重用现有的内存。然而,对于程序员来说,就好像数据结构已经被完全克隆了一样。
考虑将同样的想法应用于存储。每次写入或删除都会创建一个应用了更改的数据新版本,而不是更改现有数据,同时保持以前的版本不变。想象可能性;我们获得了数据更改的整个历史,并且我们总是可以恢复它们,因为我们有数据的最新版本。
不过,这并不容易。存储的数据往往很大,每次更改时复制数据会消耗大量存储空间,并且在此过程中会变得极其缓慢。与内存数据相同的优化技术效果不太好,因为存储的数据往往更复杂,而指针不是(还没有?)对于文件系统来说同样易于管理。
幸运的是,还有一个替代方法——首先存储一个版本的数据,然后只存储对数据的一些更改。我们可以在关系数据库中实现这一点(毕竟更改只是实体),但幸运的是,我们不必这样做。为了支持这种存储模式,已经实现了统称为事件存储的存储引擎。它们允许我们存储事件,并在需要时获取最新版本的数据。
这样的系统将如何工作?我们需要对领域和领域事件进行建模。让我们以推特为例:
如果我们使用传统的数据存储,我们会以某种方式保存实体,但我们希望存储事件,因此我们将拥有一长串增量更改,概念上看起来如下:
CreateUser name:alexboly -> userid 1
CreateUser name: johndoe -> userid 2
PostMessage userid: 1, message: 'Hello, world!' -> messageid 1
PostMessage userid: 2, message: 'Hi @alexboly' -> messageid 2
CreateNotification userid: 1, notification: "Message from johndoe"
PostMessage userid: 1, message: 'Hi @johndoe' -> messageid 3
CreateNotification userid: 2, notification: "Message from alexboly"
LikeMessage userid: 2, messageid: 3
...
在我们继续看一个实现的例子之前,我们需要记住我们讨论的是软件架构,没有一个解决方案是完美的。因此,我们必须停下来考虑一下我们在使用事件源时所做的权衡。
如果没有优势,我们就不会谈论活动采购。
在概念层面上,领域模型和领域事件可以很容易地在非常快速、轻量级的会话中从领域专家那里提取出来。事件风暴是一个简化的会议,通过技术和领域专家之间的合作,允许我们在几个小时内设计一个复杂的系统。本次活动创造的知识不容小觑;这样的共同理解是知识工作中任何跨领域合作的坚实基础。
在软件设计层面,事件源比其他代码结构更能揭示意图。域操作倾向于隐藏在实体内部;有了事件源,对领域模型的改变是架构的首要和中心。我们实际上可以搜索数据可能经历的所有变化,并获得一个列表——这对于其他代码结构来说是很困难的。
在编码层面,事件源简化了编程。虽然在事件中思考起初可能很困难,但它很快会成为第二天性。这个模型允许我们编写反映最重要业务特性的代码,从而使程序员和产品所有者或客户之间更容易理解。它还巧妙地封装了每种类型的变更,从而简化了我们的测试和代码。
在数据存储层面,事件源允许我们查看对数据所做的更改列表,这对于其他数据存储模型来说是一个极端的壮举。增量备份更适合这种模式,因为它基本上是增量备份。恢复内置于数据存储中,允许我们从任何过去的物化存储开始,并应用所有事件。
此外,事件源允许我们回到过去。如果每个事件都有一个相反的事件,这通常很容易做到,我们可以从结束到某个时间戳播放相反的事件,从而引导我们找到当时的确切数据。
在性能级别上,事件源优化了数据写入,使其对大多数需要快速写入但可以处理读取延迟的应用非常有用(也称为大多数基于网络的系统)。
但是没有什么是免费的,那么会出什么问题呢?
尽管事件源有很多优点,但它可能会成为构建复杂应用的一种流行方式,但它也有一些重要的缺点,你需要在加入之前考虑一下。
第一个问题来自事件源的核心模型——如果我们已经有了一堆数据,还需要改变事件的结构,会怎么样?例如,如果我们需要为每个事件添加时间戳,会怎么样?或者,如果我们需要更改我们的PostMessage
事件,以包括一个可见性字段,该字段可能只是接收者,只是追随者,或者是所有人呢?
这个问题有解决办法,但每个办法都有自己的问题。一种解决方案是对事件模式进行版本化,并同时拥有多个模式,这种方法可行,但会使具体化变得复杂。另一个解决方案是使用数据迁移脚本来改变过去的事件,但是它打破了不变性的概念,必须正确完成。另一种选择是永远不要更改事件模式,只需添加一个新的事件类型,但这可能会由于多个不推荐使用的事件类型而导致混乱。
第二个问题是隐私。最近在欧盟 ( EU )通过的通用数据保护条例(【GDPR】)影响了世界各地的许多软件系统,赋予用户要求从系统中完全删除私有数据的权利。当使用普通数据库时,这相对容易——只需删除与用户标识相关的记录——但是我们如何在事件存储中做到这一点呢?
我们可以从删除与用户相关的所有事件开始。但是我们能做到吗?如果事件具有时间关系,我们可能会遇到问题。例如,设想以下协作编辑文档的场景:
CreateAuthor alexboly => authorid 1
CreateAuthor johndoe => authorid 2
...
AddText index: 2400, authorid:1, text: "something interesting here."
AddText index: 2427, authorid:2, text: "yes, that's interesting" =>
"something interesting here. yes that's interesting"
DeleteText index: 2400, length: 10, authorid: 1 =>"interesting here.
yes that's interesting"
...
如果用户alexboly
要求我们:
CreateAuthor alexboly => authorid 1
CreateAuthor johndoe => authorid 2
...
AddText index: 2400, authorid:1, text: "something interesting here."
AddText index: 2427, authorid:2, text: "yes, that's interesting" =>
"something interesting here. yes that's interesting"
DeleteText index: 2400, length: 10, authorid: 1 =>"interesting here.
yes that's interesting"
...
你看到问题了吗?如果删除突出显示的事件,我们不仅会丢失文档中的数据,而且索引也不再匹配!因此,将事件应用于空白文档将导致错误或数据损坏。
我们可以做几件事:
- 一种解决方案是删除用户身份,但保留数据。虽然这可以在特定的环境中工作,但这种解决方案取决于删除请求的范围。有一种特殊情况,用户将个人数据(例如,地址、电子邮件地址或身份证号码)添加到文档中。如果我们删除了用户的身份,但还需要删除个人数据,我们将需要扫描所有事件中的个人数据,并删除或替换为相同数量的空白字符。
- 另一个解决方案是物化数据库,删除数据,并从具有未来事件的新检查点开始。这打破了事件源的核心思想之一——从空存储中重建数据的能力——对于有许多事件或许多删除的系统来说,这可能很困难。不过,有了适当的规划和结构,这是可能的。
- 第三种解决方案是利用架构优势,为
DeletePrivateData
使用特殊事件。然而,这个事件是不同的,因为它将不得不改变事件存储而不是数据。虽然它符合架构,但它有风险,需要大量测试,因为它会破坏一切。 - 第四种解决方案是设计事件,使它们不在时间上耦合。理论上,这听起来不错,但我们不得不承认,在实践中,这可能并不总是可能的。在前面的例子中,我们需要文本的某种位置,我要求您找到一种方法来指定独立于现有文本的位置。还要考虑到,我们会在一种罕见的情况下进行这种设计工作,这可能会使所有事件不太容易理解。如果可能的话,改变很小,很好;但是如果没有,你需要自己做决定。
我们接下来将看一个使用事件源实现的简单例子。我们将从我们的 Twitter 示例开始,并开始编写一些测试。
首先,让我们创建一个用户,用伪代码检查事件存储中的正确事件:
TEST_CASE("Create User"){
EventStore eventStore;
...
auto alexId = createUser("alexboly", eventStore);
...
CHECK_EQ(lastEvent, expectedEvent);
}
我们需要一些东西来编译这个测试。首先,一个可以存储事件的事件存储,但是我们如何表达一个可以存储的事件呢?我们需要某种能够保存属性名称和值的数据结构。最简单的是一个map<string, string>
结构,它将属性的名称映射到它们的值。为了看到它的运行,让我们为CreateUser
创建事件结构:
auto makeCreateUserEvent = [](const string& handle, const int id){
return map<string, string>{
{"type", "CreateUser"},
{"handle", handle},
{"id", to_string(id)}
};
};
CreateUser
事件有一个类型,CreateUser
,需要一个手柄,例如alexboly
,用户需要一个id
。让我们用typedef
把它变得更好更明确:
typedef map<string, string> Event;
auto makeCreateUserEvent = [](const string& handle, const int id){
return Event{
{"type", "CreateUser"},
{"handle", handle},
{"id", to_string(id)}
};
};
我们现在可以创建我们的EventStore
。因为它基本上是一个事件列表,让我们使用它:
class EventStore : public list<Event>{
public:
EventStore() : list<Event>(){
};
};
因此,现在我们的测试可以使用EventStore
和makeCreateUserEvent
功能来检查,在调用createUser
之后,正确的事件将在事件存储中:
TEST_CASE("Create User"){
auto handle = "alexboly";
EventStore eventStore;
auto alexId = createUser(handle, eventStore);
auto expectedEvent = makeCreateUserEvent(handle, alexId);
auto event = eventStore.back();
CHECK_EQ(event, expectedEvent);
}
我们现在只需要执行createUser
就可以让这个测试生效。很简单;调用makeCreateUserEvent
并将结果添加到EventStore
。我们需要一个id
,但是因为我们只有一个元素,现在,让我们使用一个硬编码值1
:
int id = 1;
auto createUser = [](string handle, EventStore& eventStore){
eventStore.push_back(makeCreateUserEvent(handle, id));
return id;
};
测试通过;现在我们可以执行事件,它们将进入事件存储。
现在让我们看看新用户如何发布消息。我们将需要第二种事件类型PostMessage
,以及类似的代码基础结构。让我们写测试。首先,我们需要创建一个用户。其次,我们需要创建一个通过userId
链接到用户的消息。测试如下:
TEST_CASE("Post Message"){
auto handle = "alexboly";
auto message = "Hello, world!";
EventStore eventStore;
auto alexId = createUser(handle, eventStore);
auto messageId = postMessage(alexId, message, eventStore);
auto expectedEvent = makePostMessageEvent(alexId, message,
messageId);
auto event = eventStore.back();
CHECK_EQ(event, expectedEvent);
}
makePostMessageEvent
功能将创建一个包含所有所需信息的Event
结构。它还需要一个类型和messageId
:
auto makePostMessageEvent = [](const int userId, const string& message, int id){
return Event{
{"type", "PostMessage"},
{"userId", to_string(userId)},
{"message", message},
{"id", to_string(id)}
};
};
最后,postMessage
只是将makePostMessageEvent
的结果加入到EventStore
中。我们再次需要一个 ID,但是我们只有一条消息,所以我们可以使用相同的 ID,1
:
auto postMessage = [](const int userId, const string& message,
EventStore& eventStore){
eventStore.push_back(makePostMessageEvent(userId, message, id));
return id;
};
所以,现在我们有了一个可以发布消息的用户,这一切都是通过事件实现的。这很简单,没有一开始看起来那么难。
然而,这个实现提出了一些有趣的问题。
首先,如果我想通过用户的手柄或他们的id
来搜索用户呢?这是推特上的真实使用场景。如果我在带有@alexboly
的消息中提到另一个用户,应该用手柄alexboly
向该用户发送通知。另外,我想在时间轴上显示与用户@alexboly
相关的所有消息。
对此我有两个选择。第一个选项是只存储事件,并在读取数据时运行所有事件。第二个选项是用当前值维护一个域存储,并像任何其他数据库一样查询它。需要注意的是,这些存储中的每一个或两个都可能在内存中,以便快速访问。
不管当前值是缓存的还是计算的,我们都需要一种方法来执行事件并获取它们。我们该怎么做?
让我们写一个测试来描述我们需要什么。在运行一个或多个事件后,我们需要执行这些事件并获取当前值,从而允许我们根据需要检索它们:
TEST_CASE("Run events and get the user store"){
auto handle = "alexboly";
EventStore eventStore;
auto alexId = createUser(handle, eventStore);
auto dataStore = eventStore.play();
CHECK_EQ(dataStore.users.back(), User(alexId, handle));
}
为了通过测试,我们需要一些东西。首先,一个User
域对象,我们将保持非常简单:
class User{
public:
int id;
string handle;
User(int id, string handle): id(id), handle(handle){};
};
第二,有一个列表users
的数据存储:
class DataStore{
public:
list<User> users;
};
最后是play
机制。现在让我们使用一个丑陋的实现:
class EventStore : public list<Event>{
public:
DataStore play(){
DataStore dataStore;
for(Event event : *this){
if(event["type"] == "CreateUser"){
dataStore.users.push_back(User(stoi(event["id"]),
event["handle"]));
}
};
return dataStore;
};
}
知道了高阶函数,我们当然可以看到前面片段中的for
语句可以转化为函数方法。事实上,我们可以通过CreateUser
类型过滤所有事件,然后通过调用transform
将每个事件转换为一个实体。首先,让我们提取一些较小的函数。我们需要一个将CreateUser
事件转化为用户的功能:
auto createUserEventToUser = [](Event event){
return User(stoi(event["id"]), event["handle"]);
};
我们还需要一个按类型过滤事件列表的工具:
auto createUserEventToUser = [](Event event){
return User(stoi(event["id"]), event["handle"]);
};
我们现在可以提取一个playEvents
函数,该函数获取一个事件列表,按类型过滤它,并运行转换,获得一个实体列表:
template<typename Entity>
auto playEvents = [](const auto& events, const auto& eventType,
auto playEvent){
list<Event> allEventsOfType;
auto filterEventByThisEventType = bind(filterEventByEventType,
_1, eventType);
copy_if(events.begin(),events.end(),back_insert_iterator
(allEventsOfType), filterEventByThisEventType);
list<Entity> entities(allEventsOfType.size());
transform(allEventsOfType.begin(), allEventsOfType.end(),
entities.begin(), playEvent);
return entities;
};
我们现在可以在我们的EventStore
中使用该功能来代替CreateUser
的治疗,并将其推广到其他事件:
class EventStore : public list<Event>{
public:
EventStore() : list<Event>(){
};
DataStore play(){
DataStore dataStore;
dataStore.users = playEvents<User>(*this, "CreateUser",
createUserEventToUser);
return dataStore;
};
};
我们现在有了一种基于事件从商店中检索数据的方法。是时候看下一个问题了。
到目前为止,我们已经看到使用事件时实体之间的关系是基于 id 的,但是如果我们用错误的id
来调用事件呢?请看下面代码片段中的例子:
CreateUser handle:alexboly -> id 1
DeleteUser id: 1
PostMessage userId: 1, text: "Hello, world!" -> user with id 1 doesn't
exist anymore
我看到了一些解决这个问题的方法:
- 第一个解决方案是运行事件。如果它不会在显示器上产生额外的问题,这将是可行的。在推特上,如果我看到一条消息,我可以导航到发布这条消息的用户。在这种情况下,导航会导致页面不存在。这有问题吗?我会说,对于像推特这样的东西来说,这不是一个大问题,只要它不经常发生,但是你必须在你自己的产品背景下进行判断。
- 第二种解决方案是在没有任何检查的情况下运行事件,但是运行一个重复的作业来检查引用问题并清除它们(当然是通过事件)。这种方法允许您最终使用事件源清理数据,而不会减慢完整性检查的更新速度。再一次,你需要弄清楚这在你的环境中是否有效。
- 第三种解决方案是在每次事件运行时运行完整性检查。虽然这可以确保参照完整性,但也会降低速度。
检查可以通过两种方式运行——要么检查数据存储,要么检查事件存储。例如,您可以检查 ID 为1
的DeleteUser
从未发生,或者在CreateUser
之后没有发生(但是您需要用户句柄)。
在为您的应用选择事件源时,请记住这一点!
事件源是一种不可变的数据存储方法,从一个简单的想法开始——如果我们存储所有导致当前状态的事件,而不是存储世界的当前状态,会怎么样?这种方法有许多有趣的优点——能够在时间上向前和向后移动,内置增量备份,以及在时间线而不是状态中思考。它还附带了一些警告——删除过去的数据非常困难,事件模式很难更改,引用完整性往往会变得更加松散。您还需要注意可能的错误,并定义以结构化和可重复的方式处理它们的策略。
我们还看到了如何在 lambdas as events 的帮助下实现简单的事件源架构。我们还可以查看用于存储 lambda 的事件源,因为存储的事件基本上是命令模式,命令模式的最简单实现是 lambda。好奇的读者可以尝试将事件序列化/反序列化为 lambdas,看看它如何改变设计。
像任何架构模式一样,我的建议是仔细考虑权衡,并找到实现带来的最重要挑战的答案。如果你选择尝试活动采购,我也建议你尝试一个生产就绪的活动商店,而不是建立自己的。我们在这一章中所写的内容对于展示活动采购的核心原则和挑战非常有用,但它还远未准备好用于生产。
现在是时候转向 C++ 函数式编程的未来了。在下一章中,我们将浏览 C++ 17 中现有的函数式编程特性,并查看关于 C++ 20 的新闻。