一、序言
有时候我们追求最快的方式发送消息,我们就采用的异步方式,并且不持久化。但是这样带来的问题有这样几个:
1.如果消费者的消费能力低于生产者,那么消息就会积压在broker, 从而导致broker 可能挂掉。
2.我们知道存放内存的模式,只要出现宕机或者其他问题,容易丢消息,因此得看情况而定
对于问题1,activemq 采用了限流 内存溢出提醒的方式进行处理,下面是一些实例过程。
官方介绍可以参考:http://activemq.apache.org/producer-flow-control.html
二、操作过程:
1.我用的activemq.5.11 版本,下载下来打开conf/activemq.xml默认配置改为异步persistent=false
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="localhost" dataDirectory="${activemq.data}">
同时开启限流模式,未了测试方便,我们限流5M
<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" /> # 经过测试 producerFlowControl 是默认开启的
这里有原文信息:
Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue's memoryLimit is never reached, as the cursor doesn't use very much memory. 英文不好,大概意思是: activemq 5.0 以后引入了new file cursor ,非持久化消息会分流到temporary file 存储,用来减少实际内存使用。结果你会发现queue 的内存限制,无法超标,cursor 不会消耗很多内存。 这里有个疑问,反正temporary file 我是没看到- -,不知道为什么 注意:这里是每个queue 限制5M,不是总的
2.测试方式:
打开borker,然后发送10W消息到queue,你可以看到到了一定条数,producer 就不动了,这时候你再打开customer ,才会正常消费。
3.当内存达到limit 限制的时候,我们可能需要做其他处理,方便我们得知,比如:让生产者异常
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
然后生产者,或者说客户端就可以捕获:javax.jms.ResourceAllocationException
当然这个会直接抛出异常,后面了个属性,表示5秒后才抛出这个异常。
<systemUsage sendFailIfNoSpaceAfterTimeout="5000">
我们捕获异常之后就可以持续发送消息之后,就可以做其他处理了,broker 也不会爆掉了。
3.Disabling Flow Control 不控制流量的做法
其实这是为了使用整个服务器的配置资源,也就是说上面是单个控制,这里是整体控制
# 默认的系统配置,分别代表
#memoryUsage 内存
#storeUsage 持久化
#tempUsage 临时数据
#当整体数据 操作下面的时候,才爆异常,就不用单个控制了
# 比如用异步 非持久化 测试,Memory percent used 到100 就不动了
# 这部分的使用可以参考:
# http://akuntamukkala.blogspot.com/2014/01/understanding-memory-usage-in-activemq.html
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
三、基础伪代码
@Autowired private Destination destination; @Autowired JmsTemplate jmsTemplate; @Test public void Sender(){ for(int i=0;i<50000;i++){ sendMsg(i); } } // 消息发送 private void sendMsg(int i){ try { jmsTemplate.convertAndSend(destination,""+i); }catch (Exception e){ sendMsg(i); } }
<bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 链接工长 --> <property name="connectionFactory" ref="cachingConnectionFactory"/> <!-- 1:非持久化,2:持久化 --> <property name="deliveryMode" value="1" /> <!-- 消息转换器 --> <property name="messageConverter" ref="msgConvert"/> </bean> <!-- 队列的目的地描述 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 通过 构造 设定 队列的名字 --> <constructor-arg index="0" value="orderQueue"/> </bean>
四、测试步骤:
1.开启broker,开启限流5M
2.发送queue 10W消息,如果没配置异常,则会一直阻塞,配置了则会抛异常
3.阻塞之后开启消费者,消息能正常收到。
小结:
1.这里将官网的东西拿来做了个简单的说明和解释,如果需要这里的详细实习,得去看ActiveMQMessageProducer 类
2.我这里用的spring +mq 的形式,如果用原生的东西,有朋友没测试成功,测试的时候尽量剔除其他东西,比如事务之类的。。
3.有朋友问自己发送了1W消息,为什么控制台只有几千条,因为你发送1W消息其实达到limit 限制之后,后面的消息没有进入quque,因此你在控制台看到的消息是没有那么多的,假设是8K条,而且你的1W消息 只是根据发送点打印的,没进入queue.那么那些消息去哪儿了呢?源码还没具体分析,但是肯定是在内存中,没进入队列。
这种场景我们很好思考:我限制房间只有100个人,但是来的人很快,一次来10-20个,那么满了的时候就会剩下一些在门口进不去,而客户端觉得我已经发送了120个人过去了。。。,实际我房间只有100个,实际多的20个还在客户端的发送队列里面,如果这时候客户端挂了,那么消费者只能收到100个~。~
4.这种场景是允许的,如果保证消息不丢失,那么就要持久化的,一般高效的东西都会容忍这类事件的。
5.有错误的请指点,不胜感谢~。~
相关推荐
spring+activemq必备jar包:activeio-core-3.1.4.jar,activemq-all-5.13.2.jar,activemq-pool-5.13.2.jar,commons-pool2-2.4.2.jar
SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置
spring+activemq,示例工程,演示JMS消息通信。。。。。。。
基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...
NULL 博文链接:https://ihenu.iteye.com/blog/2270078
整合Spring + ActiveMQ 的朋友可以下载看一下 简单易懂
Spring+ActiveMQ整合实例代码工程,朋友提供给我参考的,我备个份以便下次查阅,也分享给大家,看看对大家有没有帮助了
springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot...
基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示: •Spring 2.5 •ActiveMQ 5.4.0 •Tomcat 6.0.30 下面通过学习与配置,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
基于Maven的Spring+ActiveMQ,比较贴合实际生产,只实现了Topic,queue改点配置就行了
Windowss 下 Spring + ActiveMq + maven 集成配置分为两章,环境搭建之前参考了网上的博客以及同事的配置。
资源的内容是Spring+ActiveMQ的整合,包括所有的架包。
Spring+ActiveMQ active基本例子 注解的完整实例,包含jar包
spring+activeMQ 嵌入式配置 完整demo(包含jar包)
maven spring mq整合,注意最新版的mq的jar包是集成了spring的,我用的5.11.1的。 运行之前,先要下载mq服务本地运行http://apache.fayea.com//activemq/5.14.3/apache-activemq-5.14.3-bin.zip
spring+springmvc+mybatis+mongodb+ActiveMQ+CXF
基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。
type="org.apache.activemq.command.ActiveMQQueue" description="My Message Queue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MyMessageQueue"/> 说明:...
Spring+JMS+ActiveMQ+Tomcat jar下载,在博客主页有实例,欢迎换看