分布式系统事务一致性解决方案(基于事务消息)

参考:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/

文章目录

    • 概要
    • 错误的方案
    • 方案一:业务方自己实现
    • 方案二:RocketMQ 事务消息
      • 什么是事务消息
      • 事务消息处理流程
      • 事务消息生命周期
      • 使用限制
      • 使用示例
      • 使用建议


概要

说起分布式事务,就会谈到 Bob 给 Smith 账户转账的问题:2 个账号, 分别处于 A、B 两个 DB,或者说 2 个不同的子系统中,Bob 的账户要扣钱,Smith 的账户要加钱,如何保证原子性?

一般思路是通过消息中间件来实现 “最终一致性”:A系统扣钱,然后发送消息给 MQ,B系统接收此消息,进行加钱。

但这里有个问题:是先update DB后发消息?还是先发消息后update DB呢?

  • 假设先update DB成功发送消息时网络问题,重发又失败,怎么办?
  • 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都会有问题。

错误的方案

有人可能会想到,可以把 “发送消息” 这个网络调用和 update DB 放在同一个事务中,如果发送消息失败,update DB 自动回滚。这样不就保证原子性了吗?

这个方案看似正确,但其实是错误的,原因:

  1. 把网络调用放在 DB 事务中,可能会因为网络的延时,导致 DB 长事务。严重的还会 block 整个 DB,这个风险很大。
  2. 如果发送消息失败,发送方并不知道是 MQ 没收到消息,还是已收到消息只是返回 response 的时候失败了。如果接收方已经收到消息了,而发送方认为没有收到,执行 update DB 回滚的操作,会导致 A 账户钱没有扣,B 账户的钱却增加了。

方案一:业务方自己实现

假设消息中间件没提供 “事务消息” 功能,比如 Kafka。那该如何解决这个问题?

结局方案如下:

  1. Producer 准备 1 个消息表,把 update DB 和 send message 这 2 个操作,放在一个DB事务中。
  2. 准备一个后台程序,不停地把消息表中的 message 传送给消息中间件。发送失败则不断重试,直到成功。允许消息重复,但消息不会丢。
  3. Consumer 准备 1 个判重表。处理过的消息记录在里面,实现业务的幂等性

通过以上 3 步,就基本解决了原子性问题

但此方案缺点是:需要设计 DB 消息表,同时还需要 1 个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合,额外增加业务方的负担。

方案二:RocketMQ 事务消息

目前各大知名电商平台和互联网公司,几乎都是使用 “事务消息” 这种方案来实现 “最终一致性” 的。这种方式适用的业务场景广泛,且比较靠谱。

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

在这里插入图片描述

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查(服务端回查的间隔时间和最大回查次数,请参见参数限制)。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

在这里插入图片描述

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制

  1. 消息类型一致性
    事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

  2. 消费事务性
    Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

  3. 中间状态可见性
    Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

  4. 事务超时机制
    Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.type为TRANSACTION的属性用来支持事务消息

