博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
最佳实践:如何基于MNS实现事务消息
阅读量:6207 次
发布时间:2019-06-21

本文共 3626 字,大约阅读时间需要 12 分钟。

事务消息的背景

有时候我们需要实现本地操作和消息发送的事务一致性功能。即:消息发送成功,则本地操作成功;反之,如果消息发送失败,本地操作失败(成功也需要rollback)。保证不出现操作成功但消息发送失败;或者操作失败但消息发送成功的情况;

另外,消费端,我们也希望消息一定被成功处理一次,不会因为消息端程序崩溃而导致消息没有成功处理,进而需要人工重置消费进度。

 

解决方案

利用消息服务MNS的延迟消息来实现。

准备工作

创建两个队列:

1.事务消息队列

消息的有效期小于消息延迟时间。即如果生产者不主动修改(提交)消息可见时间,消息对消费者不可见;

2.操作日志队列

记录事务消息的操作记录信息。消息延迟时间为事务操作超时时间。日志队列中的消息确认(删除)后将对消费者不可见。

 

具体步骤

1.发送一条事务准备消息到事务消息队列;

2.写操作日志信息到操作日志队列,日志中包含步骤1消息的消息句柄;

3.执行本地事务操作;

4.如果步骤3成功,提交消息(消息对消费者可见);反之,回滚消息;

5.确认步骤2中的操作日志(删除该日志消息);

6.步骤4后,消费者可以接收到事务消息;

7.消费者处理消息;

8.消费者确认删除消息;

 

如下图:

7df0a2f8929e58548b01b836546ad6fa60cdd096

异常分析:

生产者异常(例如:进程重启):

A.读取操作日志队列超时未确认日志

B.检查事务结果

C.如果检查得到事务已经成功,则提交消息(重复提交无副作用,同一句柄的消息只能成功提交一次)

D.确认操作日志

 

消费者异常(例如:进程重启):

消息服务提供至少保证消费一次的特性,只要步骤8不成功,消息在一段时间后可以继续可见,被当前消费者或者其他消费者处理。

 

消息服务不可达(例如:断网)

消息发送和接收处理状态以及操作日志都在消息服务端,消息服务本身具备高可靠和高可用的特点,所以只要网络恢复,事务可以继续,能保证只要生产者:操作成功,则消费者一定能够拿到消息并处理成功;或操作失败, 则消费者收不到消息的最终一致性。

 

代码实现:

MNS最新的Java SDK (1.1.5)()中的TransactionQueue支持上述事务消息方案。使用者只需要在TransactionOperationsTransactionChecker 两个接口添加业务操作和检查逻辑,就可以方便的实现事务消息。

Demo 代码

public class TransactionMessageDemo{    public class MyTransactionChecker implements TransactionChecker    {        public boolean checkTransactionStatus(Message message)        {            boolean checkResult = false;            String messageHandler = message.getReceiptHandle();            try{                //TODO: check if the messageHandler related transaction is success.                checkResult = true;            }catch(Exception e)            {                checkResult = false;            }            return checkResult;        }    }    public class MyTransactionOperations implements TransactionOperations    {        public boolean doTransaction(Message message)        {            boolean transactionResult = false;            String messageHandler = message.getReceiptHandle();            String messageBody = message.getMessageBody();            try{                //TODO: do your local transaction according to the messageHandler and messageBody here.                transactionResult = true;            }catch(Exception e)            {                transactionResult = false;            }            return transactionResult;        }    }    public static void main(String[] args) {        System.out.println("Start TransactionMessageDemo");        String transQueueName = "transQueueName";        String accessKeyId = ServiceSettings.getMNSAccessKeyId();        String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();        String endpoint = ServiceSettings.getMNSAccountEndpoint();	        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);        MNSClient client = account.getMNSClient(); //this client need only initialize once        // create queue for transaction queue.        QueueMeta queueMeta = new QueueMeta();        queueMeta.setQueueName(transQueueName);        queueMeta.setPollingWaitSeconds(15);                TransactionMessageDemo demo = new TransactionMessageDemo();        TransactionChecker transChecker = demo.new MyTransactionChecker();        TransactionOperations transOperations = demo.new MyTransactionOperations();                TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);        // do transaction.        Message msg = new Message();        String messageBody = "TransactionMessageDemo";        msg.setMessageBody(messageBody);         transQueue.sendTransMessage(msg, transOperations);        // delete queue and close client if we won't use them.        transQueue.delete();        // close the client at the end.        client.close();        System.out.println("End TransactionMessageDemo");    }}

转载地址:http://ahhca.baihongyu.com/

你可能感兴趣的文章
IntelliJ IDEA2017 修改缓存文件的路径
查看>>
无向图的点双连通分量(tarjan模板)
查看>>
关于Unity中UI中的Image节点以及它的Image组件
查看>>
神奇的图像处理算法
查看>>
java maven 操作 收集的一些命令
查看>>
springcloud-05-ribbon中不使用eureka
查看>>
SPOJ QTREE6 lct
查看>>
Java - 对象(object) 具体解释
查看>>
[.NET跨平台]Jexus独立版本的便利与过程中的一些坑
查看>>
swift菜鸟入门视频教程-09-类和结构体
查看>>
Oracle数据库备份恢复,巡检须要关注的对象设置以及相关恢复概述
查看>>
jQuery序列化表单为JSON对象
查看>>
使用malloc分别分配2KB的空间,然后用realloc调整为6KB的内存空间,打印指针地址...
查看>>
Redis 它是什么?它用来做什么?它的优势与短板如何?
查看>>
vue13过滤器 debounce延迟、limitBy、filterBy、orderBy
查看>>
MSSQL分组取后每一组的最新一条记录
查看>>
jQ效果:简单的手风琴效果
查看>>
Android Studio环境搭建
查看>>
Java Persistence with MyBatis 小结2
查看>>
python socket 多人聊天室
查看>>