600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 云原生微服务架构实战精讲第七节 调度算法与司机乘客行程查询

云原生微服务架构实战精讲第七节 调度算法与司机乘客行程查询

时间:2024-05-16 20:55:42

相关推荐

云原生微服务架构实战精讲第七节 调度算法与司机乘客行程查询

第19讲:如何实现行程派发与调度算法

第 18 课时介绍了司机模拟器如何发布位置更新事件,以及行程派发服务如何处理这些事件,并维护所有可用的司机信息,本课时紧接着第 18 课时的内容,主要介绍行程派发服务的实现。行程派发是示例应用的核心领域,因此需要进行重点介绍。本课时只对代码实现中的重要部分进行介绍,完整的实现请参考示例应用在 GitHub 上的源代码。

调度算法

当乘客发出创建行程的请求之后,该创建请求首先需要被验证,行程验证由专门的服务来完成。在第 21 课时介绍 Saga 模式的实现时,会具体介绍行程验证服务。当行程通过验证之后,行程会处于已确认状态,与此同时,行程派发服务会开始执行该行程的调度任务。

派发行程的操作由 DispatchService 类的 dispatchTrip 方法来完成。当行程派发服务接收到表示行程已经被确认的 TripConfirmedEvent 事件之后,dispatchTrip 方法就会被调用。下面的代码给出了 dispatchTrip 方法的实现,具体的派发过程分成下面 3 个步骤。

以行程的起始位置为中心,找到所有处于可用状态的司机,这些是派发行程的候选。这一步调用 findAvailableDrivers 方法来完成,实际调用的是 DriverLocationService 类的 findAvailableDrivers 方法。在第 18 课时已经介绍了这个方法,它通过 Redis 来查找特定范围之内的可用的司机。基于行程信息和可用司机的列表来创建表示行程派发的领域类 Dispatch 的对象,保存 Dispatch 对象并发布相关的事件,这是事务性消息模式的应用。如果存在可用的司机,则调用 TripAcceptanceService 类的 startTripAcceptanceCheck 方法来检查是否有司机接受行程。

由于第 1 个步骤已经在第 18 课时进行了介绍,下面会具体对第 2 个和第 3 个步骤进行介绍。

@Transactionalpublic void dispatchTrip(final String tripId, final TripDetails tripDetails) {final Set<AvailableDriver> availableDrivers = this.findAvailableDrivers(tripDetails);this.saveAndPublishEvents(Dispatch.createDispatch(tripId, tripDetails, availableDrivers));if (!availableDrivers.isEmpty()) {this.tripAcceptanceService.startTripAcceptanceCheck(tripId, tripDetails,this.acceptanceCheckInterval,this::selectTripAcceptance, this::notifyTripDispatchFailed);log.info("Dispatch trip {} to drivers {}", tripId, availableDrivers);}}

行程派发领域对象

每个行程的派发动作都有自己的生命周期,因此需要定义相关的实体类 Dispatch,并保存在数据库中。行程派发是所在聚合的根实体,每个行程被派发之后,查找到的可用司机会被邀请来接受行程。对行程的接受动作也是有生命周期的,同样以实体的形式来定义,即 TripAcceptance,属于聚合的一般实体。行程派发实体和行程接受实体存在一对多的关系。下图是这两个实体之间关系的 ER 图。

在创建行程派发对象时,需要提供的是行程信息和可用司机的列表。Dispatch 类的静态方法 createDispatch 用来创建 Dispatch 对象和需要发布的事件对象,如下面的代码所示。在创建出 Dispatch 对象之后,对于每一个表示可用的司机的 AvailableDriver 对象,创建出一个与之对应的 TripAcceptance 对象,用来追踪每个司机接受行程的状态。这些 TripAcceptance 对象与 Dispatch 对象关联起来。

对于发布的事件对象,如果可用司机的列表为空,则直接把 Dispatch 对象设置为失败状态,发布的事件为 TripDispatchFailedEvent 对象;否则,发布的事件为 TripDispatchedEvent 对象。Dispatch 对象和事件对象以 ResultWithDomainEvents 的形式返回。DispatchService 的 saveAndPublishEvents 方法用来保存 Dispatch 对象并发布事件。

public static ResultWithDomainEvents<Dispatch, DispatchDomainEvent> createDispatch(final String tripId,final TripDetails tripDetails,final Set<AvailableDriver> drivers) {final PositionVO startPos = tripDetails.getStartPos();final Dispatch dispatch = new Dispatch(tripId, startPos.getLng(),startPos.getLat());final List<TripAcceptance> tripAcceptances = drivers.stream().map(driver -> new TripAcceptance(driver.getDriverId(),driver.getPosLng(),driver.getPosLat())).collect(Collectors.toList());dispatch.setTripAcceptances(tripAcceptances);final DispatchDomainEvent event;if (drivers.isEmpty()) {dispatch.setState(DispatchState.FAILED);dispatch.setFailedReason(TripDispatchFailedReason.NO_DRIVERS_AVAILABLE);event = new TripDispatchFailedEvent(tripId,TripDispatchFailedReason.NO_DRIVERS_AVAILABLE);} else {final Set<String> driversId = drivers.stream().map(AvailableDriver::getDriverId).collect(Collectors.toSet());event = new TripDispatchedEvent(tripId, tripDetails, driversId);}return new ResultWithDomainEvents<>(dispatch, event);}

接受行程

在行程派发之后,需要通知司机来接受行程。对于乘客 App 说,可以使用消息推送来发送通知;对于 Web 应用来说,可以使用 WebSocket 来发送通知。只需要添加 TripDispatchedEvent 事件的处理器,就可以使用不同的方式来发送通知。

当司机接收到通知之后,可以选择是否接受行程。在一定的时间之内,所有收到通知的司机都可以选择接受行程。在初始的等待时间过后,如果有司机接受行程,那么会从接受行程的司机中,选择一个来作为行程的接受者,而其他的司机则会收到通知,说明行程已经被其他司机所接受。如果没有司机接受行程,那么会再等待一段时间之后,再进行检查;如果在给定的期限之后,仍然没有司机接受行程,那么行程派发失败。

司机接受行程的请求由行程管理服务提供的 REST API 来负责处理。REST 控制器使用 TripService 处理请求,如下面的代码所示。

@PostMapping("{id}/accept")public void acceptTrip(@PathVariable("id") String id,@RequestBody AcceptTripRequest request) {tripService.driverAcceptTrip(id, request.getDriverId(), request.getPosLng(), request.getPosLat());}

下面的代码给出了 TripService 类的 driverAcceptTrip 方法的实现。在实现中,一个 DriverAcceptTripEvent 事件会被发布。withTrip 方法的作用是根据行程的标识符找到对应的 Trip 对象,并执行操作。

public void driverAcceptTrip(final String tripId, final String driverId,final BigDecimal posLng,final BigDecimal posLat) {this.withTrip(tripId, trip -> this.eventPublisher.publish(trip,ImmutableList.of(new DriverAcceptTripEvent(new DriverAcceptTripDetails(driverId, posLng, posLat)))));}

DriverAcceptTripEvent 事件的处理器调用 DispatchService 类的 submitTripAcceptance 方法,如下面的代码所示。其中 withCurrentDispatch 方法的作用是根据行程的标识符,找到该行程当前的 Dispatch 对象,再对该 Dispatch 对象进行操作。

@Transactionalpublic void submitTripAcceptance(final String tripId,final DriverAcceptTripDetails details) {log.info("Driver {} wants to accept trip {}", details.getDriverId(),tripId);this.withCurrentDispatch(tripId, dispatch -> {this.dispatchRepository.save(dispatch.submitTripAcceptance(details));this.tripAcceptanceService.addDriverToAcceptTrip(tripId, details);});}

在下面的代码中,Dispatch 类的 submitTripAcceptance 方法用来对当前 Dispatch 对象进行修改,把司机对应的 TripAcceptance 对象的状态改为已提交。

public Dispatch submitTripAcceptance(final DriverAcceptTripDetails acceptTripDetails) {Stream.ofAll(this.tripAcceptances).find(tripAcceptance -> Objects.equals(tripAcceptance.getDriverId(),acceptTripDetails.getDriverId())).toJavaOptional().ifPresent(tripAcceptance -> {tripAcceptance.setState(TripAcceptanceState.SUBMITTED);tripAcceptance.setCurrentPosLng(acceptTripDetails.getPosLng());tripAcceptance.setCurrentPosLat(acceptTripDetails.getPosLat());});return this;}

TripAcceptanceService 类负责找到合适的司机来接受行程,如下面的代码所示。在 startTripAcceptanceCheck 方法中,把行程的起始地理位置添加到 Redis 中,然后启动一个定时任务来检查行程的接受状态。该定时任务由 CheckTripAcceptanceTask 类来描述。在每次执行任务时,调用 findDriverToAcceptTrip 方法来从当前已接受行程的司机中,找到距离行程的起始位置最近的司机,并选中该司机来接受行程。

如果当前没有司机接受行程,那么会启动一个新的执行同样操作的任务,并在给定的时间间隔之后运行。对于一个行程,检查任务最多运行 3 次,如果 3 次之后仍然没有司机接受行程,会调用指定的错误回调函数 failureCallback;如果有司机接受行程,则会调用指定的成功回调函数 successCallback。当有司机接受行程时,addDriverToAcceptTrip 方法会被调用来把司机的位置信息添加到 Redis 中,可以在下一次定时任务中被查询到。addDriverToAcceptTrip 方法会被 DispatchService 类的 submitTripAcceptance 方法调用。

