【发布时间】:2018-08-03 14:44:25
【问题描述】:
我正在努力在 tibco JMS 提供程序中设置故障转移。我知道在 ActiveMQ 的情况下如何做到这一点。
我尝试过的如下
public class TibcoJMSQueueProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(FDPMetaCacheProducer.class);
private static QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
@Inject
private FDPTibcoConfigDAO fdpTibcoConfigDao;
private String providerURL;
private String userName;
private String password;
@PostConstruct
public void constructProducer(){
configure();
}
private void configure() {
try {
List<FDPTibcoConfigDTO> tibcoConfigList = fdpTibcoConfigDao.getAllTibcoConfig();
if(!tibcoConfigList.isEmpty()){
FDPTibcoConfigDTO fdpTibcoConfigDTO = tibcoConfigList.get(tibcoConfigList.size()-1);
String providerURL = getProviderUrl(fdpTibcoConfigDTO);
setProviderUrl(providerURL);
String userName = fdpTibcoConfigDTO.getUserName();
String password = fdpTibcoConfigDTO.getPassword();
this.userName = userName;
this.password=password;
factory = new com.tibco.tibjms.TibjmsQueueConnectionFactory(providerURL);
}
} catch (Exception e) {
System.err.println("Exitting with Error");
e.printStackTrace();
System.exit(0);
}
}
private void setProviderUrl(String providerURL) {
this.providerURL = providerURL;
}
private String getProviderUrl(final FDPTibcoConfigDTO FDPTibcoConfigDTO) {
return TibcoConstant.TCP_PROTOCOL + FDPTibcoConfigDTO.getIpAddress().getValue() + TibcoConstant.COLON_SEPERATOR + FDPTibcoConfigDTO.getPort();
}
private Object lookupQueue(String queueName) {
Properties props = new Properties();
Object tibcoQueue = null;
props.setProperty(Context.INITIAL_CONTEXT_FACTORY, TibcoConstant.TIB_JMS_INITIAL_CONTEXT_FACTORY);
props.setProperty(Context.PROVIDER_URL, this.providerURL);
props.setProperty(TibcoConstant.TIBCO_CONNECT_ATTEMPT, "20,10");
props.setProperty(TibcoConstant.TIBCO_RECOVER_START_UP_ERROR, "true");
props.setProperty(TibcoConstant.TIBCO_RECOVER_RECONNECT_ATTEMPT, "20,10");
InitialContext context;
try {
context = new InitialContext(props);
tibcoQueue = context.lookup(queueName);
} catch (NamingException e) {
System.out.println(e.getMessage());
}
return tibcoQueue;
}
public void pushIntoQueueAsync(String message,String queueName) throws JMSException {
connection = factory.createQueueConnection(userName, password);
connection.start();
session = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Queue pushingQueue = (Queue)lookupQueue(queueName);
QueueSender queueSender = session.createSender(pushingQueue);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage sendXMLRequest = session.createTextMessage(message);
queueSender.send(sendXMLRequest);
LOGGER.info("Pushing Queue {0} ,Pushing Message : {1}", pushingQueue.getQueueName(), sendXMLRequest.getText());
}
public String pushIntoQueueSync(String message,String queueName,String replyQueueName) throws JMSException {
connection = factory.createQueueConnection(userName, password);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = (Destination)lookupQueue(queueName);
MessageProducer messageProducer = session.createProducer(destination);
session = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
UUID randomUUID =UUID.randomUUID();
TextMessage textMessage = session.createTextMessage(message);
String correlationId = randomUUID.toString();
//Create Reply To Queue
Destination replyDestination = (Destination)lookupQueue(queueName);
textMessage.setJMSReplyTo(replyDestination);
textMessage.setJMSCorrelationID(correlationId);
String messgeSelector = "JMSCorrelationID = '" + correlationId + "'";
MessageConsumer replyConsumer = session.createConsumer(replyDestination,messgeSelector);
messageProducer.send(textMessage, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000);
Message replayMessage = replyConsumer.receive();
TextMessage replyTextMessage = (TextMessage) replayMessage;
String replyText = replyTextMessage.getText();
LOGGER.info("Pushing Queue {0} ,Pushing Message : {1}", queueName, message);
return replyText;
}
public static QueueConnectionFactory getConnectionFactory(){
return factory;
}
}
在activeMQ的情况下,我们使用
failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61616)?randomize=false&amp;backup=true url 作为 ActiveMQconnectionfactory 构造函数中的提供程序 url 处理故障转移。我在某处看到过在 TIBCO 这样的情况下使用多个 url
tcp://169.144.87.25:7222,tcp://127.0.0.1:7222
我是如何像这样检查故障转移的。
首先,我使用单个 IP (tcp://169.144.87.25:7222) 进行了检查。消息正在正常发送和接收(我没有发布 TibcoJMSReceiver 代码)。
我尝试使用另一个 IP (tcp://169.144.87.25:7222)。它工作正常。
但是当我尝试使用
最终字符串 PROVIDER_URL="tcp://169.144.87.25:7222,tcp://127.0.0.1:7222";
我开始了我的计划。但在给出输入之前,我关闭了第一台服务器。作为故障转移,应将消息发送到其他服务器。
但它告诉我session closed Exception。
那么我是否以正确的方式处理故障转移,或者我需要做其他配置。
【问题讨论】: