吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 2959|回复: 6
收起左侧

[Java 转载] 基于rocketmq的分布式事务

[复制链接]
especial 发表于 2020-8-11 15:30
本帖最后由 especial 于 2020-8-11 15:35 编辑

一、linux安装启动rocketmq注意点
    1、在安装完rocketmq之后要先启动 mqnamesrv 再启动 mqbroker
    2、在启动mqbroker的时候要记得处理ip的问题,默认启动会使用docker的内网地址,需要配置成虚拟机ip
[Shell] 纯文本查看 复制代码
#启动mqnamesrv
nohup sh mqnamesrv &
#创建一个mqbroker的配置文件
echo brokerIP1=192.168.147.130 >broker.properties
#启动mqbroker 开启自动创建主题
nohup sh mqbroker -n 你的ip:9876 -c broket.properties autoCreateTopicEnable=true &
#查看是否启动成功
tail -f ~/logs/rocketmqlogs/broker.log 





二、java代码通过rocketmq的事务 处理分布式事务 并封装(个人基本封装例子,可通过基于配置文件配置,更灵活简便)这里直接使用代码配置


1、引入依赖
[XML] 纯文本查看 复制代码
<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
            <version>4.4.0</version>
        </dependency>



2、启动加载生产者实例工具类,使用的是TransactionMqProducer  使用事务  默认的是不具有事务的,并且配置事务处理的类 实现TransactionListener  这里没有独立写 直接new了



[Java] 纯文本查看 复制代码
@Configuration
public class rocketMqConfig {

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private static final String PRODUCER_GROUP = "xd_producer_group";

    @Bean
    public ITransactionRocketMqUtil rocketMqUtilSs(){

        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setVipChannelEnabled(false);
        producer.setNamesrvAddr(namesrvAddr);
        //创建一个自定义线程工厂
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        //创建一个线程池
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        //设置生产者线程池
        producer.setExecutorService(executor);
        //设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                try {
                    LoggerUtil.info("开始事务处理");
                    System.out.println(1/0);
                    LoggerUtil.info("完成事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }catch (Exception e){
                    LoggerUtil.info("失败事务");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }finally {
                    LoggerUtil.info("事务处理结束");
                    return null;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                try {
                    LoggerUtil.info("开始事务回查");
                    System.out.println(1/1);
                    return LocalTransactionState.COMMIT_MESSAGE;
                }catch (Exception e){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }finally {
                    LoggerUtil.info("事务回查结束");
                }
            }
        });
        return new TransactionRocketMqUtil(producer);
    }
}



3、生产者消息发送工具类


[Java] 纯文本查看 复制代码
public class TransactionRocketMqUtil implements ITransactionRocketMqUtil {

    private TransactionMQProducer producer;

    public TransactionRocketMqUtil(TransactionMQProducer producer){
        LoggerUtil.info("初始化类");
        this.producer = producer;
        start();
    }

    public TransactionSendResult transactionSend (String topic,String msg) throws Exception {
        Message message = new Message(topic,msg.getBytes());
        TransactionSendResult result =  producer.sendMessageInTransaction(message,null);
        return result;
    }

    public void shutdown(){
        this.producer.shutdown();
    }

    /**
     * 使用前先调用start方法
     */
    public void start(){
        try {
            this.producer.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}



4、消费者注册封装


[Java] 纯文本查看 复制代码
@Configuration
public class ConsumerContext {

    private DefaultMQPushConsumer consumer;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    @Bean
    public ContextContextTool testConsumer() throws Exception {
        new ContextContextTool().ContextTool();
        return null;
    }
}



这里我写死了消费者的类可以在 ContextTool() 方法中配置传入消费者的 类 以及组 和 主题   消费者的类需要实现MessageListenerConcurrently接口,逻辑写在consumeMessage方法中


[Java] 纯文本查看 复制代码
public class ContextContextTool {
    @Value("${apache.rocketmq.namesrvAddr}")
    private static String namesrvAddr;
    public void ContextTool() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe("test_topic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    LoggerUtil.info("消费成功"+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        LoggerUtil.info("创建消费者并启动监听:test_topic");
    }
}



5、消息发送


[Java] 纯文本查看 复制代码
 public void sendProducerTransaction() throws Exception {
        TransactionSendResult result = transactionRocketMqUtil.transactionSend("test_topic","test_msg");
        LoggerUtil.info("发送消息到rocketmq事务监听"+result);
    }



6、结果


其中可能还存在某些问题 欢迎大佬指正



测试

测试
图片.png

免费评分

参与人数 2吾爱币 +5 热心值 +2 收起 理由
alian2018 + 1 谢谢@Thanks!
苏紫方璇 + 5 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

头像被屏蔽
偶尔平凡 发表于 2020-8-12 12:14
提示: 作者被禁止或删除 内容自动屏蔽
JsFuck攻城狮 发表于 2020-8-12 16:59
怎么保证消息的丢失问题,之前业务上的做法是把消息持久化到库里
 楼主| especial 发表于 2020-8-13 10:01
JsFuck攻城狮 发表于 2020-8-12 16:59
怎么保证消息的丢失问题,之前业务上的做法是把消息持久化到库里

1、rocketmq节点集群
2、rocketmq事务是一个2段式提交,事务开始只是提交了预备消息  在事务开始前提交,只有事务完成才会commit 提交真正的消息,在没有收到commit事务确认,预备消息会进行回查,确认后再进行消费,消费失败会重复调用消费,分布式事务做到的是数据最终一致。如果说在预备消息发送就丢失了,则第一段事务是根本不会执行的。
3、如果在第一段提交成功 commit成功后 还没消费 消息丢失,这种情况只能通过附加业务冗余来保证稳定性,但是rocketmq是十分稳定的,可以这么说,你整个分布式系统挂了,redis 挂了rocketmq都不会挂。
(个人见解)
c03xp 发表于 2020-8-13 10:36
长知识了
drundragon 发表于 2020-8-21 16:09
谢谢分享
头像被屏蔽
xiadongming 发表于 2021-9-4 14:28
提示: 作者被禁止或删除 内容自动屏蔽
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-14 02:20

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表