【消息队列】SpringBoot集成RocketMQ

想要优雅的在项目中使用RocketMQ?看这篇就够了!

目前,如果SpringBoot项目想要集成RocketMQ,有两种集成方式:1、rocketmq-client,2、rocketmq-spring-boot-starter。本文会一步一步教大家如何集成这两种,实现优雅的在项目中使用RocketMQ,并且对比这两者的优缺点。

第一种:rocketmq-client

这一种方式,也是目前最成熟、使用最多的集成方式。

依赖

<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>

 

消息处理接口 MessageProcessor

public interface MessageProcessor {
    /**
     * 处理消息
     *
     * @param messageExt
     * @return
     */
    boolean handle(MessageExt messageExt);
}

 

消息处理实现类 MessageProcessorImpl

@Service
public class MessageProcessorImpl implements MessageProcessor {

    @Override
    public boolean handle(MessageExt messageExt) {
        // 收到的body(消息体),字节类型,需转为String
        String result = new String(messageExt.getBody());
        System.out.println("MessageProcessor|监听到了消息,消息为:" + result);
        return true;
    }
}

 

上面这两个类,功能职责以及代码都很简单,当监听到了消息,我们就可以在 MessageProcessorImpl 中执行我们的逻辑,这里我就省略了,只是做个最简单的集成。

配置文件

server.port=8088

rocketmq.producer.groupName=ProducerGroup
rocketmq.producer.namesrvAddr=127.0.0.1:9876
rocketmq.producer.instanceName=ProducerGroup
rocketmq.producer.topic=TestTopic2020
rocketmq.producer.tag=test
rocketmq.producer.maxMessageSize=131072
rocketmq.producer.sendMsgTimeout=10000

rocketmq.consumer.namesrvAddr=127.0.0.1:9876
rocketmq.consumer.groupName=ProducerGroup
rocketmq.consumer.topic=TestTopic2020
rocketmq.consumer.tag=test
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64

 

配置文件中的“groupName”、“instanceName”、“topic”和“tag”都是直接自己命名一个就行,不需要事先创建授权之类的,RocketMQ默认会自动创建,不需要去控制台手动创建。至于命名的规范,每个公司可能都不太一样,这里我就不遵循这些规范了。

生产者 RocketMQProducer

定义生产者,在向MQ发送消息时,会用到。

@Component
@Slf4j
public class RocketMQProducer {

    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String nameserAddr;
    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    private DefaultMQProducer producer;

    @Bean
    public DefaultMQProducer getRocketMQProducer() {

        producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameserAddr);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeout);
        producer.setVipChannelEnabled(false);

        try {
            producer.start();
            log.info("RocketMQProducer|生产者创建并启动完成,groupName:{},nameserAddr:{}", groupName, nameserAddr);
        } catch (MQClientException e) {
            log.error("RocketMQProducer|生产者启动异常", e);
            e.printStackTrace();
        }
        return producer;
    }

}

 

消息监听 MessageListen

每当有消息推过来时,都会执行这个类里面的代码。MessageExt 中包含消息的topic、tag等诸多信息。如果消费失败result为false,会进入重试阶段。

public class MessageListen implements MessageListenerConcurrently {
    @Autowired
    private MessageProcessor messageProcessor;

    public void setMessageProcessor(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt ext = list.get(0);
        boolean result = messageProcessor.handle(ext);
        if (!result) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

 

消费者 RocketMQConsumer

@Component
public class RocketMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class);
    @Autowired
    private MessageProcessor messageProcessor;

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;

    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setVipChannelEnabled(false);
        // 我们自己实现的监听类
        MessageListen messageListen = new MessageListen();
        messageListen.setMessageProcessor(messageProcessor);
        consumer.registerMessageListener(messageListen);
        try {
            consumer.subscribe(topic, tag);
            consumer.start();
            log.info("RocketMQConsumer|消费者创建并启动完成,groupName:{},topic:{}", groupName, topic);
        } catch (MQClientException e) {
            log.error("RocketMQConsumer|消费者启动异常", e);
            e.printStackTrace();
        }
        return consumer;
    }
}

 

在这个类中,主要是定义消费者信息,同时,订阅我们需要消费的topic,以及指定消息处理类,去处理消息。

生产消息测试

@RestController
public class TestController {


    @Autowired
    private ApplicationContext context;