public class TripAcceptanceService {@AutowiredRedisTemplate<String, String> redisTemplate;@AutowiredTaskScheduler taskScheduler;private final Distance searchRadius = new Distance(10,DistanceUnit.KILOMETERS);private final String passenger = "__passenger__";private final int acceptanceCheckMaxTimes = 3;public void startTripAcceptanceCheck(final String tripId,final TripDetails tripDetails,final Duration interval,final BiConsumer<String, String> successCallback,final BiConsumer<String, TripDispatchFailedReason> failureCallback) {this.redisTemplate.opsForGeo().add(this.keyForTripAcceptance(tripId),new Point(tripDetails.getStartPos().getLng().doubleValue(),tripDetails.getStartPos().getLat().doubleValue()),this.passenger);this.scheduleCheckTripAcceptanceTask(tripId, interval, successCallback,failureCallback, 1);}private void scheduleCheckTripAcceptanceTask(final String tripId,final Duration interval,final BiConsumer<String, String> successCallback,final BiConsumer<String, TripDispatchFailedReason> failureCallback,final int attempt) {this.taskScheduler.schedule(new CheckTripAcceptanceTask(tripId, interval, successCallback,failureCallback,attempt),Instant.now().plusMillis(interval.toMillis()));}public void addDriverToAcceptTrip(final String tripId,final DriverAcceptTripDetails details) {this.redisTemplate.opsForGeo().add(this.keyForTripAcceptance(tripId),new Point(details.getPosLng().doubleValue(),details.getPosLat().doubleValue()), details.getDriverId());}private Optional<String> findDriverToAcceptTrip(final String tripId) {final GeoResults<GeoLocation<String>> results = this.redisTemplate.opsForGeo().radius(this.keyForTripAcceptance(tripId), this.passenger,this.searchRadius,GeoRadiusCommandArgs.newGeoRadiusArgs().sortAscending());return results.getContent().stream().map(result -> result.getContent().getName()).filter(name -> !Objects.equals(name, this.passenger)).findFirst();}private String keyForTripAcceptance(final String tripId) {return String.format("accept_trip_%s", tripId);}}

当有司机被选中接受行程时,DispatchService 的 selectTripAcceptance 方法会被调用,如下面的代码所示。这个方法会对 Dispatch 对象及其关联的 TripAcceptance 对象进行修改,并发布相应的事件。只有被选中的司机所对应的 TripAcceptance 对象的状态会变为已选中,其他的 TripAcceptance 对象的状态会变为已拒绝,与之相关的事件也会被发布。

@Transactionalpublic void selectTripAcceptance(final String tripId, final String driverId) {log.info("Select driver {} to accept trip {}", driverId, tripId);this.withCurrentDispatch(tripId, dispatch ->this.saveAndPublishEvents(dispatch.selectTripAcceptance(driverId)));}

事件处理配置

下面的代码是 Eventuate Tram 框架使用的领域事件处理相关的配置,产生事件的领域对象是 Trip。创建的 DomainEventHandlers 对象对 Trip 对象产生的 TripConfirmedEvent 和 DriverAcceptTripEvent 两种事件进行处理。

public class DispatchServiceEventConsumer {@AutowiredDispatchService dispatchService;private static final Logger LOGGER = LoggerFactory.getLogger(DispatchServiceEventConsumer.class);public DomainEventHandlers domainEventHandlers() {return DomainEventHandlersBuilder.forAggregateType("io.vividcode.happyride.tripservice.domain.Trip").onEvent(TripConfirmedEvent.class, this::onTripConfirmed).onEvent(DriverAcceptTripEvent.class, this::onDriverAcceptTrip).build();}private void onTripConfirmed(DomainEventEnvelope<TripConfirmedEvent> envelope) {TripDetails tripDetails = envelope.getEvent().getTripDetails();try {dispatchService.dispatchTrip(envelope.getAggregateId(), tripDetails);} catch (Exception e) {LOGGER.warn("Failed to dispatch trip {}", envelope.getAggregateId(), e);}}private void onDriverAcceptTrip(DomainEventEnvelope<DriverAcceptTripEvent> envelope) {dispatchService.submitTripAcceptance(envelope.getAggregateId(),envelope.getEvent().getAcceptTripDetails());}}

事件驱动的微服务总结

本课时是事件驱动的微服务部分的最后一个课时,我对事件驱动的微服务做一下总结。事件驱动的微服务使用异步传递的消息来代替同步的微服务 API 调用。当一个微服务的领域对象状态发生变化时,会发布相应的事件来通知感兴趣的其他微服务。每个微服务可以添加对感兴趣的领域事件的处理器,来修改自己内部的对象状态,事件的发布和处理使用发布者 - 订阅者(PubSub) 模式。事件的发布者和处理者之间不存在耦合关系。

值得一提的是,在事件驱动的微服务中,每个微服务所发布和处理的事件,成为了其公开 API 的一部分,需要在设计时与其他微服务进行协调。比如,一个事件的发布者不能随意修改事件的载荷格式,不过事件驱动设计的一个好处是事件的版本更新很容易。比如,当前的某个事件的类型声明是 MyEvent,如果需要改变该事件的载荷格式时,只需要创建一个新的事件类型 MyEvent2 即可。在代码迁移阶段,事件的发布者可以同时发布这两种类型的事件,这样可以保证已有的代码不会出错。等所有的事件处理器都升级到使用新的事件类型之后,只需要停止旧版本的事件发布即可。

为了保证对象状态的修改和事件的发布之间的一致性,我们使用事务性消息模式,典型的做法是事务性发件箱模式。Eventuate Tram 框架提供了对事务性发件箱模式的支持,示例应用使用这个框架来进行跨服务的消息传递。

与事务性消息模式不同的是,事件源技术使用事件来描述所有对对象状态的修改。这样可以产生一个完整的对象状态修改的日志记录,形成一个审核日志。通过这些事件,可以查询到对象在任意时刻的状态。

总结

作为事件驱动的微服务部分的最后一个课时,本课时主要介绍了行程派发服务的一些实现细节,包括其中的领域对象、行程派发算法和接受行程的实现。这些实现细节可以帮助你更好的理解事务性消息模式的使用。最后对事件驱动的微服务做了总结。相信通过部分的学习,你已经掌握或了解了事件驱动的微服务的开发,在今后的工作中可以按照事件驱动的思路来设计和实现微服务。

第20讲:什么是数据一致性与 Saga 模式

从本课时开始,我将开始介绍跨微服务的协作与查询,这一部分的内容主要涉及微服务之间的交互方式。由于每个微服务一般都各自独立存储数据,所以在不同微服务之间共享数据变得复杂。本课时将讲解微服务架构的应用中的数据一致性问题,以及 Saga 模式。

数据一致性

数据一致性是软件开发中经常会遇到的问题,指的是相互关联的数据的值出现了不一致,破坏了业务逻辑中的不变量。数据一致性是一个很宽泛的话题,很多开发中的常见问题都可以归类为数据一致性的问题。数据一致性的问题通常涉及多个动作,每个动作都会对一些数据进行修改,这些动作的整体才是对数据的完整修改,这几个动作在逻辑上组成了一个工作单元(Unit of Work)。我们需要保证的是同一工作单元中的全部动作在执行前后,业务逻辑中所规定的不变量不被破坏。

一个典型的问题是银行账户之间的转账。当从账户 A 转账一定金额到账户 B 时,在转账操作执行前后,这两个账户的余额总和应该保持不变,这就是转账这一业务逻辑的不变量。如果转账操作成功,那么账户 A 的余额会减去转账金额,同时账户 B 的余额会加上转账金额;如果转账失败,则两个账户的余额保持不变。这两种情况都保证了业务逻辑的不变量。如果转账金额从账户 A 中被扣掉,而账户 B 的余额没有增加,这就表示业务逻辑的不变量被破坏,也就是出现了数据一致性的问题。

数据一致性问题的一个典型场景是在数据库操作中,关系型数据库通过事务来解决一致性问题。

数据库事务的 ACID 特性

数据一致性问题的一个解决办法是保证工作单元的原子性,也就是说,工作单元中的全部动作,要么全部发生,要么全部不发生。在关系式数据库管理系统中,事务用来作为多个语句执行时的单元。数据库事务满足 ACID 特性,ACID 是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)对应的英文单词首字母的缩写。

原子性指的是每个事务都被当成一个独立的单元,其中包含的语句要么全部成功,要么全部不执行。如果事务中的一个语句执行失败,整个事务会被回滚,不会对数据库产生影响。上面提到的银行账户之间转账的例子,如果对两个账户的操作都在一个事务中完成,那么事务的原子性可以保证业务逻辑中的不变量不被破坏。

一致性指的是事务只会把数据库从一个合法的状态带到另外一个合法的状态,并保持数据库的不变量。数据库的不变量与之前提到的业务逻辑的不变量并不相同。数据库的不变量指的是为了保证数据的完整性所定义的规则,包括约束、级联操作和触发器等。常用的规则包括,数据库表中的主键必须唯一,外键所引用的主键必须存在等。

