好友
阅读权限10
听众
最后登录1970-1-1
|
本帖最后由 especial 于 2020-8-11 15:35 编辑
一、linux安装启动rocketmq注意点
1、在安装完rocketmq之后要先启动 mqnamesrv 再启动 mqbroker
2、在启动mqbroker的时候要记得处理ip的问题,默认启动会使用docker的内网地址,需要配置成虚拟机ip
[Shell] 纯文本查看 复制代码 #启动mqnamesrv
nohup sh mqnamesrv &
#创建一个mqbroker的配置文件
echo brokerIP1=192.168.147.130 >broker.properties
#启动mqbroker 开启自动创建主题
nohup sh mqbroker -n 你的ip:9876 -c broket.properties autoCreateTopicEnable=true &
#查看是否启动成功
tail -f ~/logs/rocketmqlogs/broker.log
二、java代码通过rocketmq的事务 处理分布式事务 并封装(个人基本封装例子,可通过基于配置文件配置,更灵活简便)这里直接使用代码配置
1、引入依赖
[XML] 纯文本查看 复制代码 <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>4.4.0</version>
</dependency>
2、启动加载生产者实例工具类,使用的是TransactionMqProducer 使用事务 默认的是不具有事务的,并且配置事务处理的类 实现TransactionListener 这里没有独立写 直接new了
[Java] 纯文本查看 复制代码 @Configuration
public class rocketMqConfig {
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private static final String PRODUCER_GROUP = "xd_producer_group";
@Bean
public ITransactionRocketMqUtil rocketMqUtilSs(){
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setVipChannelEnabled(false);
producer.setNamesrvAddr(namesrvAddr);
//创建一个自定义线程工厂
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
//创建一个线程池
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
//设置生产者线程池
producer.setExecutorService(executor);
//设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
LoggerUtil.info("开始事务处理");
System.out.println(1/0);
LoggerUtil.info("完成事务");
return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){
LoggerUtil.info("失败事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}finally {
LoggerUtil.info("事务处理结束");
return null;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
try {
LoggerUtil.info("开始事务回查");
System.out.println(1/1);
return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){
return LocalTransactionState.ROLLBACK_MESSAGE;
}finally {
LoggerUtil.info("事务回查结束");
}
}
});
return new TransactionRocketMqUtil(producer);
}
}
3、生产者消息发送工具类
[Java] 纯文本查看 复制代码 public class TransactionRocketMqUtil implements ITransactionRocketMqUtil {
private TransactionMQProducer producer;
public TransactionRocketMqUtil(TransactionMQProducer producer){
LoggerUtil.info("初始化类");
this.producer = producer;
start();
}
public TransactionSendResult transactionSend (String topic,String msg) throws Exception {
Message message = new Message(topic,msg.getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(message,null);
return result;
}
public void shutdown(){
this.producer.shutdown();
}
/**
* 使用前先调用start方法
*/
public void start(){
try {
this.producer.start();
}catch (Exception e){
e.printStackTrace();
}
}
}
4、消费者注册封装
[Java] 纯文本查看 复制代码 @Configuration
public class ConsumerContext {
private DefaultMQPushConsumer consumer;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Bean
public ContextContextTool testConsumer() throws Exception {
new ContextContextTool().ContextTool();
return null;
}
}
这里我写死了消费者的类可以在 ContextTool() 方法中配置传入消费者的 类 以及组 和 主题 消费者的类需要实现MessageListenerConcurrently接口,逻辑写在consumeMessage方法中
[Java] 纯文本查看 复制代码 public class ContextContextTool {
@Value("${apache.rocketmq.namesrvAddr}")
private static String namesrvAddr;
public void ContextTool() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe("test_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
LoggerUtil.info("消费成功"+msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
LoggerUtil.info("创建消费者并启动监听:test_topic");
}
}
5、消息发送
[Java] 纯文本查看 复制代码 public void sendProducerTransaction() throws Exception {
TransactionSendResult result = transactionRocketMqUtil.transactionSend("test_topic","test_msg");
LoggerUtil.info("发送消息到rocketmq事务监听"+result);
}
6、结果
其中可能还存在某些问题 欢迎大佬指正
|
-
测试
-
免费评分
-
查看全部评分
|
发帖前要善用【论坛搜索】功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。 |
|
|
|
|