    @GetMapping("/test")
    public void TestSend() {
        DefaultMQProducer producer = context.getBean(DefaultMQProducer.class);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String body = "Hello RocketMQ,now is " + sdf.format(new Date()) + ".";
        Message message = new Message("TestTopic2020", "test", body.getBytes());
        try {
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

到这,不知道你是否发现了一个问题:只能处理一个topic下的消息吗?实际应用中,若还需要处理多个不同topic的消息怎么办?别急,我们接着简单改造下~

改造 消息处理接口

public interface MessageProcessor<T> {
    /**
     * 处理消息
     *
     * @param messageExt
     * @return
     */
    boolean handle(T messageExt);
}

 

这里改成泛型,因为不同的消息体,对应着不同的对象。例如,订单消息MQ,那我处理的时候,肯定是有个订单类会方便很多的。

改造 消息处理实现类

MessageProcessorImpl 重命名为 UserMessageProcessor

@Service
public class UserMessageProcessor implements MessageProcessor<User> {

    @Override
    public boolean handle(User user) {
        System.out.println("UserMessageProcessorImpl|监听到了消息,消息为:" + user);
        System.out.println("用户名:" + user.getUsername());
        return true;
    }
}

 

重命名只是为了方便区分。同时,将MessageExt换成了User这个对象(自己定义的),当然,实际业务中,一般都是DTO,很少是PO。

新增 消息处理实现类

既然要测试同时处理不同topic的消息,那我们再新建一个。

@Service
public class OrderMessageProcessor implements MessageProcessor<Order> {

    @Override
    public boolean handle(Order order) {
        System.out.println("OrderMessageProcessorImpl|监听到了消息,消息为:" + order);
        return true;
    }
}

 

改造 消息监听类

public class MessageListen implements MessageListenerConcurrently {

    private Map<String, MessageProcessor> handleMap = new HashMap<>();

    public void registerHandler(String tags, MessageProcessor messageProcessor) {
        handleMap.put(tags, messageProcessor);
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt ext = list.get(0);
        // 获取消息
        String message = new String(ext.getBody());
        // 获取tag
        String tag = ext.getTags();
        // 根据tag,获取对应的消息处理类
        MessageProcessor messageProcessor = handleMap.get(tag);
        Class<?> clzz = getMessageType(messageProcessor);
        boolean result = messageProcessor.handle(JSON.parseObject(message,clzz));
        if (!result) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private static Class<?> getMessageType(Object o) {
        Type[] interfaces = AopUtils.getTargetClass(o).getGenericInterfaces();
        for (Type type : interfaces) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type;

                Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                    return (Class<?>) actualTypeArguments[0];
                } else {
                    return Object.class;
                }

            }
        }

        return Object.class;
    }
}

 

因为要处理多个Topic,所以单个MessageProcessor已经无法满足了,所以需要引入了一个Map,用来记录Tag和消息处理类的对应关系,同时,registerHandler方法也做了相应的改动。当消息推过来,根据Tag获取到了对应的处理类之后,我们还要去获取这个泛型类的实际类,以实现在调用handle方法的时候,传入的是实现类的具体对象,其中getMessageType方法就是做这件事情,用到了反射相关知识。

新增配置文件

rocketmq.consumer.orderTopic=OrderTopic2020
rocketmq.consumer.orderTopic.tag=orderTag

 

改造 消费者 RocketMQConsumer

@Component
public class RocketMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class);

    @Autowired
    private UserMessageProcessor userMessageProcessor;

    @Autowired
    private OrderMessageProcessor orderMessageProcessor;

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;

    @Value("${rocketmq.consumer.orderTopic}")
    private String orderTopic;
    @Value("${rocketmq.consumer.orderTopic.tag}")
    private String orderTopicTag;

    /**
     * 如果只想消费topic下的某几个tag,以用 “||”隔开。比如:consumer.subscribe("topic2020", "Tag1 || Tag2");
     * 如果想消费topic下所有的tag,用“*”。比如:consumer.subscribe("topic2020", "*");
     */
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setVipChannelEnabled(false);
        MessageListen messageListen = new MessageListen();
        //在监听类中增加消息处理类,当然可以增加更多,也就是往MessageListen类中的map集合放入处理类。
        messageListen.registerHandler(tag, userMessageProcessor);
        messageListen.registerHandler(orderTopicTag, orderMessageProcessor);
        consumer.registerMessageListener(messageListen);
        try {
            consumer.subscribe(topic, this.tag);
            consumer.subscribe(orderTopic, this.orderTopicTag);
            consumer.start();
            log.info("RocketMQConsumer|消费者创建并启动完成,groupName:{}", groupName);
        } catch (MQClientException e) {
            log.error("RocketMQConsumer|消费者启动异常", e);
            e.printStackTrace();
        }
        return consumer;
    }
}

 

这里主要就是新增Topic的订阅(subscribe),以及,registerHandler注册tag和对应的消息处理类。

增加 测试类

@Autowired
private ApplicationContext context;