隔离性与事务的并发执行有关。事务通常是并发执行的,也就是说,多个事务可能同时对同一个数据库表进行修改。隔离性要求多个事务在并发执行的结果,与这些事务按顺序执行所得到的结果是一样的。也就是说,每个事务都相当于在自己隔离的空间中运行,不受其他事务的影响。

持久性指的是一旦事务被提交,那么即便是系统崩溃,该事件仍然处于已提交状态。一般的做法是使用事务日志来记录已提交的事件,持久性保证了事务的执行结果不会受到系统崩溃的影响。

之前提到的数据一致性问题,如果使用数据库事务,就可以轻松解决。很多的编程语言和框架都支持数据库事务,声明式的事务更加简化了开发人员的工作。比如,在 Java 中,只需要在类或方法上添加 @Transactional 注解,就可以启用事务。如果相关的操作涉及多个数据库,可以使用基于两阶段提交协议的 XA 事务。

在一个分布式系统中,事务并不总是可用的。在第 15 课时提到过,Apache Kafka 不支持 XA 事务,因此无法参与到关系型数据库的事务中来。即便是可以使用 XA 事务,其成本也是很高的。在分布式系统中,可以考虑的另外一种一致性模型是最终一致性

最终一致性的 BASE 特性

最终一致性(Eventual Consistency)指的是,对于一个数据项,如果没有对它做新的改动,那么所有对该数据项的访问最终都会返回最后一次更新的值。最终一致性所提供的特性是 BASE,即基本可用(Basically Available)、软状态(Soft State)和最终一致性(Eventual Consistency)的缩写。BASE 在化学上的含义是碱,刚好与 ACID 的含义酸相对应。

基本可用指的是基本的读取和写入操作是尽可能可用的,但是并不保证一致性。也就是说,读取操作不一定返回的是最近一次更新的值,写入操作只有在解决冲突之后才会被持久化。软状态指的是由于没有一致性的保证,在某个时间点上,我们只能对系统的状态有一个大致的认知。最终一致性的含义如上面所述,只需要等待足够长的时间,系统的状态就会最终恢复一致性。

最终一致性的目标是提高系统的可用性,这就要提到分布式系统中的 CAP 定理。CAP 定理指的是一个分布式数据存储最多只能提供一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)这三个保证的两个保证。

这三个保证的内容分别是:

一致性,每次读取操作可以获取到最近一次写入的值,或者产生错误;可用性,每次请求总是可以得到一个正确的响应,尽管其中包含的不一定是最近一次写入的值;分区容错性,当由于节点之间的网络原因,造成系统内部的消息丢失时,系统仍然可以继续工作。

由于分布式系统中的网络错误不可避免,分区容错性的保证是必须要有的。所以基于 CAP 定理,当出现网络分区时,就需要在一致性和可用性之间进行选择。一种做法是直接出错,这样保证了一致性,但是会降低可用性,因为不能再提供请求的响应;另外一种做法是返回系统已知的最近值,但是该值不一定是最新的,这样保证了可用性,但是丢失了一致性。

这里需要注意的是,CAP 定理并不是说永远只能在一致性、可用性和分区容错性这三者中选择两个。事实上,当网络没有问题时,一致性和可用性是可以兼顾的。一致性和可用性的取舍,只发生在网络出现问题时。

微服务架构中的最终一致性

微服务架构的本质是一个分布式系统,也同样也会遇到一致性的问题,这种一致性不仅体现在数据层面上,更多的是在业务逻辑上。在微服务架构的应用中,一个业务场景可能会由多个微服务来协作完成,所有参与的微服务的数据必须在业务逻辑上保持一致。比如,在一个外卖订餐系统中,当用户下单之后,订单服务需要进行记录,同时通知餐馆开始准备订单中的菜品,支付服务也需要进行扣款。如果扣款失败,那么订单的状态需要更新,餐馆也需要得到通知。当一个订单成功完成时,订单服务、餐馆服务和支付服务中关于这一订单的数据应该是匹配的。

在微服务架构的应用中,最终一致性是解决数据一致性问题的最现实方案。当业务流程横跨多个微服务时,完成一个业务流程的时间可能会比较长。如果从业务流程的生命周期全过程中的某个时间点来看,相关的数据可能处于不一致的状态。比如,一个外卖订单已经扣款成功,但是餐馆由于自身原因,暂时无法确认是否能提供全部菜品,在这个时间点上来说,用户完成了支付,但是对应的菜品处于未确定状态。如果餐馆无法提供菜品,而导致订单取消,在完成退款操作之前,用户付了钱,但可能没有得到任何菜品。如果等整个业务流程全部完成,那么系统的状态会恢复一致性。

在微服务架构中,描述业务流程,需要用到下面介绍的 Saga 模式。

Saga 模式

Saga 最早在 1987 年作为解决数据库系统中的长时间运行的事务的方案而出现,该模式通常又被称为长时间运行的事务(Long-Running Transaction)。一个长时间运行的事务,由多个小的本地事务组成,它避免了对非本地资源的锁定,并通过补偿机制来处理失败。长时间运行的事务并不具备数据库事务的全部 ACID 特性,但是组成它的本地事务具有 ACID 特性。如果某个本地事务出现错误,那么对于那些已提交的本地事务,会应用其对应的补偿机制来恢复状态。

以银行账户之间的转账操作为例,如果以 Saga 模式来实现,那么从源账户转出和转入到目标账户这两个操作都由本地事务来完成。假设从账户 A 转账 100 元到账户 B,如果从账户 A 的转出操作成功,而转入账户 B 的操作失败,那么会执行对应的补偿操作,也就是对账户 A 存入 100 元。这样就保证了数据的一致性。

虽然 Saga 模式起源于数据库系统,它非常适合于微服务架构,该模式用来保证业务事务(Business Transaction)的数据一致性。业务事务可能横跨多个微服务的边界,涉及不同类型的数据存储,还可能有人员的参与。这样的业务事务有自己的状态,而且可能耗时漫长,Saga 模式是实现业务事务的良好解决方案。

在应用 Saga 模式之后,每个微服务更新本地的数据库,并发布事件来推动业务事务往前发展。根据是否有协调者,Saga 分成编排型(Choreography)和编制型(Orchestration)两种,其中编制型有协调者。编排型 Saga 中的本地事务由事件来直接触发,而编制型中 Saga 的本地事务的触发由协调者来确定。

每个 Saga 中有多个参与者,每个参与者需要定义所执行的操作,以及对应的补偿操作。补偿操作不一定与执行的操作完全相反。比如,订单服务中的创建订单操作的补偿操作是把订单的状态改为已取消,同时根据不同的情况,可能收取一定的取消费用。每个参与者只负责完成整个业务事务中的某一步,并根据执行的结果来确定下一步的操作。编排型 Saga 中的业务逻辑散落在每个参与者之中,而编制型 Saga 中的业务逻辑由协调者来统一管理。业务事务的进程推进由事件和消息来完成,当业务事务进行到最后一步时,这个 Saga 处于已完成的状态。

下图是编排型 Saga 的示意图。图中的每个六边形表示一个服务,其中的箭头表示事件。对事件的处理发生在每个服务的内部,处理的结果会导致新的事件产生。整个业务事务的状态可以从订单对象的状态中得到。

下图是编制型 Saga 的示意图。服务之间传递的是命令和命令的响应,图中以双向箭头来表示。订单服务中有专门的 Saga 实体来维护业务事务的状态,这个 Saga 实体也负责根据之前命令的响应结果,来确定下一步需要调用的命令。

总结

微服务架构中的数据一致性是一个相对复杂的问题,不同微服务中独立的数据存储,使得维护数据的一致性变得困难。本课时对数据一致性的问题做了介绍,包括数据库事务的 ACID 特性,以及最终一致性的 BASE 特性;最后介绍了用来保证数据一致性的 Saga 模式。通过本课时的学习,你将对数据一致性问题有更清楚的认识,了解到 ACID 和 BASE 这两种一致性特性,并对 Saga 模式有最基本的认识。

第21讲:如何使用 Saga 模式实现行程验证

上一课时对 Saga 模式进行了概念上的介绍,本课时将介绍 Saga 模式在实际开发中的应用,涉及示例应用中创建行程取消行程这两个业务场景。Eventuate Tram 和 Axon 框架都提供了对 Saga 模式的支持,示例应用使用的是 Eventuate Tram 框架。Saga 分成编制型编排型两类,本课时将通过两个不同的业务事务来分别说明。创建行程的业务使用编制型 Saga 来实现,而取消行程的业务使用编排型 Saga 来实现,其中又以编制型 Saga 为重点。

编制型 Saga

编制型 Saga 使用一个协调者来管理 Saga 的生命周期,每个 Saga 描述一个业务事务。Saga 的定义用来描述对应的业务事务流程,主要包含具体的步骤,以及步骤之间的递进关系。Saga 定义中有多个参与者,每个参与者可以接受命令并返回响应。在微服务架构的应用中,参与者通常来自不同的微服务。

在运行时,每个 Saga 定义会产生多个实例,每个实例表示业务事务的一次执行。以创建行程为例,每个行程对象的创建过程都有与之对应的 Saga 实例,该实例的状态会被持久化下来。Eventuate Tram 使用关系型数据库来保存 Saga 实例。

