本文共 3626 字,大约阅读时间需要 12 分钟。
事务消息的背景:
有时候我们需要实现本地操作和消息发送的事务一致性功能。即:消息发送成功,则本地操作成功;反之,如果消息发送失败,本地操作失败(成功也需要rollback)。保证不出现操作成功但消息发送失败;或者操作失败但消息发送成功的情况;
另外,消费端,我们也希望消息一定被成功处理一次,不会因为消息端程序崩溃而导致消息没有成功处理,进而需要人工重置消费进度。
解决方案:
利用消息服务MNS的延迟消息来实现。
准备工作:
创建两个队列:
1.事务消息队列
消息的有效期小于消息延迟时间。即如果生产者不主动修改(提交)消息可见时间,消息对消费者不可见;
2.操作日志队列
记录事务消息的操作记录信息。消息延迟时间为事务操作超时时间。日志队列中的消息确认(删除)后将对消费者不可见。
具体步骤:
1.发送一条事务准备消息到事务消息队列;
2.写操作日志信息到操作日志队列,日志中包含步骤1消息的消息句柄;
3.执行本地事务操作;
4.如果步骤3成功,提交消息(消息对消费者可见);反之,回滚消息;
5.确认步骤2中的操作日志(删除该日志消息);
6.步骤4后,消费者可以接收到事务消息;
7.消费者处理消息;
8.消费者确认删除消息;
如下图:
异常分析:
生产者异常(例如:进程重启):
A.读取操作日志队列超时未确认日志
B.检查事务结果
C.如果检查得到事务已经成功,则提交消息(重复提交无副作用,同一句柄的消息只能成功提交一次)
D.确认操作日志
消费者异常(例如:进程重启):
消息服务提供至少保证消费一次的特性,只要步骤8不成功,消息在一段时间后可以继续可见,被当前消费者或者其他消费者处理。
消息服务不可达(例如:断网)
消息发送和接收处理状态以及操作日志都在消息服务端,消息服务本身具备高可靠和高可用的特点,所以只要网络恢复,事务可以继续,能保证只要生产者:操作成功,则消费者一定能够拿到消息并处理成功;或操作失败, 则消费者收不到消息的最终一致性。
代码实现:
MNS最新的Java SDK (1.1.5)()中的TransactionQueue支持上述事务消息方案。使用者只需要在TransactionOperations和TransactionChecker 两个接口添加业务操作和检查逻辑,就可以方便的实现事务消息。
Demo 代码
public class TransactionMessageDemo{ public class MyTransactionChecker implements TransactionChecker { public boolean checkTransactionStatus(Message message) { boolean checkResult = false; String messageHandler = message.getReceiptHandle(); try{ //TODO: check if the messageHandler related transaction is success. checkResult = true; }catch(Exception e) { checkResult = false; } return checkResult; } } public class MyTransactionOperations implements TransactionOperations { public boolean doTransaction(Message message) { boolean transactionResult = false; String messageHandler = message.getReceiptHandle(); String messageBody = message.getMessageBody(); try{ //TODO: do your local transaction according to the messageHandler and messageBody here. transactionResult = true; }catch(Exception e) { transactionResult = false; } return transactionResult; } } public static void main(String[] args) { System.out.println("Start TransactionMessageDemo"); String transQueueName = "transQueueName"; String accessKeyId = ServiceSettings.getMNSAccessKeyId(); String accessKeySecret = ServiceSettings.getMNSAccessKeySecret(); String endpoint = ServiceSettings.getMNSAccountEndpoint(); CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint); MNSClient client = account.getMNSClient(); //this client need only initialize once // create queue for transaction queue. QueueMeta queueMeta = new QueueMeta(); queueMeta.setQueueName(transQueueName); queueMeta.setPollingWaitSeconds(15); TransactionMessageDemo demo = new TransactionMessageDemo(); TransactionChecker transChecker = demo.new MyTransactionChecker(); TransactionOperations transOperations = demo.new MyTransactionOperations(); TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker); // do transaction. Message msg = new Message(); String messageBody = "TransactionMessageDemo"; msg.setMessageBody(messageBody); transQueue.sendTransMessage(msg, transOperations); // delete queue and close client if we won't use them. transQueue.delete(); // close the client at the end. client.close(); System.out.println("End TransactionMessageDemo"); }}
转载地址:http://ahhca.baihongyu.com/