SpringBoot集成RocketMQ

一个笨蛋的博客 / 2023-05-06 / 原文

添加pom.xml依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

创建消息消费者

@Component
@Slf4j
public class MessageConsumerService {
	@Component
	@RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group1")
	public class Consumer1 implements RocketMQListener<UserChange> {
		@Override
		public void onMessage(UserChange message) {
			log.info("收到信息:{}", JSON.toJSONString(message));
		}
	}

	@Component
	@RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group2")
	public class Consumer2 implements RocketMQListener<String> {
		@Override
		public void onMessage(String message) {
			log.info("收到信息:{}", message);
		}
	}

	@Component
	@RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group3")
	public class Consumer3 implements RocketMQListener<MessageExt> {
		@Override
		public void onMessage(MessageExt message) {
			log.info("收到信息:{}", new String(message.getBody()));
		}
	}
}

发送消息

@RestController
public class TestController {
	/**
	 * destination包括2个部分信息,topic和tags,可以只有topic
	 */
	private final String destination = RocketMQConstant.TOPIC + ":" + RocketMQConstant.TAGS;
	@Autowired
	private RocketMQTemplate rocketMQTemplate;

	/**
	 * 同步消息发送
	 * 
	 * @return
	 */
	@GetMapping("send")
	public SendResult send() {
		UserChange change = UserChange.builder().userName("张三").remark("密码变更").build();
		Message<UserChange> message = MessageBuilder.withPayload(change).setHeader(RocketMQHeaders.KEYS, "key").build();
		return rocketMQTemplate.syncSend(destination, message);
	}
}

日志输出

2023-05-06 19:45:15.042 [ConsumeMessageThread_consumer-group2_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer2.onMessage 38 -- 收到信息:{"userName":"张三","remark":"密码变更"}
2023-05-06 19:45:15.042 [ConsumeMessageThread_consumer-group3_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer3.onMessage 47 -- 收到信息:{"userName":"张三","remark":"密码变更"}
2023-05-06 19:45:15.080 [ConsumeMessageThread_consumer-group1_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer1.onMessage 29 -- 收到信息:{"remark":"密码变更","userName":"张三"}
 

 

常见错误:

1、connect to 172.17.183.41:10911 failed

防火墙需要开启10911端口

firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --reload

2、sendDefaultImpl call timeout

消息发送超时,可调整rocketmq.producer.send-message-timeout参数,默认3秒