Saga 定义可以看成是一个状态机的描述,状态机中的状态来自 Saga 所工作的领域对象,通常是聚合的根实体。状态机中的状态变迁来自对 Saga 参与者所提供的命令的调用,以及命令的回应消息。根据命令的回应结果,状态机转换到不同的状态,当状态机处于某个状态时,会调用与当前状态相关的参与者的命令。

了解业务事务

创建 Saga 的第一步是了解所要描述的业务事务,典型的做法是使用流程图来描述。下图是创建行程的流程图。

从这个流程图中,可以识别出创建行程的 Saga 中的参与者,以及每个参与者需要支持的命令,如下表所示。

处理命令

下一步是在每个微服务中处理与 Saga 相关的命令。在第 16 课时中已经介绍了 Eventuate Tram 框架中的命令的用法。Saga 参与者所使用的命令与普通的命令是相似的,只不过 Saga 中的命令的回应要区分成功和失败这两种情况,这是通过命令消息中特定的消息头的值来确定的。当接收到命令的失败回应时,需要执行补偿操作。

下面代码中的 TripValidationServiceCommandHandlers 类用来定义行程验证服务中,对验证行程的命令 ValidateTripCommand 的处理器。这里需要注意的是,创建 CommandHandlers 对象时使用的是 Saga 参与者特有的 SagaCommandHandlersBuilder 类,而不是通用的 CommandHandlersBuilder 类。

处理 ValidateTripCommand 命令的 validateTrip 方法调用的是 TripValidationService 类的 validateTrip 方法来进行验证。验证时执行的操作包括检查发起行程的乘客是否处于被封禁的状态,以及行程的距离是否超过预设的限制值。如果行程验证成功,则使用 withSuccess 方法返回表示成功的消息;如果验证失败,则使用 withFailure 方法返回以 InvalidTripReply 对象作为载荷的错误消息。

@Component@Slf4jpublic class TripValidationServiceCommandHandlers {@AutowiredTripValidationService tripValidationService;public CommandHandlers commandHandlers() {return SagaCommandHandlersBuilder.fromChannel(TripValidationServiceChannels.tripValidation).onMessage(ValidateTripCommand.class, this::validateTrip).build();}private Message validateTrip(final CommandMessage<ValidateTripCommand> cm) {try {this.tripValidationService.validateTrip(cm.getCommand().getTripDetails());return withSuccess();} catch (final TripValidationException e) {log.warn("Trip is not valid", e);return withFailure(new InvalidTripReply());}}}

Saga 参与者的命令处理器的配置方式也发生了变化,如下面的代码所示。CommandDispatcher 对象使用 SagaCommandDispatcherFactory 来创建,而不是普通的 CommandDispatcherFactory 对象。

@Configuration@EnableAutoConfiguration@Import({SagaParticipantConfiguration.class})@ComponentScanpublic class TripValidationServiceConfiguration {@Beanpublic CommandDispatcher commandDispatcher(final TripValidationServiceCommandHandlers commandHandlers,final SagaCommandDispatcherFactory sagaCommandDispatcherFactory) {return sagaCommandDispatcherFactory.make("tripValidationServiceDispatcher",mandHandlers());}}

除了行程验证服务之外,行程管理服务和支付服务中的命令处理使用相似的方式来实现。

Saga 定义

第三步是创建 Saga 定义,Eventuate Tram 提供了一种简单的 DSL 来描述 Saga 的定义。下面的代码中是创建行程的 CreateTripSaga 类。Saga 是所有 Saga 的接口,其中的方法 getSagaDefinition 用来返回表示 Saga 定义的 SagaDefinition 接口的对象。Saga 和 SagaDefinition 的类型参数 Data 表示该 Saga 所关联的上下文对象的类型。CreateTripSaga 类实现的是 SimpleSaga 接口,而 SimpleSaga 同时继承了 Saga 和 SimpleSagaDsl 接口。SimpleSagaDsl 接口提供了描述 Saga 定义的 DSL 支持,其中默认方法 step 返回一个构建 Saga 定义的 StepBuilder 对象。

@Componentpublic class CreateTripSaga implements SimpleSaga<CreateTripSagaState> {private final SagaDefinition<CreateTripSagaState> sagaDefinition;public CreateTripSaga(final TripServiceProxy tripService,final TripValidationServiceProxy tripValidationService,final PaymentServiceProxy paymentService) {this.sagaDefinition = this.step().withCompensation(tripService.reject,CreateTripSagaState::createRejectTripCommand).step().invokeParticipant(tripValidationService.validateTrip,CreateTripSagaState::createValidateTripCommand).step().invokeParticipant(paymentService.createPayment,CreateTripSagaState::createPaymentCommand).step().invokeParticipant(CreateTripSagaState::paymentRequired, paymentService.makePayment,CreateTripSagaState::makePaymentCommand).onReply(PaymentFailedReply.class,CreateTripSagaState::handlePaymentFailedReply).onReply(Success.class, (state, success) -> state.markAsPaid()).step().invokeParticipant(CreateTripSagaState::shouldConfirmTrip,tripService.confirm,CreateTripSagaState::createConfirmTripCommand).step().invokeParticipant(((Predicate<CreateTripSagaState>) CreateTripSagaState::shouldConfirmTrip).negate(),tripService.reject, CreateTripSagaState::createRejectTripCommand).build();}@Overridepublic SagaDefinition<CreateTripSagaState> getSagaDefinition() {return this.sagaDefinition;}}

StepBuilder 类中的方法用来描述 Saga 定义的每个步骤中可以执行的操作,如下表所示。

invokeParticipant 和 withCompensation 方法描述的操作以发送命令的形式来完成。在 Saga 定义中,只需要描述如何创建命令对象即可,命令以 CommandWithDestination 对象来表示。在 Saga 实例执行到对应的步骤时,框架会负责发送相应的命令。

创建命令对象的第一种方式是提供 Function<Data, CommandWithDestination> 类型的函数,函数的参数是 Saga 实例当前的上下文对象,返回值是 CommandWithDestination 对象。第二种方式是提供一个 CommandEndpoint 对象来描述发送命令的终端,以及 Function<Data, C> 类型的函数来创建命令对象。类型参数 C 是命令对象的类型。CommandEndpoint 中包含了命令发送的通道、命令类和可能的回应类。

在下面的代码中,validateTrip 是发送 ValidateTripCommand 命令的终端,通过 CommandEndpointBuilder 构建器来创建,其中的 withReply 方法声明了 ValidateTripCommand 命令的成功响应 Success 类和错误响应 InvalidTripReply 类。

@Componentpublic class TripValidationServiceProxy {public final CommandEndpoint<ValidateTripCommand> validateTrip = CommandEndpointBuilder.forCommand(ValidateTripCommand.class).withChannel(TripValidationServiceChannels.tripValidation).withReply(Success.class).withReply(InvalidTripReply.class).build();}

我们再回到 CreateTripSaga 类的定义,看一下 Saga 的定义是如何与业务流程关联起来的。第一个步骤对应的是创建初始的行程对象,行程对象与 Saga 实例同时创建,因此不需要发送额外的命令来创建。这个步骤的补偿操作是通过 TripServiceProxy 中的 reject 命令终端来发送 RejectTripCommand 命令,该命令的处理器负责把行程对象的状态改为已拒绝。

第二个步骤对应的操作是进行行程验证,通过 TripValidationServiceProxy 中的 validateTrip 命令终端来发送 ValidateTripCommand 命令。第三个步骤对应的是创建支付记录,通过 PaymentServiceProxy 中的 createPayment 命令终端来发送 CreatePaymentCommand 命令。

第四个步骤对应的是完成支付,不过这个步骤中的动作只有在满足一定的条件时才会被执行,对应的条件通过 invokeParticipant 方法的第一个 Predicate 类型的参数来表示。CreateTripSagaState 对象的 paymentRequired 方法用来判断行程是否需要提前支付,如果是的话,通过 PaymentServiceProxy 对象中的 makePayment 命令终端来发送 MakePaymentCommand 命令,该命令有成功和失败两种不同的回应。onReply 方法的作用是根据回应对象的类型来执行不同的操作。

最后两个步骤用来确定行程的状态,可以是已确认或已拒绝的状态,状态由 CreateTripSagaState 的 shouldConfirmTrip 方法来确定。第五个步骤的动作是通过 TripServiceProxy 中的 confirm 命令终端来发送 ConfirmTripCommand 命令。第六个步骤的动作则是发送 RejectTripCommand 命令。

Saga 实例的上下文对象

Saga 和 SagaDefinition 接口都有一个类型参数来表示上下文对象的类型。在 Saga 实例的生命周期中,上下文对象用来在不同的步骤之间进行数据传递,有些数据可能是 Saga 中的很多步骤所需要的。一个 Saga 中的某些步骤,可能需要使用之前步骤的命令的回应结果,这些数据的共享,都是通过上下文对象来完成的。在 Saga 实例创建的时候,会提供一个上下文对象作为初始状态。在 Saga 的步骤中,可以使用 invokeLocal 方法来修改这个对象的状态,也可以使用 invokeParticipant 方法返回的对象上的 onReply 方法来根据命令的回应修改这个对象。

下面代码中的 CreateTripSagaState 类是 CreateTripSaga 对应的上下文对象类。CreateTripSagaState 类中包含的一些属性作为状态值,一些方法用来创建不同的命令对象,以及一些改变状态值的方法。

