商务网站开发步骤百度推广网页版
SpringBoot整合RocketMQ发送延时消息
springboot rocketmq 延迟消息
Windows下RocketMQ安装及可视化界面搭建
Java 客户端
RocketMQ延迟消息
项目背景
项目中有延时消息的需求,综合考量RocketMQ比较适合。
-
RocketMQ支持多维度的延迟级别
-
支持多种消息类型
-
基于分布式架构时间高可用
-
有重试机制
-
高吞吐量,流处理
基本使用
依赖
<!--rocketMq依赖-->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>自己选择版本</version>
</dependency>
配置
#这两个是必须配置项
rocketmq:name-server: localhost:9876producer:group: xxx
生产者
@Data
@Component
@ConfigurationProperties(prefix = "mq")
public class MqConfig {/*** 是否开启mq延迟消息*/private Boolean enabled;/*** 缺货mq主题*/private String topic = "topic";/*** 发送超时时间,单位:毫秒(ms)*/private Long timeout = 3000L;/*** 延时级别为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)其下标,从1开始*/private Integer delayLevel = 3;
}
@Component
@Slf4j
public class Producer {@Autowiredprivate MqConfig mqConfig;@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送延迟消息* @param msg*/public void sendScheduleMsg(String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();String topic = mqConfig.getTopic();SendResult sendResult = rocketMQTemplate.syncSend(topic, message, mqConfig.getTimeout(), mqConfig.getDelayLevel());if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【sendScheduleMsg】 发送延迟[{}]消息[{}]成功", topic, msg);} else {log.warn("【sendScheduleMsg】 发送延迟[{}]消息[{}]失败", topic, msg);}}
}
消费者
@Component
@RocketMQMessageListener(consumerGroup = "group", topic = "topic")
@Slf4j
public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);log.info("收到延迟消息:{}", msg);}
}
可以看到broker是收到数据的