目前,如果SpringBoot项目想要集成RocketMQ,有两种集成方式:1、rocketmq-client,2、rocketmq-spring-boot-starter。本文会一步一步教大家如何集成这两种,实现优雅的在项目中使用RocketMQ,并且对比这两者的优缺点。
第一种:rocketmq-client
这一种方式,也是目前最成熟、使用最多的集成方式。
依赖
<!-- rocketmq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency>
消息处理接口 MessageProcessor
public interface MessageProcessor { /** * 处理消息 * * @param messageExt * @return */ boolean handle(MessageExt messageExt); }
消息处理实现类 MessageProcessorImpl
@Service public class MessageProcessorImpl implements MessageProcessor { @Override public boolean handle(MessageExt messageExt) { // 收到的body(消息体),字节类型,需转为String String result = new String(messageExt.getBody()); System.out.println("MessageProcessor|监听到了消息,消息为:" + result); return true; } }
上面这两个类,功能职责以及代码都很简单,当监听到了消息,我们就可以在 MessageProcessorImpl 中执行我们的逻辑,这里我就省略了,只是做个最简单的集成。
配置文件
server.port=8088 rocketmq.producer.groupName=ProducerGroup rocketmq.producer.namesrvAddr=127.0.0.1:9876 rocketmq.producer.instanceName=ProducerGroup rocketmq.producer.topic=TestTopic2020 rocketmq.producer.tag=test rocketmq.producer.maxMessageSize=131072 rocketmq.producer.sendMsgTimeout=10000 rocketmq.consumer.namesrvAddr=127.0.0.1:9876 rocketmq.consumer.groupName=ProducerGroup rocketmq.consumer.topic=TestTopic2020 rocketmq.consumer.tag=test rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64
配置文件中的“groupName”、“instanceName”、“topic”和“tag”都是直接自己命名一个就行,不需要事先创建授权之类的,RocketMQ默认会自动创建,不需要去控制台手动创建。至于命名的规范,每个公司可能都不太一样,这里我就不遵循这些规范了。
生产者 RocketMQProducer
定义生产者,在向MQ发送消息时,会用到。
@Component @Slf4j public class RocketMQProducer { @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String nameserAddr; @Value("${rocketmq.producer.instanceName}") private String instanceName; @Value("${rocketmq.producer.maxMessageSize}") private int maxMessageSize; @Value("${rocketmq.producer.sendMsgTimeout}") private int sendMsgTimeout; private DefaultMQProducer producer; @Bean public DefaultMQProducer getRocketMQProducer() { producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(nameserAddr); producer.setInstanceName(instanceName); producer.setMaxMessageSize(maxMessageSize); producer.setSendMsgTimeout(sendMsgTimeout); producer.setVipChannelEnabled(false); try { producer.start(); log.info("RocketMQProducer|生产者创建并启动完成,groupName:{},nameserAddr:{}", groupName, nameserAddr); } catch (MQClientException e) { log.error("RocketMQProducer|生产者启动异常", e); e.printStackTrace(); } return producer; } }
消息监听 MessageListen
每当有消息推过来时,都会执行这个类里面的代码。MessageExt 中包含消息的topic、tag等诸多信息。如果消费失败result为false,会进入重试阶段。
public class MessageListen implements MessageListenerConcurrently { @Autowired private MessageProcessor messageProcessor; public void setMessageProcessor(MessageProcessor messageProcessor) { this.messageProcessor = messageProcessor; } @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt ext = list.get(0); boolean result = messageProcessor.handle(ext); if (!result) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
消费者 RocketMQConsumer
@Component public class RocketMQConsumer { private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class); @Autowired private MessageProcessor messageProcessor; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Bean public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setVipChannelEnabled(false); // 我们自己实现的监听类 MessageListen messageListen = new MessageListen(); messageListen.setMessageProcessor(messageProcessor); consumer.registerMessageListener(messageListen); try { consumer.subscribe(topic, tag); consumer.start(); log.info("RocketMQConsumer|消费者创建并启动完成,groupName:{},topic:{}", groupName, topic); } catch (MQClientException e) { log.error("RocketMQConsumer|消费者启动异常", e); e.printStackTrace(); } return consumer; } }
在这个类中,主要是定义消费者信息,同时,订阅我们需要消费的topic,以及指定消息处理类,去处理消息。
生产消息测试
@RestController public class TestController { @Autowired private ApplicationContext context; @GetMapping("/test") public void TestSend() { DefaultMQProducer producer = context.getBean(DefaultMQProducer.class); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String body = "Hello RocketMQ,now is " + sdf.format(new Date()) + "."; Message message = new Message("TestTopic2020", "test", body.getBytes()); try { producer.send(message); } catch (Exception e) { e.printStackTrace(); } } }
到这,不知道你是否发现了一个问题:只能处理一个topic下的消息吗?实际应用中,若还需要处理多个不同topic的消息怎么办?别急,我们接着简单改造下~
改造 消息处理接口
public interface MessageProcessor<T> { /** * 处理消息 * * @param messageExt * @return */ boolean handle(T messageExt); }
这里改成泛型,因为不同的消息体,对应着不同的对象。例如,订单消息MQ,那我处理的时候,肯定是有个订单类会方便很多的。
改造 消息处理实现类
MessageProcessorImpl 重命名为 UserMessageProcessor
@Service public class UserMessageProcessor implements MessageProcessor<User> { @Override public boolean handle(User user) { System.out.println("UserMessageProcessorImpl|监听到了消息,消息为:" + user); System.out.println("用户名:" + user.getUsername()); return true; } }
重命名只是为了方便区分。同时,将MessageExt换成了User这个对象(自己定义的),当然,实际业务中,一般都是DTO,很少是PO。
新增 消息处理实现类
既然要测试同时处理不同topic的消息,那我们再新建一个。
@Service public class OrderMessageProcessor implements MessageProcessor<Order> { @Override public boolean handle(Order order) { System.out.println("OrderMessageProcessorImpl|监听到了消息,消息为:" + order); return true; } }
改造 消息监听类
public class MessageListen implements MessageListenerConcurrently { private Map<String, MessageProcessor> handleMap = new HashMap<>(); public void registerHandler(String tags, MessageProcessor messageProcessor) { handleMap.put(tags, messageProcessor); } @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt ext = list.get(0); // 获取消息 String message = new String(ext.getBody()); // 获取tag String tag = ext.getTags(); // 根据tag,获取对应的消息处理类 MessageProcessor messageProcessor = handleMap.get(tag); Class<?> clzz = getMessageType(messageProcessor); boolean result = messageProcessor.handle(JSON.parseObject(message,clzz)); if (!result) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } private static Class<?> getMessageType(Object o) { Type[] interfaces = AopUtils.getTargetClass(o).getGenericInterfaces(); for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class<?>) actualTypeArguments[0]; } else { return Object.class; } } } return Object.class; } }
因为要处理多个Topic,所以单个MessageProcessor已经无法满足了,所以需要引入了一个Map,用来记录Tag和消息处理类的对应关系,同时,registerHandler方法也做了相应的改动。当消息推过来,根据Tag获取到了对应的处理类之后,我们还要去获取这个泛型类的实际类,以实现在调用handle方法的时候,传入的是实现类的具体对象,其中getMessageType方法就是做这件事情,用到了反射相关知识。
新增配置文件
rocketmq.consumer.orderTopic=OrderTopic2020 rocketmq.consumer.orderTopic.tag=orderTag
改造 消费者 RocketMQConsumer
@Component public class RocketMQConsumer { private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class); @Autowired private UserMessageProcessor userMessageProcessor; @Autowired private OrderMessageProcessor orderMessageProcessor; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.orderTopic}") private String orderTopic; @Value("${rocketmq.consumer.orderTopic.tag}") private String orderTopicTag; /** * 如果只想消费topic下的某几个tag,以用 “||”隔开。比如:consumer.subscribe("topic2020", "Tag1 || Tag2"); * 如果想消费topic下所有的tag,用“*”。比如:consumer.subscribe("topic2020", "*"); */ @Bean public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setVipChannelEnabled(false); MessageListen messageListen = new MessageListen(); //在监听类中增加消息处理类,当然可以增加更多,也就是往MessageListen类中的map集合放入处理类。 messageListen.registerHandler(tag, userMessageProcessor); messageListen.registerHandler(orderTopicTag, orderMessageProcessor); consumer.registerMessageListener(messageListen); try { consumer.subscribe(topic, this.tag); consumer.subscribe(orderTopic, this.orderTopicTag); consumer.start(); log.info("RocketMQConsumer|消费者创建并启动完成,groupName:{}", groupName); } catch (MQClientException e) { log.error("RocketMQConsumer|消费者启动异常", e); e.printStackTrace(); } return consumer; } }
这里主要就是新增Topic的订阅(subscribe),以及,registerHandler注册tag和对应的消息处理类。
增加 测试类
@Autowired private ApplicationContext context; @GetMapping("/test") public void TestSend() { DefaultMQProducer producer = context.getBean(DefaultMQProducer.class); User user = User.builder().username("张三").age(18).build(); Message message = new Message("TestTopic2020", "test", JSON.toJSONString(user).getBytes()); try { producer.send(message); System.out.println("用户消息生产成功~"); } catch (Exception e) { e.printStackTrace(); } } @GetMapping("/testOrder") public void testOrder(){ DefaultMQProducer producer2 = context.getBean(DefaultMQProducer.class); Order order = Order.builder().orderId(38193632323128L).createTime(new Date()).build(); Message orderMessage = new Message("OrderTopic2020","orderTag", JSON.toJSONString(order).getBytes()); try { producer2.send(orderMessage); System.out.println("订单消息生产成功~"); } catch (Exception e) { e.printStackTrace(); } }
到这,处理多个消息已经实现了,但比较简陋,也有很多不太合理、可完善的地方,例如:handleMap应该是Topic与对应消息处理类的映射关系,而不是Tag,因为可能不同的Topic有相同的Tag。本文的最后,会再进行完善。先继续讲讲第二种集成方式。
第二种:rocketmq-spring-boot-starter
目前这个依赖,还比较简单,也有不少问题。可能会因版本变化有变动,下文只是针对2.1.1版本下的集成方案。
依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
配置
rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=ProducerGroup rocketmq.producer.topic=TestTopic2020 rocketmq.producer.sendMessageTimeout=10000
这里的key的命名,需要按照上面这种格式。
生产消息
@Resource private RocketMQTemplate rocketMQTemplate; @GetMapping("/starterTest") public void starterTest() { User user = User.builder().username("张三").age(18).build(); SendResult sendResult = rocketMQTemplate.syncSend("TestTopic2020", JSON.toJSONString(user)); System.out.println("发送结果:" + sendResult); }
在rocketMQTemplate这个类中,集成了很多生产消息的方法,例如:同步、异步等。
消费者
@Service @RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}") public class UserMessageProcessor implements RocketMQListener<User> { @Override public void onMessage(User user) { System.out.println("UserMessageProcessorImpl|监听到了消息,消息为:" + user); System.out.println("用户名:" + user.getUsername()); } }
相较于客户端,starter的方式简化了很多。上述例子,默认是并发消费的,如果需要顺序消费,可以这么设置:
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}",consumeMode = ConsumeMode.ORDERLY)
在客户端的使用时,如果消费失败,会进行重试。使用starter时,如果也想进行重试,需要这么操作
@Service @RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}",consumeMode = ConsumeMode.ORDERLY) public class UserMessageProcessor implements RocketMQListener<User> { @Override public void onMessage(User user) { System.out.println("UserMessageProcessorImpl|监听到了消息,消息为:" + user); System.out.println("用户名:" + user.getUsername()); throw new RuntimeException("我就是故意抛出一个异常"); } }
需要在代码中,手动抛出异常,此时会触发重试机制,默认为无限重试。
不过,截至本文完成时,也没有找到在当前版本的依赖下,如何设置重试次数的方式。最后在贴上官方文档地址:https://github.com/apache/rocketmq-spring/wiki
两种集成方式的对比
其实应该非常明了了。客户端的方式,缺点就是比较复杂,有很多代码需要实现,但换来的优点就是非常灵活,可以根据自己项目的需要,进行定制化。而对于starter来说,使用起来就非常简单了,几个注解就搞定了,但是正如上面提到的,目前还不是很完善,网上能够找到的教程也比较少,还需要我们给时间让其成长和完善。
优雅的使用RocketMQ
如果是想使用starter,那肯定优雅不起来的…这里我接着rocketmq-client的集成方式,继续优化上面的代码。在优化之前,我先说一下我想要实现/优化的地方:
- 把Tag与消息处理类的映射,改为Topic与消息处理类的映射。
- 希望能够实现自动或者半自动的监听,这样就不需要每加一个消息处理类,我还需要去加一行registerHandler,对于消息的订阅subscribe也是如此。
下面主要是改造消息监听类MessageListen和消费配置类RocketMQConsumer
改造MessageListen
先说一下思路,再贴代码。
首先把handleMap这个Map干掉,这个Map前面有讲到,主要是记录“消息”与“消息处理类”之间的对应关系,并且还说到,用Tag其实是不对的,应该用Topic,因为Topic是唯一的。对于“消息”与“消息处理类”这种关系,这个还是由我们手动配置的,这一点不变。所以,我把handleMap这个Map的作用,换成了一个工厂类去实现它,进行一个解耦,也方便我们手动配置。
@Component @Service public class TopicFactory { @Value("${rocketmq.consumer.topics.userTopic}") private String userTopic; @Value("${rocketmq.consumer.topics.orderTopic}") private String orderTopic; private static final Map<String, Class<? extends MessageProcessor<?>>> CACHE = new HashMap<>(16); @Resource private ApplicationContext applicationContext; @PostConstruct public void init() { CACHE.put(userTopic, UserMessageProcessor.class); CACHE.put(orderTopic, OrderMessageProcessor.class); } public MessageProcessor<?> getProcessor(String topicName) { Class<? extends MessageProcessor<?>> clazz = CACHE.get(topicName); if (clazz != null) { return applicationContext.getBean(clazz); } return null; } }