@Data@NoArgsConstructor@RequiredArgsConstructorpublic class CreateTripSagaState {@NonNullprivate String tripId;@NonNullprivate TripDetails tripDetails;@NonNullprivate BigDecimal fare;private boolean paid;public RejectTripCommand createRejectTripCommand() {return new RejectTripCommand(this.tripId);}public ValidateTripCommand createValidateTripCommand() {return new ValidateTripCommand(this.tripDetails);}public ConfirmTripCommand createConfirmTripCommand() {return new ConfirmTripCommand(this.tripId);}public CreatePaymentCommand createPaymentCommand() {return new CreatePaymentCommand(this.tripId, this.fare);}public MakePaymentCommand makePaymentCommand() {return new MakePaymentCommand(this.tripId);}public boolean paymentRequired() {return pareTo(BigDecimal.valueOf(100)) > 0;}public void markAsPaid() {this.setPaid(true);}public void handlePaymentFailedReply(final PaymentFailedReply reply) {this.setPaid(false);}public boolean shouldConfirmTrip() {return !this.paymentRequired() || (this.paymentRequired() && this.isPaid());}}

创建 Saga 实例

CreateTripSaga 用来管理创建行程的过程,因此该 Saga 的实例应该在创建行程对象的同时被创建。Saga 实例由 SagaInstanceFactory 对象的 create 方法来创建。create 方法的第一个参数是 Saga 对象,第二个参数是对应的上下文对象。

下面的代码展示了 TripService 的 createTrip 方法的实现。在创建 Trip 对象和发布相应的事件之后,首先创建一个 CreateTripSagaState 对象,再创建一个新的 CreateTripSaga 实例。

@Service@Transactionalpublic class TripService {@AutowiredTripRepository tripRepository;@AutowiredTripFareService tripFareService;@AutowiredTripDomainEventPublisher eventPublisher;@AutowiredSagaInstanceFactory sagaInstanceFactory;@AutowiredCreateTripSaga createTripSaga;public TripVO createTrip(final String passengerId, final PositionVO startPos,final PositionVO endPos) {final ResultWithDomainEvents<Trip, TripDomainEvent> tripAndEvents = Trip.createTrip(passengerId, startPos, endPos);final BigDecimal fare = this.tripFareService.calculate(startPos, endPos);final Trip trip = tripAndEvents.result;trip.setFare(fare);this.tripRepository.save(trip);this.eventPublisher.publish(trip, tripAndEvents.events);final TripDetails tripDetails = new TripDetails(passengerId, startPos,endPos);final CreateTripSagaState data = new CreateTripSagaState(trip.getId(),tripDetails, fare);this.sagaInstanceFactory.create(this.createTripSaga, data);return trip.toTripVO();}}

当 Saga 实例创建之后,其中所包含的步骤会按照顺序来执行。

Saga 单元测试

Eventuate Tram 框架提供了对 Saga 的单元测试支持。下面代码给出了 CreateTripSaga 所对应的单元测试类。Saga 的单元测试以 BDD 的形式来描述。通过 expect 方法来声明每个步骤中所期望发送的命令,以及命令的回应消息。

@ExtendWith(SpringExtension.class)@ContextConfiguration(classes = TestConfig.class)@DisplayName("Trip saga")public class CreateTripSagaTest {@AutowiredTripServiceProxy tripService;@AutowiredTripValidationServiceProxy tripValidationService;@AutowiredPaymentServiceProxy paymentService;@Test@DisplayName("Create trip")public void shouldCreateTrip() {final String tripId = this.uuid();final TripDetails tripDetails = this.tripDetails0();final BigDecimal fare = BigDecimal.valueOf(50);given().saga(this.makeCreateTripSaga(),new CreateTripSagaState(tripId, tripDetails, fare)).expect().command(new ValidateTripCommand(tripDetails)).to(TripValidationServiceChannels.tripValidation).andGiven().successReply().expect().command(new CreatePaymentCommand(tripId, fare)).to(PaymentServiceChannels.payment).andGiven().successReply().expect().command(new ConfirmTripCommand(tripId)).to(TripServiceChannels.trip);}private CreateTripSaga makeCreateTripSaga() {return new CreateTripSaga(this.tripService, this.tripValidationService,this.paymentService);}private TripDetails tripDetails0() {return new TripDetails(this.uuid(),new PositionVO(BigDecimal.ZERO, BigDecimal.ZERO),new PositionVO(BigDecimal.ZERO, BigDecimal.ZERO));}private String uuid() {return UUID.randomUUID().toString();}@TestConfiguration@ComponentScan(basePackageClasses = TripServiceProxy.class)static class TestConfig {}}

编排型 Saga

编排型 Saga 没有单独的 Saga 实体来管理业务事务的流程,而是通过不同参与者之间的事件传递来完成。每个参与者只需要添加相应事件的处理器,通过本地事务来完成操作即可。处理的结果以新的事件方式进行发布,从而触发其他参与者的处理逻辑,推动业务事务的进展。

从实现的角度来说,编排型 Saga 只需要利用 Eventuate Tram 框架中提供的事务性消息即可,并不需要额外的支持。业务事务的流程,只存在于事件的发布和处理之中。以取消行程的业务事务为例,行程的取消涉及乘客和司机两个参与者,乘客和司机都可以发起取消行程的请求。如果另外一方同意,那么行程被取消,同时发送 TripCancelledEvent 事件;如果另外一方不同意,那么行程的取消则需要进行调解,在更新行程状态之后,发送 TripCancellationResolutionRequiredEvent 事件。在行程取消之后,行程派发服务可能需要取消正在进行的行程派发动作。如果取消行程还需要有后续的其他动作,只需要添加新的 TripCancelledEvent 事件的处理器即可。

编排型 Saga 的好处在于简单,并不需要附加的 Saga 实体,另外参与者之间是松散耦合的。编排型 Saga 的缺点在于业务事务的逻辑散落在不同的参与者中,不容易理解整个业务的流程,另外参与者可能会由于事件的发布和处理而产生循环依赖关系。由于这样的缺点,编排型 Saga 一般只用来实现非常简单的业务事务,更多的时候,使用编制型 Saga 是更好的选择。

Saga 的隔离性问题

Saga 由一系列本地事务组成,并通过补偿操作来处理失败。从数据库事务的 ACID 特性来说,Saga 只具有 ACD 特性,缺少了隔离性,隔离性保证了多个事务并发执行的结果,与这些事务顺序执行时的结果保持一致。每个事务都相当于在各自隔离的空间中运行,互相并不影响。Saga 并不具备隔离性,这是因为组成 Saga 的本地事务是各自独立提交的。当一个 Saga 实例的某个步骤完成之后,该步骤对应的本地事务就会被提交,该事务对数据库的改动对其他本地事务是可见的。一个正在运行 Saga 实例中的某个步骤在对数据库进行操作时,可以读取到另外一个 Saga 实例产生的部分结果,也可以覆写掉另外一个 Saga 实例已经写入的结果。这可能会造成数据异常。

以创建和取消行程这两个 Saga 为例,当创建行程的 Saga 实例执行到支付这一步骤时,乘客发出了取消行程的请求,取消行程的 Saga 把行程设置为已取消的状态。在这之后,创建行程的 Saga 实例继续执行,最后把行程又重新设置为已确认状态。这就造成了乘客已经取消的行程,仍然被确认和派发。

解决 Saga 隔离性问题的一个常见方案是在应用层次添加锁,可以把领域对象的状态作为锁。在上面的例子中,在创建支付这一步骤完成之后,可以把行程对象的状态设置为等待支付。取消行程的 Saga 首先检查行程的状态,如果发现行程处于等待支付的状态,它可以直接出错,或是等待创建行程的 Saga 实例完成之后,再进行取消的动作。

另外一种解决方案是从数据库中重新读取领域对象。在上面的例子中,创建行程的 Saga 在确认行程之前,重新读取行程对象,如果发现行程对象的状态变为已取消,则直接出错,将导致 Saga 实例自动执行相应的补偿操作。

总结

本课时详细介绍了如何使用 Eventuate Tram 框架来实现编制型和编排型 Saga,尤其是编制型 Saga 的使用。示例应用使用编制型 Saga 来实现创建行程这一业务事务。同时,还介绍了 Saga 的隔离性问题及解决办法。通过本课时的学习,你应该掌握如何在实际的开发中使用 Eventuate Tram 框架来创建 Saga,以满足业务事务的需求。

第22讲:CQRS 如何设计与实现

本课时和紧接着的第 23 课时将介绍 CQRS 技术相关的内容,本课时侧重讲解 CQRS 技术的基本概念,下一课时将重点讲解 CQRS 技术在示例应用中的使用。

CQRS 是命令和查询的职责分离(Command Query Responsibility Segregation)对应的英文名称的首字母缩写。CQRS 中的命令指的是对数据的更新操作,而查询指的是对数据的读取操作,命令和查询的职责分离指的是用不同的模型来分别进行更新和读取操作。CQRS 与我们通常使用的更新和读取数据的方式并不相同。

我们通常对数据的操作方式是典型的 CRUD 操作,分别表示对记录的创建(Create)、读取(Read)、更新(Update)和删除(Delete)。在有些时候,还会加上一个列表(List)操作来读取满足条件的多个记录,组成LCRUD 操作,CRUD 操作使用的是同一个模型。在面向对象的设计中,通常使用领域对象类来作为模型的描述,在进行持久化时,领域对象的实例被映射成关系型数据库中的表中的记录,或是 NoSQL 数据库中的文档等。这样的实现方式,相信很多开发人员都不陌生,也是开发中经常会用到的模式。很多开发框架都提供了对这种模式的支持,Spring Data 中的 CrudRepository 接口就提供了对 LCRUD 操作的基本抽象。

