博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ笔记(7):如何清理无效的延时消息?
阅读量:6148 次
发布时间:2019-06-21

本文共 8007 字,大约阅读时间需要 26 分钟。

ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向MQ发送海量的延时消息,超过MQ的调度能力,就会造成很多消息到了该投递的时刻,却没有投递出去,形成积压,一直停留在ActiveMQ web控制台的Scheduled面板中。

下面的代码演示了,如何清理activemq中的延时消息(包括:全部清空及清空指定时间段的延时消息),这也是目前唯一可行的办法。

为了演示方便,先封装一个小工具类:

package cn.mwee.utils.mq;import cn.mwee.utils.list.ListUtil;import cn.mwee.utils.log4j2.MwLogger;import org.apache.commons.lang.StringUtils;import org.slf4j.Logger;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessagePostProcessor;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.TextMessage;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;/** * Created by yangjunming on 6/20/16. */public final class MessageUtil {    private Logger logger = new MwLogger(MessageUtil.class);//这里就是一个Log4j2的实例,大家可以换成原生的log4j2或类似工具    private ConnectionFactory connectionFactory;    private long receiveTimeout;//接收超时时间    private JmsTemplate jmsTemplate;    private List
destinationQueueNames; private final static String BACKUP_QUEUE_SUFFIX = "_B"; private boolean autoBackup = false;//是否自动将消息备份到_b的队列,方便调试 public MessageUtil(final ConnectionFactory connectionFactory, final long receiveTimeout, final List
destinationQueueNames) { this.connectionFactory = connectionFactory; this.receiveTimeout = receiveTimeout; this.destinationQueueNames = new ArrayList<>(); this.destinationQueueNames.addAll(destinationQueueNames.stream().collect(Collectors.toList())); jmsTemplate = new JmsTemplate(this.connectionFactory); jmsTemplate.setReceiveTimeout(this.receiveTimeout); } public MessageUtil(ConnectionFactory connectionFactory, List
destinationQueueNames) { this(connectionFactory, 10000, destinationQueueNames); } public void convertAndSend(Object message) { if (ListUtil.isEmpty(destinationQueueNames)) { logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString()); return; } for (String dest : destinationQueueNames) { jmsTemplate.convertAndSend(dest, message); if (autoBackup) { jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message); } } } public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) { if (ListUtil.isEmpty(destinationQueueNames)) { logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString()); return; } for (String dest : destinationQueueNames) { jmsTemplate.convertAndSend(dest, message, messagePostProcessor); if (autoBackup) { jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor); } } } public void convertAndSend(String destinationName, Object message) { if (StringUtils.isBlank(destinationName)) { logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString()); return; } jmsTemplate.convertAndSend(destinationName, message); if (autoBackup) { jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message); } } public void convertAndSend(String destinationName, Object message, MessagePostProcessor messagePostProcessor) { if (StringUtils.isBlank(destinationName)) { logger.error("目标队列为空,无法发送,请检查配置!message => " + message.toString()); return; } jmsTemplate.convertAndSend(destinationName, message, messagePostProcessor); if (autoBackup) { jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor); } } public static String getText(javax.jms.Message message) { if (message instanceof TextMessage) { try { return ((TextMessage) message).getText(); } catch (JMSException e) { return message.toString(); } } return message.toString(); } public String getFirstDestination() { if (ListUtil.isEmpty(destinationQueueNames)) { return null; } return destinationQueueNames.get(0); } public boolean isAutoBackup() { return autoBackup; } public void setAutoBackup(boolean autoBackup) { this.autoBackup = autoBackup; }}

其中主要就用到了convertAndSend(Object message, MessagePostProcessor messagePostProcessor) 这个方法,其它代码可以无视。

先来模拟瞬间向MQ发送大量延时消息:

/**     * 发送延时消息     *     * @param messageUtil     */    private static void sendScheduleMessage(MessageUtil messageUtil) {        for (int i = 0; i < 10000; i++) {            Object obj = "test:" + i;            messageUtil.convertAndSend(obj, new ScheduleMessagePostProcessor(1000 + i * 1000));        }    }

这里向MQ发送了1w条延时消息,每条消息延时1秒*i,上面代码中的ScheduleMessagePostProcessor类可在上篇中找到。

运行完之后,MQ中应该堆积着了很多消息了:

下面的代码可以清空所有延时消息:

/**     * 删除所有延时消息     *     * @param connectionFactory     * @throws JMSException     */    private static void deleteAllScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {        Connection conn = connectionFactory.createConnection();        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);        MessageProducer producer = session.createProducer(management);        Message request = session.createMessage();        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);        producer.send(request);    }

清空所有延时消息,有些用力过猛了,很多时候,我们只需要清理掉过期的延时消息(即:本来计划是8:00投递出去的消息,结果过了8点还没投递出去) 

/**     * 删除过期的延时消息     *     * @param connectionFactory     * @throws JMSException     */    private static void deleteExpiredScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {        long start = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);//删除:当前时间前12小时范围的延时消息        long end = System.currentTimeMillis();        Connection conn = connectionFactory.createConnection();        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);        MessageProducer producer = session.createProducer(management);        Message request = session.createMessage();        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));        producer.send(request);    }

与上一段代码基本相似,只是多指定了删除消息的起止时间段。  

最后贴一段spring的配置文件及main函数入口

1 
2
5 6
7
8
9
11
12
13
14
15 16
17
18
19
20
21
dest1
22
dest2
23
24
25
26
27 28
View Code

main函数:

public static void main(String[] args) throws InterruptedException, JMSException {        ApplicationContext context = new ClassPathXmlApplicationContext("spring-sender.xml");        ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class, "jmsFactory");        MessageUtil messageUtil = context.getBean(MessageUtil.class);//        sendScheduleMessage(messageUtil);//        deleteAllScheduleMessage(connectionFactory);        deleteExpiredScheduleMessage(connectionFactory);    }

参考文章:

转载地址:http://gamya.baihongyu.com/

你可能感兴趣的文章
bzoj 5006(洛谷 4547) [THUWC2017]Bipartite 随机二分图——期望DP
查看>>
CF 888E Maximum Subsequence——折半搜索
查看>>
欧几里德算法(辗转相除法)
查看>>
面试题1-----SVM和LR的异同
查看>>
MFC控件的SubclassDlgItem
查看>>
如何避免历史回退到登录页面
查看>>
《图解HTTP》1~53Page Web网络基础 HTTP协议 HTTP报文内的HTTP信息
查看>>
unix环境高级编程-高级IO(2)
查看>>
树莓派是如何免疫 Meltdown 和 Spectre 漏洞的
查看>>
雅虎瓦片地图切片问题
查看>>
HTML 邮件链接,超链接发邮件
查看>>
HDU 5524:Subtrees
查看>>
手机端userAgent
查看>>
pip安装Mysql-python报错EnvironmentError: mysql_config not found
查看>>
http协议组成(请求状态码)
查看>>
怎样成为一个高手观后感
查看>>
[转]VC预处理指令与宏定义的妙用
查看>>
JQuery radio单选框应用
查看>>
MySql操作
查看>>
python 解析 XML文件
查看>>