【问题标题】:How to create a durable subscriber for topic on Jboss 5?如何为 Jboss 5 上的主题创建持久订阅者?
【发布时间】:2012-03-26 18:21:50
【问题描述】:

我正在尝试写一个具有持久订阅者的主题。我得到了我的基本主题:

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=durableTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
</mbean>  
</server>

我得到了订阅 MDB:

@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/durableTopic"),
@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable") })
public class DurableSubscriberOne implements MessageListener {
// ...

但是当我进入 jmx-console 或 admin-console 时,我看到我的主题有一个非持久订阅且没有持久订阅。

我犯了一些错字或一些小错误,还是比这更棘手?我正在使用 JBoss 5.1.0.GA。

【问题讨论】:

  • 有完全相同的问题。它甚至在日志中说“durable=true”,但在 JMX 控制台中我只有一个 NonDurableSubscription。

标签: jboss jms publish-subscribe


【解决方案1】:

遇到了同样的问题,我最终设法通过添加两个 @ActivationConfigProperty 使 MDB 创建持久订阅:

@ActivationConfigProperty(propertyName = "subscriptionName", propertyValue ="SomeSubscriptionName")
@ActivationConfigProperty(propertyName = "clientId", propertyValue ="SomeClientId")

【讨论】:

  • 这似乎没问题,虽然现在我们的系统规格略有变化;)现在我们有一个连接到该主题的 jboss 桥,它需要两个属性:&lt;attribute name="SubName"&gt;subName&lt;/attribute&gt; &lt;attribute name="ClientID"&gt;clientId&lt;/attribute&gt;,所以我假设相同MDB 需要(尽管我相信手册说默认情况下订阅名称将由 MDB jndi 名称、jar 名称和其他一些东西创建)。
【解决方案2】:

我真的找不到任何错误。对我有用的代码。我包括我的代码示例和屏幕截图供您参考。

package com.jboss.example;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

/**
 * Message-Driven Bean implementation class for: DurableMessageListener
 * 
 */
@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/durableTopic"),
        @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable") })
//, mappedName = "durableTopic")
public class DurableMessageListener implements MessageListener {

    /**
     * Default constructor.
     */
    public DurableMessageListener() {
        // TODO Auto-generated constructor stub
    }

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        // TODO Auto-generated method stub
        System.out.println("Received Message " + message);
    }

}

主题订阅者示例

package com.jboss.example;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableTopicSubscriber {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {

        Properties env = new Properties();
        env.setProperty("java.naming.factory.initial",
                "org.jnp.interfaces.NamingContextFactory");
        env.setProperty("java.naming.factory.url.pkgs",
                "org.jboss.naming:org.jnp.interfaces");
        env.setProperty("java.naming.provider.url", "jnp://localhost:1099");
        InitialContext iniCtx = new InitialContext(env);
        Object tmp = iniCtx.lookup("ConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("guest", "guest");
        conn.setClientID("Dirabla");
        topic = (Topic) iniCtx.lookup("topic/durableTopic");

        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Wait upto 5 seconds for the message
        TopicSubscriber recv = session.createSubscriber(topic);
        //TopicSubscriber recv = session.createDurableSubscriber(topic, "durableTopicName");
        Message msg = recv.receive(5000);
        while (msg != null) {
            System.out.println("DurableTopicClient.recv, msgt=" + msg);
            msg = recv.receive(5000);
        }
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin DurableTopicRecvClient, now="
                + System.currentTimeMillis());
        DurableTopicSubscriber client = new DurableTopicSubscriber();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }

}

主题示例发布者

package com.jboss.example;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableTopicPublisher {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {

        Properties env = new Properties();
        env.setProperty("java.naming.factory.initial",
                "org.jnp.interfaces.NamingContextFactory");
        env.setProperty("java.naming.factory.url.pkgs",
                "org.jboss.naming:org.jnp.interfaces");
        env.setProperty("java.naming.provider.url", "jnp://localhost:1099");
        InitialContext iniCtx = new InitialContext(env);
        Object tmp = iniCtx.lookup("ConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("guest", "guest");
        conn.setClientID("Dirabla");
        topic = (Topic) iniCtx.lookup("topic/durableTopic");

        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    }

    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        setupPubSub();
        TopicPublisher topicPublisher = session.createPublisher(topic);

        Message message = session.createMessage();

        for (int i = 0; i < 10; i++) {
            message.setIntProperty("id", i);
            topicPublisher.publish(message);
        }
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin DurableTopicRecvClient, now="
                + System.currentTimeMillis());
        DurableTopicPublisher client = new DurableTopicPublisher();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }

}

主题声明和你的一样

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=durableTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
</mbean>  
</server>

截图

【讨论】:

  • 我已将您的 DurableMessageListener 和 DurableTopicSubscriber 添加到我的 jboss 中,但我仍然可以看到 4 个非持久订阅和 0 个持久订阅(其中两个是我以前的 MDB)。可能我的jboss有问题,稍后会尝试调查。
  • 不,不是 jboss。我已经下载了新的 jboss-5.1.0.GA-jdk6,复制了所有的 jars 和 service.xml 来部署,在 defaultall 配置中运行它并且仍然一堆非持久订阅。
猜你喜欢
  • 2020-01-12
  • 1970-01-01
  • 2013-11-10
  • 1970-01-01
  • 2018-01-15
  • 2015-08-11
  • 2013-07-03
  • 1970-01-01
  • 2012-03-07
相关资源
最近更新 更多