下图是单一模型的使用示意图,其中的模型在数据存储时使用,而展示模型则提供给客户端使用。更新和读取操作需要在这两个模型之间进行转换。

对更新和读取操作使用单一模型的好处是简单易懂,实现起来也容易,开发人员相关的经验比较丰富。但是单一模型在某些情况下也会遇到一些问题,这也是 CQRS 技术的用武之地。

单一模型的问题

单一模型要面对的问题是如何用一个模型来满足不同的更新和查询请求。当模型比较简单,或是模型的使用者比较少时,这并不是一个太大的问题;当模型变得复杂,或是需要满足很多使用者的不同需求时,维护这样的模型就变得很困难。

在一个应用中,总是有一些模型处于核心的地位,比如电子商务应用中的订单、客户和产品等模型,应用中的各种组件,都或多或少需要用到这些核心模型。如此多的依赖关系,导致核心模型的修改变得很困难,大部分代码在使用时,只需要用到核心模型的部分内容。在进行读取操作时,免不了要根据使用者的需要,对模型进行投影(Projection)和转换操作。投影指的是从模型中选择所需要的数据子集,而转换则是把模型转换成另外一种格式。在进行更新操作时,也需要先把客户端发送的模型转换成内部的单一模型,这样的模型转换会带来一定的性能开销。

模型转换的问题在使用关系型数据库时尤为明显。这是因为关系型数据库在设计时需要遵循不同的范式。规范化的结果是数据查询时可能需要进行多表的连接操作,影响性能。对于这个问题,通常的做法是创建一个单独的报表数据库来满足查询请求。报表数据库的表设计方便更好地满足查询需求,而数据则来源于业务数据库。两个数据库之间的数据同步和表模式转换,一般通过 ETL 工具来完成。这实际上是对更新和查询使用不同模型的做法的一种应用。

CQRS 的应用范围

与传统应用使用单一模型进行全部操作相比,CQRS 分别使用两个不同的模型来进行更新和查询操作。从一个模型到两个模型,所带来的复杂度的提升并不只是简单的翻倍,开发人员需要花费更多的时间来理解这两个模型的使用。只有当 CQRS 所带来的好处,超过它本身引入的复杂度时,使用 CQRS 技术才是有意义的。实际上,对于大部分应用来说,使用传统的单一模型的方式确实更好。适合于 CQRS 技术的应用主要有两类:第一类应用的更新模型和查询模型本身就存在很大差异,第二类应用在更新和查询操作时有不同的性能要求。

如果回顾第 17 课时介绍的事件源技术,可以发现使用事件源技术的应用在更新和查询时的模型是不相同的。事件源技术使用不同类型的事件来表示对状态的修改,而查询时则通过依次应用事件的修改,从而得到相关的结果对象。这使得事件源技术很适合与 CQRS 技术一块使用,实际上,这两者也经常被一块提及。

有些应用在更新和查询时有不同的性能需求,使用单一模型没办法满足这一性能需求,这一点与在算法设计时选择数据结构的思路是相似的。在修改和访问这两种操作中,不同数据结构的时间复杂度是不同的,有些应用的查询操作的数量远多于更新操作,因此需要对查询操作进行优化。使用 CQRS 技术把更新和查询两种操作进行分离之后,就可以对它们分别进行针对性的优化,在运行时可以采用不同的扩展策略。比如,可以为查询操作添加数量很多的运行实例。

在微服务架构的应用中,由于每个微服务各自独立,我们可以只把 CQRS 技术应用在其中的某个微服务上。这样可以充分利用 CQRS 技术的优势,同时避免对整个应用进行较大的改动。

CQRS 的设计

CQRS 的设计要点是为查询和命令创建不同的模型,命令模型用来对数据进行修改,用来描述对数据修改的意愿,而查询模型则用来读取数据,这里需要把命令和事件进行区分。命令描述的是改变状态的期望,而事件则是状态改变的结果。在电子商务网站中,用户可以通过点击相关的按钮来表示希望取消订单的意愿,这会以命令的形式发送到服务器。服务器的命令处理逻辑负责更新数据,并根据处理结果发布不同的事件。发布事件并不是必需的动作。

下图是 CQRS 设计的示意图,需要注意的是其中的箭头表示数据流向。数据查询操作从数据存储开始,转换成查询模型之后,提供给客户端使用;数据更新操作从客户端开始,转换成命令模型之后,保存到数据存储中。

命令模型中只包含必需的信息,一个应用中可能包含很多适用于不同用户场景的命令。以取消订单为例,相应的命令只需要包含订单的标识符即可。

查询模型的设计专门为满足查询请求进行了优化,查询模型在设计时,考虑更多的是使用者的需求。查询模型的使用者通常是用户界面,根据用户界面展示时的要求,来设计查询模型。比如,在电子商务应用中,用户界面需要展示所有订单分类之后的统计信息。如果使用单一模型,需要在读取数据之后,再进行额外的计算来得到统计信息,这种做法的性能相对较差。如果专门为了统计信息设计相应的查询模型,那么只需要直接读取即可,并不需要额外的计算。

CQRS 的实现

实现 CQRS 的重点是更新和查询模型的实现,结合前面课时中对事务性消息的介绍,命令本质上是一种消息。后端实现中通常会使用消息队列或消息中间件来接收命令,接收到的命令都需要进行验证来保证合法性。命令的验证包括两部分:一部分与业务无关,只是检查命令是否满足结构上的要求,比如是否缺失必需的字段等;另一部分则与业务相关,需要根据命令执行时的上下文来确定,比如订单支付命令所处理的订单对象,是否处于合法的状态。通过验证的命令会按照接收的顺序来执行。执行顺序的错误可能造成数据不一致,命令在执行时会更新数据存储。

查询模型的设计需求来自使用者,查询模型通常不包含复杂的业务逻辑,只是作为数据的容器。这使得查询模型使用起来很简单。

下面通过一个具体的实例来说明 CQRS 技术的实现,该实例以银行账户的存款和取款操作作为应用场景。

下面代码中的 AccountCommand 类是账户相关的命令的模型,Command 是所有命令的标记接口。AccountCommand 的子类 CreditCommand 和 DebitCommand 分别表示存款和取款操作。命令只是简单的 POJO 对象。

@Getterpublic abstract class AccountCommand implements Command {private final String accountId;private final AccountAction action;private final MonetaryAmount amount;private final long timestamp = System.currentTimeMillis();protected AccountCommand(final String accountId,final AccountAction action,final MonetaryAmount amount) {this.accountId = accountId;this.action = action;this.amount = amount;}}

查询模型的设计取决于对数据的使用需求。实例的两个使用需求分别是获取当前的账户余额和账户的历史操作。下面代码中的 AccountBalance 类负责维护所有账户的余额信息,其中的 updateBalance 和 getBalance 方法分别用来更新和获取账户的余额。这里使用一个内存中的哈希表作为数据存储。

public class AccountBalance {private final ConcurrentHashMap<String, MonetaryAmount> data = new ConcurrentHashMap<>();public void updateBalance(final String accountId,final MonetaryAmount delta) {pute(accountId, (id, current) -> {if (current == null) {return delta;}return current.add(delta);});}public MonetaryAmount getBalance(final String accountId) {return this.data.getOrDefault(accountId, Money.of(0, "CNY"));}}

下面代码中的 AccountHistory 类负责维护用户账户的历史操作记录,其中的 addHistoryItem 方法用来添加表示历史记录的 AccountHistoryItem 对象,而 getHistoryItems 方法则返回一个账户的全部历史记录。这里同样使用内存中的哈希表作为数据存储。

public class AccountHistory {private final ConcurrentHashMap<String, List<AccountHistoryItem>> data = new ConcurrentHashMap<>();public void addHistoryItem(final AccountHistoryItem item) {pute(item.getAccountId(), (id, items) -> {if (items == null) {items = new ArrayList<>();}items.add(item);return items;});}public List<AccountHistoryItem> getHistoryItems(final String accountId) {return this.data.getOrDefault(accountId, Collections.emptyList());}}

AccountBalance 和 AccountHistory 两个类作为命令模型和查询模型共用的数据存储。下面代码中的 CommandProcessor 类用来处理不同类型的命令。对于 AccountCommand 对象,根据命令的类型,对 AccountBalance 和 AccountHistory 中包含的数据进行不同的修改。

public class CommandProcessor {private final AccountHistory accountHistory;private final AccountBalance accountBalance;public CommandProcessor(final AccountHistory accountHistory,final AccountBalance accountBalance) {this.accountHistory = accountHistory;this.accountBalance = accountBalance;}public void process(final Command command) {if (command instanceof AccountCommand) {final AccountCommand cmd = (AccountCommand) command;final AccountHistoryItem item = new AccountHistoryItem(cmd.getAccountId(),cmd.getAction(),cmd.getAmount(),cmd.getTimestamp());this.accountHistory.addHistoryItem(item);if (cmd instanceof CreditCommand) {this.accountBalance.updateBalance(cmd.getAccountId(), cmd.getAmount());} else if (cmd instanceof DebitCommand) {this.accountBalance.updateBalance(cmd.getAccountId(), cmd.getAmount().negate());}}}}

