admin管理员组文章数量:1635850
1.1通配符
1"." 用于作为路径上名字间的分割符
2">" 用于递归的匹配任何以这个名字开始的Destination
3 "*"用于作为路径上任何名字
1.2组合队列(Composite Destination)
组合队列允许用一个虚拟的destination代表多个destionations。这样就可以通过组合的destination在操作中同时向多个queue发送消息。
客户端实现:
在compositedestination,多个destination之间采用逗号分割,比如:
Queuequeue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
如果你希望使用不同类型的destination,那么需要加上前缀queue://或
topic://,比如:
Queuequeue = new ActiveMQQueue("FOO.A,topic://Notify.FOO.A");
XML配置实现方式:activemq> broker
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="my-queue" /> <queue physicalName="my-queue2" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> |
然后代码创建destination时候,就发到MY.QUEUE
publicclass QueueSender { privatestatic final StringBROKER_URL = "tcp://192.168.3.10:61616"; publicstatic void main(String[]args) { ConnectionFactoryconnectionFactory = null; Connectionconnection = null; Sessionsession = null; try { connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destinationdestination = session.createQueue("my-queue,nicky-queue"); MessageProducerproducer = session.createProducer(destination); TextMessagemessage = null; for(inti = 0 ; i < 3;i++){ message = (TextMessage)session.createTextMessage(); message.setText("Hello "+i); producer.send(message); } sessionmit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } publicclass QueueReciever { privatestatic final StringBROKER_URL = "tcp://192.168.3.10:61716"; publicstatic void main(String[]args) { ConnectionFactoryconnectionFactory = null; Connectionconnection = null; Sessionsession = null; try { connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destinationdestination = session.createQueue("my-queue"); MessageConsumerconsumer = session.createConsumer(destination);
int i = 0; while (i < 100) { Messagemessage = consumer.receive(); sessionmit(); if (messageinstanceof TextMessage){ System.out.println("Message: "+((TextMessage)message).getText()); }else if (messageinstanceof MapMessage) {
} }
session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } publicclass QueueReciever2 { privatestatic final StringBROKER_URL = "tcp://192.168.3.10:61716"; publicstatic void main(String[]args) { ConnectionFactoryconnectionFactory = null; Connectionconnection = null; Sessionsession = null; try { connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destinationdestination = session.createQueue("nicky-queue"); MessageConsumerconsumer = session.createConsumer(destination);
int i = 0; while (i < 100) { Messagemessage = consumer.receive(); sessionmit(); if (messageinstanceof TextMessage){ System.out.println("Message: "+((TextMessage)message).getText()); }else if (messageinstanceof MapMessage) {
} }
session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } |
使用filtered destinations,在xml配置如下:
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <filteredDestination selector="odd='yes'" queue="FOO" /> <filteredDestination selector="i = 5" topic="BAR" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> |
避免在network连接到broker,出现重复消息:
<networkConnectors> <networkConnector uri= "static://(tcp://localhost:61616) " > <excludedDestinations> <queue physicalName="Consumer.*VirtualTopic.> " /> </ excludedDestinations> </ networkConnector> </ networkConnectors> |
在ActiveMQ启动时候就创建Destination:
<broker xmlns="http://activemq.apache/schema/core"> <destinations> <queue physicalName="FOO.BAR" /> <queue physicalName="SOME.TOPIC" /> </destinations> </broker> |
1.3删除无用的队列
一种方式:可以通过web控制台或是JMX方式来删除掉
二种方式:通过配置文件,自动探测无用的队列并删除掉,回收响应资源,配置如下:
<broker xmlns="http://activemq.apache/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="10000"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" /> </policyEntries> </policyMap> </destinationPolicy> </broker> |
SchedulePeriodForDestinationPurge:设置多长时间检查一次。
inactiveTimeoutBeforeGC:设置当destination为空后,多长时间被删除,这里是30s,默认为60
gcInactiveDestinations:设置删除掉不活动队列,默认为false
1.4Destination 选项
这个是给消费者在JMS规范之外添加的功能特性,通过在队列名称后面使用类似url的语法添加多个选项。包括:
1 consumer.perfetchSize,消费者持有的未确认的最大消费数量
2consumer.maximumPendingMessageLimit: 用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认为0
3consumer.noLocal: 默认false
4consumer.dispatchAsync: 是否异步分发,默认true
5consumer.retroactive: 是否为回溯消费者,默认false
6consumer.selector: JMS的selector,默认null
7consumer.exclusive: 是否为独占消费者,默认false
8consumer.priority:设置消费者的优先级,默认0
使用示例:
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync= false&consumer.perfetchSize=10"); Consumer consumer = session.createConsumer(queue); |
1.5Virtual Destination
虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination.ActiveMQ支持2种方式:
虚拟主题
为什么使用虚拟主题?
ActiveMQ只有在持久订阅才是持久化的。持久订阅时,每一个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
第一:同一应用内消费者端护在均衡的问题。也就是说一个应用程序内的持久化消息,不能使用对个消费者共同承担消息处理能力。因为每个消费者都会获取所有消息。因为每一个消费者都会获取所有信息。
Queue到时可以解决这个问题,但broker端又不能将消息发送到多个应用端,所以纪要发布订阅,又要让消费者分组,这个功能JMS本身是没有的
第二:同一应用内消费者端failover问题,由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理,系统的健壮性不高。
如何使用虚拟topic?
第一:对于消息发布者来说,就是一个正常的topic,名称以VirtualTopic.开始,比如VirtualTopic.Orders,代码示例如下:
Topicdestination = session.createTopic("VirtualTopic.Orders");
第二:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列名称,即可表明自己的身份即可实现消费端应用分组。
例如Consumer.A.VirtualTopic.Orders说明它是名称为A的消费端,同理Consumer.B VirtualTopic.Orders说明是一名称为B的消费端。可以在同一个应用中使用多个消费者消费这个队列
又因为不同应用使用的topic名称不一样,前缀不同,所以不同应用中都可以接受到全部消息。每一个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
代码示例:
Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders"); |
publicclass PersistentTopicSender { privatestatic final StringBROKER_URL = "tcp://192.168.3.10:61616"; publicstatic void main(String[]args) throws JMSException { ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(BROKER_URL); Connectionconn = connectionFactory.createConnection();
Sessionsession = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destinationdest = session.createTopic("VirtualTopic.Orders"); MessageProducerproducer = session.createProducer(dest); producer.setDeliveryMode(DeliveryMode.PERSISTENT); conn.start(); for (inti = 0; i < 3;i++) { TextMessagemessage = session.createTextMessage(); message.setText("Persistence Topic Message : "+ (i+1)); producer.send(message); } sessionmit(); session.close(); conn.close(); } } publicclass QueueReciever2 { privatestatic final StringBROKER_URL = "tcp://192.168.3.10:61716"; publicstatic void main(String[]args) { ConnectionFactoryconnectionFactory = null; Connectionconnection = null; Sessionsession = null; try { connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destinationdestination = session.createQueue("Consumer.A.VirtualTopic.Orders"); MessageConsumerconsumer = session.createConsumer(destination);
int i = 0; while (i < 3) { Messagemessage = consumer.receive(); sessionmit(); if (messageinstanceof TextMessage){ System.out.println("Message: "+((TextMessage)message).getText()); }else if (messageinstanceof MapMessage) {
} }
session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } |
其实把消费者队列化了。
修改虚拟主题的前缀:
默认前缀是VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>
修改配置:
<broker xmlns="http://activemq.apache/schema/core"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" /> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> </broker> |
组合destination
1.6Mirrored Queue 镜像队列
ActiveMQ每一个queue中消息只能被一个消费者消费,然而,有时候,你希望能够监视生产者和消费者之间的消息流。你可以通过使用VirtualDestinations来建立一个virtualqueue来吧消息转发到多个queue中。但是,为系统每一个queue都进行如此的配置可能会很麻烦。
MirroredQueue: Broker会把发送到某一个队列上的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirroredqueue topic.为启用MirroredQueue,首先要将BrokerService的useMirrored
Queues属性设置为true:
<broker xmlns="http://activemq.apache/schema/core"use MirroredQueue="true"> </broker> |
然后可以通过destinationInterceptors设置其属性,如mirrortopic的前缀,缺省是VritualTopic.Mirror.
修改后缀的配置示例:
<broker xmlns="http://activemq.apache/schema/core"> <destinationInterceptors> <mirroredQueue copyMessage="true" postfix=".qmirror" prefix="" /> </destinationInterceptors> </broker> |
根虚拟topic相反,这时发布的是队列,但是消费者时topic,相当于消费者主题化。
本文标签: 特性高级ActiveMqdestination
版权声明:本文标题:ActiveMQ Destination高级特性 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1729216997a1190540.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论