以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
    private static boolean checkOrderById(String orderId) {
        return true;
    }
    
    //演示demo,模拟本地事务的执行结果。
    private static boolean doLocalTransaction() {
        return true;
    }
    
    public static void main(String[] args) throws ClientException {
        ClientServiceProvider provider = new ClientServiceProvider();
        MessageBuilder messageBuilder = new MessageBuilderImpl();
        //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
        Producer producer = provider.newProducerBuilder()
                .setTransactionChecker(messageView -> {
                    /**
                     * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
                     * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
                     */
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
                        // 错误的消息,直接返回Rollback。
                        return TransactionResolution.ROLLBACK;
                    }
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            return;
        }
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
                .addProperty("OrderId", "xxx")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            return;
        }
        
        /**
         * 执行本地事务,并确定本地事务结果。
         * 1. 如果本地事务提交成功,则提交消息事务。
         * 2. 如果本地事务提交失败,则回滚消息事务。
         * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        }
    }

使用建议

  1. 避免大量未决事务导致超时

    Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

  2. 正确处理"进行中"的事务

    消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

    • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

    • 程序能正确识别正在进行中的事务。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/582159.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

进迭时空宣布开源RISC-V芯片的AI核心技术

仟江水商业电讯&#xff08;4月29日 北京 委托发布&#xff09;4月29日&#xff0c;在“创芯生生不息——进迭时空2024年度产品发布会”上&#xff0c;进迭时空CEO、创始人&#xff0c;陈志坚博士宣布将开源进迭时空在自研RISC-V AI CPU上的核心技术&#xff0c;包括AI扩展指令…

数据科学导论续

一、大数据采集的流程和方法 大数据采集的流程和方法 系统日志采集方法 很多互联网企业都有自己的海量数据采集工具&#xff0c;多用于系统日志采集&#xff0c;例如&#xff1a; Flume&#xff1a;分布式日志收集系统&#xff0c;最初由Cloudera开发&#xff0c;现是Apache的…

SPSS之判别分析

SPSS的判别分析过程中默认使用的是Fisher判别法和Bayes判别法&#xff0c;并以前者为主&#xff0c;在指定选项后也可以给出Bayes判别法的结果。 SPSS中判别分析在【分析】—【分类】—【判别】中完成。选定类别变量放入【分组变量】框中&#xff0c;单击定义范围(D)按钮给出类…

《Fundamentals of Power Electronics》——Buck、Boost、Buck-Boost三个电路的CCM-DCM工作特性总结

Buck、Boost、Buck-Boost这三个电路的CCM-DCM工作特性总结如下表所示&#xff1a; Buck、Boost、Buck-Boost这三个电路工作在DCM模式下电压传输比的对比图如下所示&#xff1a; 由上图可知&#xff0c;Buck-Boost电路的工作特性是一条斜率为的直线&#xff0c;Buck电路和Boost电…

IDEA 中的奇技淫巧

IDEA 中的奇技淫巧 书签 在使用ctrlalt方向键跳转时&#xff0c;或者追踪代码时&#xff0c;经常遇到的情况是层级太多&#xff0c;找不到代码的初始位置&#xff0c;入口。可以通过书签的形式去打上一个标记&#xff0c;后续可以直接跳转到书签位置。 标记书签&#xff1a;c…

Qt窗口

QMainWindow Qt 窗⼝ 是通过 QMainWindow类 来实现的。 QMainWindow 是⼀个为⽤⼾提供主窗⼝程序的类&#xff0c;继承⾃ QWidget 类&#xff0c;并且提供了⼀个预定义的 布局。QMainWindow 包含 ⼀个菜单栏&#xff08;menu bar&#xff09;、多个⼯具栏(tool bars)、多个浮动…

Python并发编程:揭开多线程与异步编程的神秘面纱

第一章&#xff1a;并发编程导论 1.1 并发与并行概念解析 1.1.1 并发性与并行性的区别 想象一下繁忙的厨房中多位厨师同时准备不同的菜肴——即使他们共享有限的空间和资源&#xff0c;也能协同工作&#xff0c;这就是并发性的一个生动比喻。并发性意味着多个任务在同一时间…

getchar和putchar的用法

getchar() 和 putchar() 是一对字符输入/输出函数.他们通常比scanf() 和printf() 函数更快更便捷。 getchar()不带任何参数&#xff0c;其实getchar() 和putchar()与scanf() 和printf()功能相似。 接下来博主简单的跟大家解释一下。 1.getchar 通常把输入的字符赋予一个字符变…

uReport2 报表设计

最近刚好用到这个报表工具&#xff0c;刚开始接触都还不会用&#xff0c;学习了一下&#xff0c;在这边做个记录。 数据源 目前报表框架支持和使用的数据源连接有两种方式&#xff1a;添加数据库连接 和添加内置数据源连接。 进入报表设计 http://IP:端口/context-path/urepor…

计算机网络之传输层TCP\UDP协议

UDP协议 用户数据报协议UDP概述 UDP只在IP数据报服务之上增加了很少功能&#xff0c;即复用分用和差错检测功能 UDP的主要特点&#xff1a; UDP是无连接的&#xff0c;减少开销和发送数据之前的时延 UDP使用最大努力交付&#xff0c;即不保证可靠交付&#xff0c;可靠性由U…

一款神奇的地理数据可视化python库

在地理信息系统&#xff08;GIS&#xff09;和地理数据可视化领域&#xff0c;Python的易用性和强大的库支持使其成为处理地理数据的理想选择之一。今天我们介绍Cartopy库&#xff0c;它为地理数据可视化提供了强大的支持。无论是对于GIS专业人士还是对地理数据可视化感兴趣的初…

网络编程——TCP

socket socket类型 流式套接字(SOCK_STREAM) TCP 提供了一个面向连接、可靠的数据传输服务&#xff0c;数据无差错、无重复、无丢失、无失序的发送且按发送顺序接收。内设置流量控制&#xff0c;避免数据流淹没慢的接收方。数据被看作是字节流&#xff0c;无长度限制。 数据报…

异常处理方式

在定义方法时&#xff0c;首先需要先对参数数据进行合法判断 数据若不合法&#xff0c;使用抛出异常的方式来告诉调用者&#xff0c;传递合法的数据进来 在方法内使用 throw 抛出指定异常对象&#xff0c;throw new XxxException(“异常产生原因”) 创建的是运行时异常&…

ROS学习笔记(14)拉普拉斯变换和PID

0.前提 近些时间在对睿抗的ROS仿真赛进行小组安排&#xff0c;对小组成员进行了一些安排&#xff0c;也要求他们以本次比赛写下自己的比赛经历博客&#xff0c;他们的培训由我来安排和负责&#xff0c;因此我得加吧油&#xff0c;起码保证我的进度得快过他们&#xff0c;才能安…

使用yolov8+QT+onnrunxtime进行开发的注意事项

1、本来想尝试做一个C的yolov8在QT5.15.2的应用&#xff1b; 因此&#xff0c;在实现这个目标的时候&#xff0c;我先用了yolov8自带的export进行导出&#xff0c;使用的代码很简单&#xff0c;如下所示&#xff1a; import os from ultralytics import YOLO# model YOLO(&q…

SpringBoot 快速开始 Dubbo RPC

文章目录 SpringBoot 快速开始 Dubbo RPC下载 Nacos项目启动项目的创建创建主项目接口定义服务的创建Dubbo 服务提供者的创建服务的消费者创建 添加依赖给 Provider、Consumer 添加依赖 开始写代码定义接口在 Provider 中实现在 Consumer 里面使用创建启动类 注册中心配置启动 …

绘唐科技AIGC怎么激活

绘唐科技AIGC怎么激活绘唐科技AIGC怎么激活绘唐科技AIGC怎么激活绘唐科技AIGC怎么激活 这里激活免费3天体验 Docshttps://qvfbz6lhqnd.feishu.cn/wiki/D3YLwmIzmivZ7BkDij6coVcbn7W

架构师技能:技术深度硬实力透过问题看本质--深入分析nginx偶尔502错误根因

以架构师的能力标准去分析每个问题&#xff0c;过后由表及里分析问题的本质&#xff0c;复盘总结经验&#xff0c;并把总结内容记录下来。当你解决各种各样的问题&#xff0c;也就积累了丰富的解决问题的经验&#xff0c;解决问题的能力也将自然得到极大的提升。励志做架构师的…

Spring IOC(二)

1. Bean的定义与获取 1.1 定义Bean 在Spring 中定义Bean的方式主要有三种&#xff1a; 1、基于XML配置文件的方式&#xff08;了解&#xff09;&#xff1a;通常会在配置文件中使用<bean>标签来定义Bean&#xff0c;并设置Bean的属性、依赖关系等信息。 2、基于注解的方…

C语言程序设计(一)

1、指令、程序、软件 2、计算机语言&#xff1a;机器语言、汇编语言、高级语言 高级语言的发展&#xff1a;非结构化语言&#xff08;FORTRAN&#xff09;、结构化语言&#xff08;C语言&#xff09;、面向对象的语言&#xff08;C、面向对象&#xff09; 3、源程序、二进制…
最新文章