【问题标题】:How to handle failover in case of TIBCO在 TIBCO 的情况下如何处理故障转移
【发布时间】:2018-08-03 14:44:25
【问题描述】:

我正在努力在 tibco JMS 提供程序中设置故障转移。我知道在 ActiveMQ 的情况下如何做到这一点。

我尝试过的如下

    public class TibcoJMSQueueProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(FDPMetaCacheProducer.class);

    private static QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;

    @Inject
    private FDPTibcoConfigDAO fdpTibcoConfigDao;

    private String providerURL;

    private String userName;
    private String password;



    @PostConstruct
    public void constructProducer(){
        configure();
    }

    private void configure() {
        try {
            List<FDPTibcoConfigDTO> tibcoConfigList = fdpTibcoConfigDao.getAllTibcoConfig();
            if(!tibcoConfigList.isEmpty()){
                FDPTibcoConfigDTO fdpTibcoConfigDTO = tibcoConfigList.get(tibcoConfigList.size()-1);
                String providerURL = getProviderUrl(fdpTibcoConfigDTO);

                setProviderUrl(providerURL);
                String userName = fdpTibcoConfigDTO.getUserName();
                String password = fdpTibcoConfigDTO.getPassword();
                this.userName = userName;
                this.password=password;
                factory = new com.tibco.tibjms.TibjmsQueueConnectionFactory(providerURL);

            }

        } catch (Exception e) {
            System.err.println("Exitting with Error");
            e.printStackTrace();
            System.exit(0);
        }

    }

    private void setProviderUrl(String providerURL) {
        this.providerURL = providerURL;
    }

    private String getProviderUrl(final FDPTibcoConfigDTO FDPTibcoConfigDTO) {
        return TibcoConstant.TCP_PROTOCOL + FDPTibcoConfigDTO.getIpAddress().getValue() + TibcoConstant.COLON_SEPERATOR + FDPTibcoConfigDTO.getPort();
    }

    private Object lookupQueue(String queueName) {

        Properties props = new Properties();
        Object tibcoQueue = null;
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, TibcoConstant.TIB_JMS_INITIAL_CONTEXT_FACTORY);
        props.setProperty(Context.PROVIDER_URL, this.providerURL);
        props.setProperty(TibcoConstant.TIBCO_CONNECT_ATTEMPT, "20,10");
        props.setProperty(TibcoConstant.TIBCO_RECOVER_START_UP_ERROR, "true");
        props.setProperty(TibcoConstant.TIBCO_RECOVER_RECONNECT_ATTEMPT, "20,10");

        InitialContext context;
        try {
            context = new InitialContext(props);
            tibcoQueue = context.lookup(queueName);
        } catch (NamingException e) {
            System.out.println(e.getMessage());
        }

        return tibcoQueue;
    }

    public void pushIntoQueueAsync(String message,String queueName) throws JMSException {
        connection = factory.createQueueConnection(userName, password);
        connection.start();
        session = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Queue pushingQueue = (Queue)lookupQueue(queueName);
        QueueSender queueSender = session.createSender(pushingQueue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage sendXMLRequest = session.createTextMessage(message);
        queueSender.send(sendXMLRequest);
        LOGGER.info("Pushing Queue {0} ,Pushing Message : {1}", pushingQueue.getQueueName(), sendXMLRequest.getText());
    }

    public String pushIntoQueueSync(String message,String queueName,String replyQueueName) throws JMSException {
        connection = factory.createQueueConnection(userName, password);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = (Destination)lookupQueue(queueName);
        MessageProducer messageProducer = session.createProducer(destination);

        session = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        UUID randomUUID =UUID.randomUUID();
       TextMessage textMessage = session.createTextMessage(message);
       String correlationId = randomUUID.toString();
       //Create Reply To Queue
       Destination replyDestination = (Destination)lookupQueue(queueName);
       textMessage.setJMSReplyTo(replyDestination);
       textMessage.setJMSCorrelationID(correlationId);
       String messgeSelector = "JMSCorrelationID = '" + correlationId + "'";

       MessageConsumer replyConsumer = session.createConsumer(replyDestination,messgeSelector);
       messageProducer.send(textMessage, javax.jms.DeliveryMode.PERSISTENT,   javax.jms.Message.DEFAULT_PRIORITY, 1800000);

       Message replayMessage = replyConsumer.receive();

       TextMessage replyTextMessage = (TextMessage) replayMessage;
       String replyText = replyTextMessage.getText();
       LOGGER.info("Pushing Queue {0} ,Pushing Message : {1}", queueName, message);
        return replyText;
    }

    public static QueueConnectionFactory getConnectionFactory(){
        return factory;
    }


}

