【发布时间】:2019-03-10 08:54:25
【问题描述】:
我们有一个独立的 java 应用程序使用第三方工具来管理连接池,它在 v6_client + v6_server 设置中为我们工作了很长时间。
现在我们正在尝试从 v6 迁移到 v9(是的,我们来晚了.....),发现 v9_client 到 v6_server 的连接不断中断,意思是:
- XAQueueConnectionFactory#createXAConnection() 创建的Socket 总是立即关闭,而创建的XAConnection 似乎并没有察觉。
- 由于上面提到的套接字关闭,从 XAConnection.createXASession() 创建的 XASession 总是会创建一个新的套接字并在 XASession.close() 之后关闭该套接字。
我们浏览了 v9_client 的完整可调参数列表(XAQCF https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.ref.dev.doc/q111800_.html 中的列),并且只发现了我们在 v6_client 中没有使用的两个潜在的新配置,SHARECONVALLOWED 和 PROVIDERVERSION。不幸的是,两者都没有帮助我们......具体来说:
- 考虑到 v6_server 端没有 SHARECNV 属性,我们尝试了 setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_[YES/NO]),这不足为奇。
- 我们通过 setProviderVersion("[6/7/8") 尝试了“迁移/受限/正常模式”([7/8] 抛出异常,正如预期的那样......)。
只是想知道是否有其他人有类似的经历并可以分享一些见解。我们尝试了 v9_server+v9_client 并没有看到任何类似的问题,所以这也可能是我们最终的解决方案......
顺便说一句,WMQ 托管在 linux (RedHat) 上,我们只在客户端使用 MQXAQueueConnectionFactory 的产品(用于 jms 的 ibm mq 类)。
谢谢。
其他详细信息/更新。
[update-1]
--[游乐场设置]
v9_client 罐子:
javax.jms-api-2.0.jar
com.ibm.mq.allclient(-9.0.0.[1/5]).jar
v6_client 罐子: 除了 v9_client jars,在eclipse classpath中引入了以下jars
com.ibm.dhbcore-1.0.jar
com.ibm.mq.pcf-6.0.3.jar
com.ibm.mqjms-6.0.2.2.jar
com.ibm.mq-6.0.2.2.jar
com.ibm.mqetclient-6.0.2.2.jar
connector.jar
jta-1.1.jar
测试代码-单线程:
import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;
public class MQSeries_simpleAuditQ {
private static String queueManager = "QM.RCTQ.ALL.01";
private static String host = "localhost";
private static int port = 26005;
public static void main(String[] args) throws JMSException {
MQXAQueueConnectionFactory queueFactory= new MQXAQueueConnectionFactory();
System.out.println("\n\n\n*******************\nqueueFactory implementation version: " +
queueFactory.getClass().getPackage().getImplementationVersion() + "*****************\n\n\n");
queueFactory.setHostName(host);
queueFactory.setPort(port);
queueFactory.setQueueManager(queueManager);
queueFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
if (queueFactory.getClass().getPackage().getImplementationVersion().split("\\.")[0].equals("9")) {
queueFactory.setProviderVersion("6");
//queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
}
XASession xaSession;
javax.jms.QueueConnection xaQueueConnection;
try {
// Obtain a QueueConnection
System.out.println("Creating Connection...");
xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
xaQueueConnection.start();
for (int counter=0; counter<2; counter++) {
try {
xaSession = ((XAConnection)xaQueueConnection).createXASession();
xaSession.close();
} catch (Exception ex) {
System.out.println(ex);
}
}
System.out.println("Closing connection.... ");
xaQueueConnection.close();
} catch (Exception e) {
System.out.println("Error processing " + e.getMessage());
}
}
}
--[观察] v6_client 仅创建和关闭单个套接字,而 v9_client(均为 9.0.0.[1/5]):
- 在
xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");之后创建并关闭套接字 - 在内部for循环中,套接字在
xaSession = ((XAConnection)xaQueueConnection).createXASession();之后创建,在xaSession.close();之后关闭
我天真地期待套接字保持打开状态直到xaQueueConnection.close()。
[更新-2]
对 v9_server+v9_client 使用 queueFactory.setProviderVersion("9"); 和 queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);,我们在 v6_server+v9_client 中没有看到相同的持续套接字关闭问题,这是一个好消息。
[update-3] MCAUSER 在 v6_server 上所有 SVRCONN 频道的属性上。在 v9_server 上相同(与相同的 v9_client 连接时不会出现相同的套接字关闭问题)。
display channel (SYSTEM.ADMIN.SVRCONN)
MCAUSER(mqm)
display channel (SYSTEM.AUTO.SVRCONN)
MCAUSER( )
display channel (SYSTEM.DEF.SVRCONN)
MCAUSER( )
[update-4]
我尝试将 MCAUSER() 设置为 mqm,然后从客户端同时使用 (空白)和 mqm,两者都可以创建连接,但仍然看到使用 v9_client+v6_user 关闭相同的意外套接字。更新 MCAUSER() 后,我总是添加refresh security,然后重新启动 qmgr。
在使用空白用户创建连接之前,我还尝试在 Eclipse 中将系统变量设置为空白,但也没有帮助。
[update-5]
我们的讨论仅限于 v9_client+v9_server。下面的异步测试代码使用有限数量的现有连接生成大量 xasession 创建/关闭请求。使用 SHARECNV(1) 也会导致无法承受的高 TIME_WAIT 计数,但使用大于 1 的 SHARECNV(例如 10)可能会带来额外的性能损失......
异步测试代码
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;
public class MQSeries_simpleAuditQ_Async_v9 {
private static String queueManager = "QM.ALPQ.ALL.01";
private static int port = 26010;
private static String host = "localhost";
private static int connCount = 20;
private static int amp = 100;
private static ExecutorService amplifier = Executors.newFixedThreadPool(amp);
public static void main(String[] args) throws JMSException {
MQXAQueueConnectionFactory queueFactory= new MQXAQueueConnectionFactory();
System.out.println("\n\n\n*******************\nqueueFactory implementation version: " +
queueFactory.getClass().getPackage().getImplementationVersion() + "*****************\n\n\n");
queueFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
if (queueFactory.getClass().getPackage().getImplementationVersion().split("\\.")[0].equals("9")) {
queueFactory.setProviderVersion("9");
queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
}
queueFactory.setHostName(host);
queueFactory.setPort(port);
queueFactory.setQueueManager(queueManager);
//queueFactory.setChannel("");
ArrayList<QueueConnection> xaQueueConnections = new ArrayList<QueueConnection>();
try {
// Obtain a QueueConnection
System.out.println("Creating Connection...");
//System.setProperty("user.name", "mqm");
//System.out.println("system username: " + System.getProperty("user.name"));
for (int ct=0; ct<connCount; ct++) {
// xaQueueConnection instance of MQXAQueueConnection
QueueConnection xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
xaQueueConnection.start();
xaQueueConnections.add(xaQueueConnection);
}
ArrayList<Double> totalElapsedTimeRecord = new ArrayList<Double>();
ArrayList<FutureTask<Double>> taskBuffer = new ArrayList<FutureTask<Double>>();
for (int loop=0; loop <= 10; loop++) {
try {
for (int i=0; i<amp; i++) {
int idx = (int)(Math.random()*((connCount)));
System.out.println("Using connection: " + idx);
FutureTask<Double> xaSessionPoker = new FutureTask<Double>(new XASessionPoker(xaQueueConnections.get(idx)));
amplifier.submit(xaSessionPoker);
taskBuffer.add(xaSessionPoker);
}
System.out.println("loop " + loop + " completed");
} catch (Exception ex) {
System.out.println(ex);
}
}
for (FutureTask<Double> xaSessionPoker : taskBuffer) {
totalElapsedTimeRecord.add(xaSessionPoker.get());
}
System.out.println("Average xaSession poking time: " + calcAverage(totalElapsedTimeRecord));
System.out.println("Closing connections.... ");
for (QueueConnection xaQueueConnection : xaQueueConnections) {
xaQueueConnection.close();
}
} catch (Exception e) {
System.out.println("Error processing " + e.getMessage());
}
amplifier.shutdown();
}
private static double calcAverage(ArrayList<Double> myArr) {
double sum = 0;
for (Double val : myArr) {
sum += val;
}
return sum/myArr.size();
}
// create and close session through QueueConnection object passed in.
private static class XASessionPoker implements Callable<Double> {
// conn instance of MQXAQueueConnection. ref. QueueProviderService
private QueueConnection conn;
XASessionPoker(QueueConnection conn) {
this.conn = conn;
}
@Override
public Double call() throws Exception {
XASession xaSession;
double elapsed = 0;
try {
final long start = System.currentTimeMillis();
// ref. DualSessionWrapper
// xaSession instance of MQXAQueueSession
xaSession = ((XAConnection) conn).createXASession();
xaSession.close();
final long end = System.currentTimeMillis();
elapsed = (end - start) / 1000.0;
} catch (Exception e) {
// TODO Auto-generated catch block
System.out.println(e);
}
return elapsed;
}
}
}
【问题讨论】:
-
评论不用于扩展讨论;这个对话是moved to chat。
-
作为本次讨论的受益者,我非常感谢您的热情、知识和帮助 JoshMc,绝对提升了我作为新手用户对这个 mq 产品的理解。同时,我将尝试分享我在 mq 服务器端使用的示例脚本,以防万一它有助于呈现我们遇到的问题的全貌。我了解 v6 是一个非常旧的版本,不再受支持,因此非常感谢您的支持!请不要让它过多地分散您对日常工作的注意力,因为我的目的是试图更好地了解我们遇到的问题。我们的目标是使用 v9 cli+svr。
-
我承认 #1 和 #2 超出了我的知识库,但你的建议对我来说听起来很合理。 #3,从队列管理器的角度来看,设置
SHARECNV(0)使通道在 v6 模式下工作,但不建议这样做,因为它会关闭共享对话之外的其他功能,例如双向心跳。SHARECNV(1)将为每个通道实例提供一个对话/会话,与 v6 相同,但具有双向心跳的优势。 -
SHARECNV(1)是 IBM 推荐的“然后在 8.0 版中引入了分布式服务器的性能增强。为了从与共享对话一起引入的新功能中受益,而不会对分布式服务器产生性能影响,在您的 8.0 版服务器连接通道上将 SHARECNV 设置为 1。”和“...对于分布式服务器,在使用 10 个共享会话的默认配置的通道上处理消息平均比在不使用共享会话的通道上慢 15%。” -
但是在客户端没有模拟 v6 java 类包含的连接池的设置。 #4=没有我知道的 IBM 实现。有关 v6 和 v7.0 及更高版本“IBM MQSeries connection pooling with Tomcat”的更多详细信息,请参阅我对此问题的回答。如果您能够分享,我相信您的 XASession 池解决方案的详细信息会帮助其他人,这是否可以在“自我回答”中详细说明,我会投票赞成。
标签: ibm-mq