下面代码中的 AccountQueryService 类用来执行对账户对象的查询操作,所使用的数据存储同样是 AccountBalance 和 AccountHistory 这两个对象。getAccountSummary 方法的返回值 AccountSummary 对象是查询模型,包含了账户余额和历史记录。

public class AccountQueryService {private final AccountHistory accountHistory;private final AccountBalance accountBalance;public AccountQueryService(final AccountHistory accountHistory,final AccountBalance accountBalance) {this.accountHistory = accountHistory;this.accountBalance = accountBalance;}public AccountSummary getAccountSummary(final String accountId) {return new AccountSummary(accountId,this.accountBalance.getBalance(accountId),this.accountHistory.getHistoryItems(accountId).stream().sorted(paringLong(AccountHistoryItem::getTimestamp).reversed()).map(item -> new HistoryItem(item.getAction(), item.getAmount(),this.formatDateTime(item.getTimestamp()))).collect(Collectors.toList()));}private String formatDateTime(final long timestamp) {return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME);}}

总结

与传统应用使用单一模型相比,CQRS 技术使用两个不同的模型来进行更新和查询操作。对于某些应用来说,CQRS 技术可以更好地对应用建模,以及提高性能。本课时对 CQRS 技术的设计和实现进行了基本的介绍,通过本课时的学习,你可以对 CQRS 技术有一定的了解,知道在什么情况下使用 CQRS 技术是一个好的选择。

第23讲:如何查询乘客和司机的行程

上一课时对 CQRS 技术做了理论上的介绍,本课时将讲解 CQRS 技术在实际开发中的应用。

历史行程查询

在示例应用中,乘客和司机都需要查询历史行程的信息,行程的信息由行程管理微服务负责维护,保存在数据库中。最直接的做法是由行程管理微服务提供 API 来查询历史行程。但是这种做法有一个最大的问题在于,行程管理微服务中保存的行程信息是不完整的,行程的乘客和司机都只有标识符,没有具体的信息。相关的具体信息由乘客管理微服务和司机管理微服务来维护,这也是应用领域驱动设计的结果。乘客和司机分别是所在聚合的根实体,同样作为聚合根实体的行程,只能引用其他聚合根实体的标识符。

历史行程中需要包含乘客和司机的具体信息,这就意味着我们需要一种方式从这两个微服务中获取这些信息。

第一种获取信息的做法是在查询历史行程时,通过这两个微服务的 API 来获取。因为行程对象中已经包含了乘客和司机的标识符,只需要一个 API 调用,就可以获取到所需的信息,然后再与行程对象中的已有信息进行合并,就得到了所需要的结果。这种做法类似于关系型数据库中的表连接操作。它的缺点是性能很差,每个历史行程的获取都需要两次微服务 API 的调用。

第二种做法是修改行程管理服务中的行程对象的定义,以增加所需要的额外字段。这种做法的不足之处在于,这些附加的乘客和司机信息,与行程管理微服务的业务逻辑无关,只是为了满足特定的查询需要而引入的,并不是一个很好的设计。

第三种做法是为行程历史记录创建独立的存储,包含所需的全部数据。这种做法的好处是查询的性能很好,不需要额外的操作,从设计的角度来说,也实现了更好的职责划分。问题在于需要在不同的微服务之间同步数据。

下面通过具体的历史行程查询服务来说明第三种做法的实现。下面代码中的 Trip 类是历史行程查询服务中表示行程的领域对象类。与行程管理服务中的 Trip 类相比,下面所展示的 Trip 类的定义被简化了很多,只包含所需的属性,并没有相关的业务逻辑。这也符合对于查询模型的期望,查询模型只是作为数据的容器,其中除了一些基本的数据转换逻辑之外,并不包含与业务相关的内容。Trip 类的属性 passengerName 和 driverName 分别表示乘客和司机的名字,这是相对于行程管理服务中的 Trip 类而言,新增加的附加属性。

@Entity@Table(name = "trips")@Datapublic class Trip {@Idprivate String id;@Column(name = "passenger_id")private String passengerId;@Column(name = "passenger_name")private String passengerName;@Column(name = "driver_id")private String driverId;@Column(name = "driver_name")private String driverName;@Column(name = "start_pos_lng")private BigDecimal startPosLng;@Column(name = "start_pos_lat")private BigDecimal startPosLat;@Column(name = "end_pos_lng")private BigDecimal endPosLng;@Column(name = "end_pos_lat")private BigDecimal endPosLat;@Column(name = "state")@Enumerated(EnumType.STRING)private TripState state;}

接下来需要解决的问题是如何保持数据的同步。由于行程管理服务已经发布与行程对象相关的事件,我们只需要处理这些事件,就可以使得历史行程查询服务中的数据,与行程管理服务中的数据保持一致。当表示行程已创建的 TripCreatedEvent 事件发布时,历史行程查询服务需要创建同样的行程对象。当行程状态发生改变时,历史行程查询服务需要做出来相同的改动。

下面代码中的 TripHistoryServiceEventConsumer 类负责处理与行程对象相关的事件。当不同的事件发生时,调用 TripService 的不同方法来更新数据。通过这样的方式,可以保证查询模型中的数据的最终一致性。所处理的事件来自不同的微服务中聚合的根实体,包括行程、乘客和行程派发。

public class TripHistoryServiceEventConsumer {@AutowiredTripService tripService;public DomainEventHandlers tripDomainEventHandlers() {return DomainEventHandlersBuilder.forAggregateType("io.vividcode.happyride.tripservice.domain.Trip").onEvent(TripCreatedEvent.class, this::onTripCreated).onEvent(TripConfirmedEvent.class, this::onTripConfirmed).onEvent(TripCancelledEvent.class, this::onTripCancelled).build();}public DomainEventHandlers passengerDomainEventHandlers() {return DomainEventHandlersBuilder.forAggregateType("io.vividcode.happyride.passengerservice.domain.Passenger").onEvent(PassengerDetailsUpdatedEvent.class,this::onPassengerDetailsUpdated).build();}public DomainEventHandlers dispatchDomainEventHandlers() {return DomainEventHandlersBuilder.forAggregateType("io.vividcode.happyride.dispatchservice.domain.Dispatch").onEvent(TripAcceptanceSelectedEvent.class, this::onTripAccepted).build();}private void onTripCreated(final DomainEventEnvelope<TripCreatedEvent> envelope) {this.tripService.createTrip(envelope.getAggregateId(),envelope.getEvent().getTripDetails());}private void onPassengerDetailsUpdated(final DomainEventEnvelope<PassengerDetailsUpdatedEvent> envelope) {this.tripService.updatePassengerDetails(envelope.getAggregateId(),envelope.getEvent().getPassengerDetails());}private void onTripConfirmed(final DomainEventEnvelope<TripConfirmedEvent> envelope) {this.tripService.updateTripState(envelope.getAggregateId(), TripState.CONFIRMED);}private void onTripCancelled(final DomainEventEnvelope<TripCancelledEvent> envelope) {this.tripService.updateTripState(envelope.getAggregateId(), TripState.CANCELLED);}private void onTripAccepted(final DomainEventEnvelope<TripAcceptanceSelectedEvent> envelope) {final TripAcceptanceSelectedEvent event = envelope.getEvent();this.tripService.setTripDriver(envelope.getAggregateId(), event.getDriverId());}}

这里需要注意的是对 PassengerDetailsUpdatedEvent 事件的处理,这是一个新增的事件,用来更新 Trip 对象中的 passengerName 属性。乘客管理服务同样会对 TripCreatedEvent 事件进行处理,并根据事件中的行程对象的乘客标识符,找到乘客的名称,并发布新的 PassengerDetailsUpdatedEvent 事件。下面代码中的 PassengerServiceEventConsumer 类展示了乘客管理服务对 TripCreatedEvent 事件的处理逻辑。

public class PassengerServiceEventConsumer {@AutowiredPassengerService passengerService;@AutowiredDomainEventPublisher domainEventPublisher;public DomainEventHandlers domainEventHandlers() {return DomainEventHandlersBuilder.forAggregateType("io.vividcode.happyride.tripservice.domain.Trip").onEvent(TripCreatedEvent.class, this::onTripCreated).build();}private void onTripCreated(final DomainEventEnvelope<TripCreatedEvent> envelope) {final String passengerId = envelope.getEvent().getTripDetails().getPassengerId();this.passengerService.getPassenger(passengerId).ifPresent(passenger -> {final PassengerDetails passengerDetails = new PassengerDetails(passenger.getName());this.domainEventPublisher.publish("io.vividcode.happyride.passengerservice.domain.Passenger",passengerId,Collections.singletonList(new PassengerDetailsUpdatedEvent(passengerDetails)));});}}

如果乘客修改了自己的名字,乘客管理服务同样可以发布 PassengerDetailsUpdatedEvent 事件,这样历史行程查询服务中的行程信息同样会被更新。当乘客名称更新时,该乘客的所有行程记录都会被更新。

Axon 框架实现

Axon 框架提供了事件源和 CQRS 技术的完整实现,如果使用 Axon 框架,可以更容易的实现 CQRS。下面介绍如何使用 Axon 框架来实现行程管理服务。