在activeMQ的情况下,我们使用 failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61616)?randomize=false&amp;amp;backup=true url 作为 ActiveMQconnectionfactory 构造函数中的提供程序 url 处理故障转移。我在某处看到过在 TIBCO 这样的情况下使用多个 url tcp://169.144.87.25:7222,tcp://127.0.0.1:7222

我是如何像这样检查故障转移的。

首先,我使用单个 IP (tcp://169.144.87.25:7222) 进行了检查。消息正在正常发送和接收(我没有发布 TibcoJMSReceiver 代码)。

我尝试使用另一个 IP (tcp://169.144.87.25:7222)。它工作正常。

但是当我尝试使用

最终字符串 PROVIDER_URL="tcp://169.144.87.25:7222,tcp://127.0.0.1:7222";

我开始了我的计划。但在给出输入之前,我关闭了第一台服务器。作为故障转移,应将消息发送到其他服务器。 但它告诉我session closed Exception

那么我是否以正确的方式处理故障转移,或者我需要做其他配置。

【问题讨论】:

    标签: jms tibco tibco-ems


    【解决方案1】:

    只有在两个 TIBCO EMS 守护程序中都启用容错功能时,它们才能“作为一个”工作。只有这样,他们才会彼此心跳并共享资源。你应该在远程守护进程的 tibemsd.conf 中有这个:

    listen                  = tcp://7222
    ...
    ft_active               = tcp://<ip to your box>:7222
    

    这个在你的本地盒子上:

    listen                  = tcp://7222
    ...
    ft_active               = tcp://169.144.87.25:7222
    

    而且您不需要每次都创建连接和会话!许多消息的一个连接和会话 - “容错”意味着它会自动为您重新连接。你可以有一个 init() 或 connect() 方法你调用 一次 或者只是将它添加到你的配置方法中:

     private void configure() {
          try {
            ...
            connection = factory.createQueueConnection(userName, password);
            connection.start();
            session = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
    

    那么 pushIntoQueue 就变得这么简单了:

    public void pushIntoQueueAsync(String message,String queueName) throws JMSException {
        Queue pushingQueue = (Queue)lookupQueue(queueName);
        QueueSender queueSender = session.createSender(pushingQueue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage sendXMLRequest = session.createTextMessage(message);
        queueSender.send(sendXMLRequest);
        LOGGER.info("Pushing Queue {0} ,Pushing Message : {1}", pushingQueue.getQueueName(), sendXMLRequest.getText());
    }
    

    【讨论】:

    • 感谢您的回复。我试过了,它有效。但我有一个问题。在推送任何新消息之前,我必须创建新连接并创建新会话。我不创建连接。它给了我异常连接已关闭或者如果我尝试使用相同的会话。它给了我异常会话已关闭。
    • 不确定我是否理解。你上面的代码对你有用吗?
    • 让我简化一下我的陈述。我只想知道,在处理容错时,每次将消息推送到队列时是否都必须创建新连接和新会话?我已经更新了我的代码你能看看 pushIntoQueueSync()。每次将消息推送到队列时,我都会调用此方法。如何将会话重新附加到其他活动服务器或故障活动连接到其他活动服务器?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-15
    • 2014-11-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-06
    • 2016-05-01
    相关资源
    最近更新 更多