一,引入本地依赖, 注意自己的电脑是否配置有本地库,没配置问一下别人
<!--<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>--> 这个就不要了,关于 rocketmq的一切依赖都可以删掉,只需要保留以下本地库
<dependency>
<groupId>org.dxstudio</groupId>
<artifactId>dx-rocketmq-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
二,配置文件增加以下内容
environment: local #这里各个环境要使用不同的环境标记,叫什么不重要,目前统一 local | dev | pron | prod
rocketmq:
consumer:
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
producer:
group: dx-dump-group
# 发送消息超时时间,默认3000
sendMessageTimeout: 3000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
name-server: 192.168.222.10:9876
三,main上添加注解
为了自动注入三方库中的类。因为在配置中做了 注解拦截和修改,用于自动隔离环境,自动更改你的侦听器里的group和topic
@MapperScan("org.dxstudio.dump.core.mapper")
@EnableAsync
@SpringBootApplication
@ComponentScan("org.dxstudio.*") //加上这个
public class DumpApplication {
public static void main(String[] args) {
SpringApplication.run(DumpApplication.class, args);
}
}
四,发布消息
已经使用项目组的规则做了封装,为保持统一,虽然没有二次封装 rocketMQTemplate,也请使用封装好的 topic 和message 来
@Autowired
RocketMQTemplate rocketMQTemplate;
private SendResult sendMsg(int taskId ){
//第一个参数是标签,我们约定为事件类型,
//第二个参数是消息的key(用于在mq后台查询),这里如果是订单 直接订单号,其他使用ID即可
//第三个参数是你要发送的消息体,范型,爱传什么传什么
DxMQTpl<Integer> t = new DxMQTpl<>("task", String.valueOf( taskId ), taskId);
//没有再二次封装 rocketMQTemplate, 所以可以网上搜索这个的发送方法即可,没有特别情况一般就用 syncSend
SendResult result = rocketMQTemplate.syncSend( t.topic(), t.message() );
log.info("发送消息 {}", result) ;
return result;
}
五,消费消息
消费消息已经封装好了处理方法,继承DxMQMessageHandler<T>、并实现 RocketMQListener< DxMQMessage<T> > 即可。
@RocketMQMessageListener 这个是 rocketMQTemplate 的注释,可网上查资料,一般就用我下面的即可,改成自己要订阅的频道
@Slf4j
@RocketMQMessageListener(consumerGroup = "dx-dump-group",
topic = "dx-dump",
selectorExpression = "task",
selectorType = SelectorType.TAG,
consumeThreadMax = 1,
consumeThreadNumber = 1)
public class Consumer extends DxMQMessageHandler<Long> implements RocketMQListener< DxMQMessage<Long> > {
@Autowired
TaskScanService taskScanService;
//这里固定写 执行超类方法即可
@Override
public void onMessage( DxMQMessage<Long> msg) {
super.dispatchMessage( msg );
}
//这里是自己真正要处理的逻辑,有问题毫无犹豫抛出异常即可
@Override
protected void handleMessage(DxMQMessage<Long> message) throws Exception {
log.info("处理消息 {}", message);
// throw new RuntimeException("测试一下异常");
}
//这里是异常时要做的事情,比如标记异常。
//这里必须是重试都失败了才会最后触发,比如关闭 isRetry 和 throwException。 throwException开启会触发rocketmq的自动重发
@Override
protected void handleMaxRetriesExceeded(DxMQMessage<Long> message) {
log.error("消费失败,后续处理,比如标记异常");
}
//过滤方法,返回true不会执行handleMessage。 用于限制重复的消息消费(rocketmq不会保证不重复),比如订单已经处理过了,在这里个方法里检查好,就不应该再继续处理
@Override
protected boolean filter(DxMQMessage<Long> message) {
return super.filter(message);
}
//如果要自己修改重试规则(mq的重试是一直会有),开启true,则每次发生异常,是由代码控制 重新发消息。这样自己可以控制重试多少次(5秒间隔),达到最大次数后触发handleMaxRetriesExceeded
//这个要看自己的业务,允许异常的业务 用这个,不允许异常的,使用mq的重试
@Override
protected boolean isRetry() {
return true;
}
@Override
protected boolean throwException() {
return true;
}
}
六,附上rocketmq的重发机制