@GetMapping("/test")
public void TestSend() {
    DefaultMQProducer producer = context.getBean(DefaultMQProducer.class);
    User user = User.builder().username("张三").age(18).build();
    Message message = new Message("TestTopic2020", "test", JSON.toJSONString(user).getBytes());
    try {
        producer.send(message);
        System.out.println("用户消息生产成功~");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@GetMapping("/testOrder")
public void testOrder(){
    DefaultMQProducer producer2 = context.getBean(DefaultMQProducer.class);
    Order order = Order.builder().orderId(38193632323128L).createTime(new Date()).build();
    Message orderMessage = new Message("OrderTopic2020","orderTag", JSON.toJSONString(order).getBytes());
    try {
        producer2.send(orderMessage);
        System.out.println("订单消息生产成功~");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

 

效果

到这,处理多个消息已经实现了,但比较简陋,也有很多不太合理、可完善的地方,例如:handleMap应该是Topic与对应消息处理类的映射关系,而不是Tag,因为可能不同的Topic有相同的Tag。本文的最后,会再进行完善。先继续讲讲第二种集成方式。

第二种:rocketmq-spring-boot-starter

目前这个依赖,还比较简单,也有不少问题。可能会因版本变化有变动,下文只是针对2.1.1版本下的集成方案。

依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

 

配置

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=ProducerGroup
rocketmq.producer.topic=TestTopic2020
rocketmq.producer.sendMessageTimeout=10000

 

这里的key的命名,需要按照上面这种格式。

生产消息

@Resource
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/starterTest")
public void starterTest() {
    User user = User.builder().username("张三").age(18).build();
    SendResult sendResult = rocketMQTemplate.syncSend("TestTopic2020", JSON.toJSONString(user));
    System.out.println("发送结果:" + sendResult);
}

 

在rocketMQTemplate这个类中,集成了很多生产消息的方法,例如:同步、异步等。

消费者

@Service
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}",
        consumerGroup = "${rocketmq.producer.group}")
public class UserMessageProcessor implements RocketMQListener<User> {

    @Override
    public void onMessage(User user) {
        System.out.println("UserMessageProcessorImpl|监听到了消息,消息为:" + user);
        System.out.println("用户名:" + user.getUsername());
    }
}

 

相较于客户端,starter的方式简化了很多。上述例子,默认是并发消费的,如果需要顺序消费,可以这么设置:

@RocketMQMessageListener(topic = "${rocketmq.producer.topic}",
        consumerGroup = "${rocketmq.producer.group}",consumeMode = ConsumeMode.ORDERLY)

 

在客户端的使用时,如果消费失败,会进行重试。使用starter时,如果也想进行重试,需要这么操作

@Service
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}",
        consumerGroup = "${rocketmq.producer.group}",consumeMode = ConsumeMode.ORDERLY)
public class UserMessageProcessor implements RocketMQListener<User> {

    @Override
    public void onMessage(User user) {
        System.out.println("UserMessageProcessorImpl|监听到了消息,消息为:" + user);
        System.out.println("用户名:" + user.getUsername());
        throw new RuntimeException("我就是故意抛出一个异常");
    }
}

 

需要在代码中,手动抛出异常,此时会触发重试机制,默认为无限重试。

不过,截至本文完成时,也没有找到在当前版本的依赖下,如何设置重试次数的方式。最后在贴上官方文档地址:https://github.com/apache/rocketmq-spring/wiki

两种集成方式的对比

其实应该非常明了了。客户端的方式,缺点就是比较复杂,有很多代码需要实现,但换来的优点就是非常灵活,可以根据自己项目的需要,进行定制化。而对于starter来说,使用起来就非常简单了,几个注解就搞定了,但是正如上面提到的,目前还不是很完善,网上能够找到的教程也比较少,还需要我们给时间让其成长和完善。

优雅的使用RocketMQ

如果是想使用starter,那肯定优雅不起来的…这里我接着rocketmq-client的集成方式,继续优化上面的代码。在优化之前,我先说一下我想要实现/优化的地方:

  • 把Tag与消息处理类的映射,改为Topic与消息处理类的映射。
  • 希望能够实现自动或者半自动的监听,这样就不需要每加一个消息处理类,我还需要去加一行registerHandler,对于消息的订阅subscribe也是如此。

下面主要是改造消息监听类MessageListen和消费配置类RocketMQConsumer

改造MessageListen

先说一下思路,再贴代码。

首先把handleMap这个Map干掉,这个Map前面有讲到,主要是记录“消息”与“消息处理类”之间的对应关系,并且还说到,用Tag其实是不对的,应该用Topic,因为Topic是唯一的。对于“消息”与“消息处理类”这种关系,这个还是由我们手动配置的,这一点不变。所以,我把handleMap这个Map的作用,换成了一个工厂类去实现它,进行一个解耦,也方便我们手动配置。

@Component
@Service
public class TopicFactory {

    @Value("${rocketmq.consumer.topics.userTopic}")
    private String userTopic;

    @Value("${rocketmq.consumer.topics.orderTopic}")
    private String orderTopic;

    private static final Map<String, Class<? extends MessageProcessor<?>>> CACHE = new HashMap<>(16);

    @Resource
    private ApplicationContext applicationContext;


    @PostConstruct
    public void init() {
        CACHE.put(userTopic, UserMessageProcessor.class);
        CACHE.put(orderTopic, OrderMessageProcessor.class);
    }

    public MessageProcessor<?> getProcessor(String topicName) {
        Class<? extends MessageProcessor<?>> clazz = CACHE.get(topicName);
        if (clazz != null) {
            return applicationContext.getBean(clazz);
        }
        return null;
    }
}

 

隐藏内容,您需要满足以下条件方可查看
End

人已赞赏
消息队列编程语言

【消息队列】RocketMQ核心概念名词扫盲

2020-11-30 12:48:09

Java基础编程语言

【JVM系列】JVM启动参数与性能调优

2020-6-7 19:19:59

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
今日签到
有新私信 私信列表
搜索