博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot 2整合rocketMQ 顺序消息、延时消息、批量消息
阅读量:2169 次
发布时间:2019-05-01

本文共 4925 字,大约阅读时间需要 16 分钟。

顺序消息

RocketMQ 默认消息同时的,但是在有些场景需要队列中的消息有顺序性,比如:发送了一条添加数据的消息,然后接着发送了一条更改数据的消息,此时肯定需要添加数据的消息先被消费,在rocketmq中可以指定消费者的consumeMode,其有两个属性CONCURRENTLY、ORDERLY,ORDERLY便是让一个队列中的数据有顺序的消费,先进先出。

RocketMQ 一个Topic其实是分成了4个队列,所以在消息投递时,需要顺序的消息,要放在同一个队列中。

消费者

指定 consumeMode = ConsumeMode.ORDERLY
 

@Slf4j@Component@RocketMQMessageListener(topic = "bxctopic", consumerGroup = "bxctopic1",consumeMode = ConsumeMode.ORDERLY)public class JmsOrderConsumer implements RocketMQListener
{ @Override public void onMessage(MessageExt msg) { try { int time = (int)(Math.random()*5+1)*100; Thread.sleep(time); log.info("消费者2-->"+ Thread.currentThread().getName() + "," + "队列" + msg.getQueueId() + msg.getQueueId()+",内容"+ new String(msg.getBody())+" "+time); } catch (Exception e) { e.printStackTrace(); } }}

投递消息,需要顺序的放在一个队列中

@RestControllerpublic class OrderMsg {    @Autowired    private RocketMQTemplate rocketMQTemplate;    /*    根据code选择队列存放     */    class TopicMessageQueueSelector implements MessageQueueSelector{        @Override        public MessageQueue select(List
mqs, Message message, Object o) { int orderId = (int) o; long index = orderId % mqs.size(); return mqs.get((int) index); } } @GetMapping("/OrderQueue") public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { JmsMsg msg1 = new JmsMsg(200, "AAAAAAAA"); Message message1 = new Message("bxctopic", JSONObject.toJSONString(msg1).getBytes()); rocketMQTemplate.getProducer().send(message1,new TopicMessageQueueSelector(),msg1.getCode()); JmsMsg msg2 = new JmsMsg(200, "BBBBB"); Message message2 = new Message("bxctopic", JSONObject.toJSONString(msg2).getBytes()); rocketMQTemplate.getProducer().send(message2,new TopicMessageQueueSelector(),msg2.getCode()); JmsMsg msg3 = new JmsMsg(200, "CCCCC"); Message message3 = new Message("bxctopic", JSONObject.toJSONString(msg3).getBytes()); rocketMQTemplate.getProducer().send(message3,new TopicMessageQueueSelector(),msg3.getCode()); JmsMsg msg4 = new JmsMsg(201, "AAAAAAAA"); Message message4 = new Message("bxctopic", JSONObject.toJSONString(msg4).getBytes()); rocketMQTemplate.getProducer().send(message4,new TopicMessageQueueSelector(),msg4.getCode()); JmsMsg msg5 = new JmsMsg(201, "BBBBB"); Message message5 = new Message("bxctopic", JSONObject.toJSONString(msg5).getBytes()); rocketMQTemplate.getProducer().send(message5,new TopicMessageQueueSelector(),msg5.getCode()); JmsMsg msg6 = new JmsMsg(201, "CCCCC"); Message message6 = new Message("bxctopic", JSONObject.toJSONString(msg6).getBytes()); rocketMQTemplate.getProducer().send(message6,new TopicMessageQueueSelector(),msg6.getCode()); return "success"; }}

访问接口效果

在这里插入图片描述

延迟消息

1、延迟消息介绍

延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。要实现该功能的话,可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,判断该订单的支付状态,如果处于未支付状态,则将该订单关闭。

RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可。

2、同步消息延迟

生产端:

/**  * 发送延迟消息  * 消息内容为json格式  */ public void sendMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {     //发送同步消息,消息内容将orderExt转为json     Message
message = MessageBuilder.withPayload(orderExt).build(); //指定发送超时时间(毫秒)和延迟等级 this.rocketMQTemplate.syncSend(topic,message,1000,3); ​ System.out.printf("send msg : %s",orderExt); }

 

异步消息延迟

 

生产端:

/**  * 发送异步延迟消息  * 消息内容为json格式  */ public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {     //消息内容将orderExt转为json     String json = this.rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt);     org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topic,json.getBytes(Charset.forName("utf-8")));     //设置延迟等级     message.setDelayTimeLevel(3);     //发送异步消息     this.rocketMQTemplate.getProducer().send(message,new SendCallback() {         @Override         public void onSuccess(SendResult sendResult) {             System.out.println(sendResult);         } ​         @Override         public void onException(Throwable throwable) {             System.out.println(throwable.getMessage());         }     }); ​ ​     System.out.printf("send msg : %s",orderExt); }

批量发送消息可提高传递小消息的性能。同时也需要满足以下特征:

  • 批量消息要求必要具有同一topic、相同消息配置
  • 不支持延时消息
  • 建议一个批量消息最好不要超过1MB大小,

在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)方法即可。由于批量消息的1M限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。

转载地址:http://shxzb.baihongyu.com/

你可能感兴趣的文章
【LEETCODE】143- Reorder List [Python]
查看>>
【LEETCODE】82- Remove Duplicates from Sorted List II [Python]
查看>>
【LEETCODE】86- Partition List [Python]
查看>>
【LEETCODE】147- Insertion Sort List [Python]
查看>>
【算法】- 动态规划的编织艺术
查看>>
用 TensorFlow 让你的机器人唱首原创给你听
查看>>
对比学习用 Keras 搭建 CNN RNN 等常用神经网络
查看>>
深度学习的主要应用举例
查看>>
word2vec 模型思想和代码实现
查看>>
怎样做情感分析
查看>>
用深度神经网络处理NER命名实体识别问题
查看>>
用 RNN 训练语言模型生成文本
查看>>
RNN与机器翻译
查看>>
用 Recursive Neural Networks 得到分析树
查看>>
RNN的高级应用
查看>>
TensorFlow-7-TensorBoard Embedding可视化
查看>>
轻松看懂机器学习十大常用算法
查看>>
一个框架解决几乎所有机器学习问题
查看>>
特征工程怎么做
查看>>
机器学习算法应用中常用技巧-1
查看>>