聚合

Axon 框架对领域驱动设计中的一些概念提供了原生的支持。以聚合来说,只需要在领域对象类上添加 @Aggregate 注解,就可以声明它是一个聚合的根实体。聚合对象类中包含了表示状态的属性,以及改变状态的方法。通过在不同的属性和方法上添加 Axon 框架的注解,就可以定义对于聚合对象的不同操作。

下面代码中的 Trip 类是行程管理服务的聚合根实体,其中用到了几个重要的 Axon 框架注解。

@AggregateIdentifier 注解表示的是聚合根实体对象的标识符,该标识符应该是全局唯一的,该注解的作用是声明属性 id 作为 Trip 对象的标识符。

@CommandHandler 注解用来声明处理命令的构造器或方法,命令处理器方法的第一个参数表示了所接受的命令类型。如果该注解添加在构造器上,则说明在处理该命令时,会创建聚合根实体的一个新对象。命令处理器中包含的是处理请求的业务逻辑。

@EventSourcingHandler 注解用来声明事件源技术中事件的处理器,事件处理器中包含的是改变对象状态的逻辑。在事件源技术中,对象状态的所有修改都以事件的形式来描述。只需要重放所有的事件,再应用事件处理器中所做的修改,就得到了对象的当前状态。

@Aggregatepublic class Trip {@AggregateIdentifierprivate String id;private TripState state;@CommandHandlerpublic Trip(final CreateTripCommand command) {apply(new TripCreatedEvent(command.getTripId(), command.getTripDetails()));}@CommandHandlerpublic void handle(final CancelTripCommand command) {if (this.state != TripState.CREATED) {throw new IllegalTripStateException(this.state, TripState.CANCELLED);}apply(new TripCancelledEvent(command.getTripId()));}@CommandHandlerpublic void handle(final ConfirmTripCommand command) {if (this.state != TripState.CREATED) {throw new IllegalTripStateException(this.state, TripState.CONFIRMED);}apply(new TripConfirmedEvent(command.getTripId()));}@EventSourcingHandlerpublic void on(final TripCreatedEvent event) {this.id = event.getTripId();this.state = TripState.CREATED;}@EventSourcingHandlerpublic void on(final TripCancelledEvent event) {this.state = TripState.CANCELLED;}@EventSourcingHandlerpublic void on(final TripConfirmedEvent event) {this.state = TripState.CONFIRMED;}protected Trip() {}}

命令处理器

命令处理器所处理的命令只是简单的 POJO 对象,命令对象通常表示来自客户端的请求。下面代码中的 ConfirmTripCommand 类表示的是确认行程的命令,命令对象中需要包含一个属性来声明处理该命令的聚合对象的标识符,这是通过 @TargetAggregateIdentifier 注解来表示的。

@Data@NoArgsConstructor@RequiredArgsConstructorpublic class ConfirmTripCommand {@NonNull@TargetAggregateIdentifierprivate String tripId;}

在命令处理器中,AggregateLifecycle 类的静态方法 apply 用来发布事件消息,所发布的事件会触发对应的事件源处理器,从而改变对象的状态。在处理 ConfirmTripCommand 命令的 handle 方法中,首先检查当前行程对象的状态是否合法,如果合法的话,则使用 apply 方法来发布 TripConfirmedEvent 事件。而 TripConfirmedEvent 事件的处理器,把当前行程对象的状态修改为 CONFIRMED。

从这里可以看出来 CQRS 技术中命令模型的基本处理流程,那就是命令处理器发布事件,事件处理器更新对象状态。

一般来说,会有一个命令负责创建聚合对象,通过添加了 @CommandHandler 注解的构造器来实现。在 Trip 类中,CreateTripCommand 命令用来创建 Trip 对象。在构造器所发布的 TripCreatedEvent 事件的处理器中,必须要设置聚合对象的标识符,这样后续的命令才能找到对应的对象。

发送命令

Axon 框架中的 CommandGateway 用来发送命令。下面代码中的 TripService 使用 CommandGateway 的 send 方法来发送命令,send 方法的返回值是 CompletableFuture 对象,也可以使用 sendAndWait 方法来发送命令并等待完成。

@Servicepublic class TripService {@AutowiredCommandGateway commandGateway;public CompletableFuture<String> createTrip(String passengerId, PositionVO startPos,PositionVO endPos) {String tripId = UUID.randomUUID().toString();TripDetails tripDetails = new TripDetails(passengerId, startPos, endPos);CreateTripCommand command = new CreateTripCommand(tripId, tripDetails);return commandGateway.send(command);}public CompletableFuture<Void> cancelTrip(String tripId) {CancelTripCommand command = new CancelTripCommand(tripId);return commandGateway.send(command);}public CompletableFuture<Void> confirmTrip(String tripId) {ConfirmTripCommand command = new ConfirmTripCommand(tripId);return commandGateway.send(command);}}

查询模型

在设计查询模型时,要满足的需求是查询历史行程的相关信息。为了查询方便,使用关系型数据库来保存行程数据,通过 Spring Data JPA 来实现。下面代码中的 TripView 是表示历史行程的领域对象类,同时也是 JPA 的实体类。

@Entity@Table(name = "trip_view")@Datapublic class TripView {@Idprivate String id;@Column(name = "start_pos_lng")private BigDecimal startPosLng;@Column(name = "start_pos_lat")private BigDecimal startPosLat;@Column(name = "end_pos_lng")private BigDecimal endPosLng;@Column(name = "ent_pos_lat")private BigDecimal endPosLat;@Column(name = "state")@Enumerated(EnumType.STRING)private TripState state;}

查询模型中的数据更新来自对不同事件的处理。下面代码中的 TripViewEventHandler 类中包含了不同的事件处理方法,通过 @EventHandler 注解来声明。在处理事件时,只需要根据事件的类型,对相应的 TripView 对象使用 TripViewRepository 进行修改即可。所有的修改都会保存在数据库中,与命令模型中的状态保持一致。

@Service@Transactionalpublic class TripViewEventHandler {@AutowiredTripViewRepository repository;@EventHandlerpublic void on(TripCreatedEvent event) {TripView tripView = new TripView();tripView.setId(event.getTripId());TripDetails tripDetails = event.getTripDetails();PositionVO startPos = tripDetails.getStartPos();tripView.setStartPosLng(startPos.getLng());tripView.setStartPosLat(startPos.getLat());PositionVO endPos = tripDetails.getEndPos();tripView.setEndPosLng(endPos.getLng());tripView.setEndPosLat(endPos.getLat());tripView.setState(TripState.CREATED);repository.save(tripView);}@EventHandlerpublic void on(TripCancelledEvent event) {repository.findById(event.getTripId()).ifPresent(tripView -> tripView.setState(TripState.CANCELLED));}@EventHandlerpublic void on(TripConfirmedEvent event) {repository.findById(event.getTripId()).ifPresent(tripView -> tripView.setState(TripState.CONFIRMED));}}

处理查询

Axon 框架提供了对查询的支持,由查询请求查询处理器两部分组成。查询请求是一个 POJO 对象,包含了自定义的查询元数据。查询处理器的方法通过 @QueryHandler 注解来进行声明。查询处理器方法的参数是它所能处理的查询请求对象,而返回值则是查询的处理结果。

Axon 框架支持 3 种不同类型的查询,如下表所示。

查询请求由 QueryGateway 来进行处理,应用代码调用 QueryGateway 的不同方法来执行查询,并获取结果。QueryGateway 中的方法如下表所示,类型参数 R 表示查询结果的类型。

在下面的代码中,TripService的queryTrip 方法用来处理 FetchTripQuery 类型的查询请求,查询结果以 TripSummary 对象表示。

@Servicepublic class TripService {@QueryHandlerpublic TripSummary queryTrip(final FetchTripQuery query) {return this.tripViewRepository.findById(query.getTripId()).map(tripView -> {final TripSummary tripSummary = new TripSummary();tripSummary.setId(tripView.getId());tripSummary.setStartPos(new PositionVO(tripView.getStartPosLng(),tripView.getStartPosLat()));tripSummary.setEndPos(new PositionVO(tripView.getEndPosLng(), tripView.getEndPosLat()));tripSummary.setState(tripView.getState());return tripSummary;}).orElseThrow(() -> new TripNotFoundException(query.getTripId()));}}

下面代码中的 TripController 是行程管理服务 REST API 的实现,其中的 getTrip 方法使用 QueryGateway 的 query 方法来发送点对点的 FetchTripQuery 类型的查询请求,并把查询结果返回给客户端。

@RestControllerpublic class TripController {@AutowiredQueryGateway queryGateway;@GetMapping("{id}")public CompletableFuture<TripSummary> getTrip(@PathVariable("id") final String tripId) {return this.queryGateway.query(new FetchTripQuery(tripId), TripSummary.class);}}

总结

本课时介绍了 CQRS 技术的两种实现方式,第一种方式是创建新的查询模型并进行存储,再通过事件来与更新模型中的数据保持一致;第二种方式是使用 Axon 框架提供的事件源和 CQRS 技术的支持。通过本课时的学习,你应该掌握根据应用的不同需求来选择合适的 CQRS 技术的实现方式。最后需要强调的是,CQRS 技术有其特定的适用范围,盲目使用该技术可能带来更多的问题,在使用之前需要充分调研和谨慎对待。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。