问题:可以截获 SOAP 消息,但在与原始调用非常不同的位置
地点。很难将 SOAP 消息传递回原始调用站点,尤其是在
多线程或异步环境。
我能看到的唯一解决方案是明确地只有一个 JAX-WS 代理,它有一个处理程序,每个
要求。应用程序只有一个代理会成为瓶颈,因此需要使用
允许并行和异步执行的多线程工具。
这是我的想法,在代码中。首先我一步一步地完成它,最后有一个转储
代码。
更新:我已将LinkedBlockingQueue 替换为静态ThreadLocal<SoapApiWrapper>
实例,以及带有newWorkStealingPool() 的执行者。查看更改的编辑历史记录!
我已将其设置为使用http://www.dneonline.com/calculator.asmx。它编译并运行,但我没有
花了很多时间确保它正常工作或处于最佳状态。我确定有问题(我的 CPU 风扇
正在努力工作,即使我没有运行代码)。请注意!
(有谁知道我可以在本地运行或泛滥的更好的公共 SOAP API
请求?)
如果您想测试,这里有一些公共 SOAP API:
https://documenter.getpostman.com/view/8854915/Szf26WHn
一步一步
-
实现将捕获消息的SOAPHandler 类,称为SoapMessageHandler。
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
// capture messages in a list
private final List<SOAPMessageContext> messages = new ArrayList<>();
// get & clear messages
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages);
messages.clear();
return m;
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context); // collect message
return true;
}
@Override
public boolean handleFault(SOAPMessageContext context) {
messages.add(context); // collect error
return true;
}
}
-
定义一个SoapApiWrapper 类
- 创建一个
SoapMessageHandler,
- 创建一个 JAX-WS 代理,
- 并将处理程序添加到代理。
class SoapApiWrapper {
// 1. create a handler
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
private final CalculatorSoap connection;
public SoapApiWrapper() {
// 2. create one connection
var factoryBean = new JaxWsProxyFactoryBean();
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class);
// 3. add the Handler
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
connection = factoryBean.create(CalculatorSoap.class);
}
}
-
定义一个SoapApiManager 具有
-
ExecutorService,它将管理 SOAP 请求和响应
- 一个
ThreadLocal<SoapApiWrapper>,所以每个线程都有一个JAX-WS Proxy(idea
来自https://stackoverflow.com/a/16680215/4161471)
public class SoapApiManager {
// 1. request executor
private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
}
-
SoapApiManager 有一个方法,submitRequest(...)。它将返回 SOAP API 响应**
和** SOAP 消息。
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
//...
}
参数是SoapRequestRunner,一个接受 JAX-WS 代理并返回 SOAP 的 lambda
回应。
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
调用时,submitRequest(...) 执行以下操作:
- 用
CompleteableFuture.supplyAsync(...) 包裹SoapRequestRunner,并使用
我们的ExectutorService
- 从
ThreadLocal 中获取SoapApiWrapper,
- 调用 SOAP API(通过将
SoapRequestRunner 应用到 SoapApiWrapper 的 JAX-WS
代理)
- 等待 SOAP 结果,
- 从
SoapApiWrapper的SOAPHandler中提取SOAP消息,
- 最后,将 SOAP 结果和 SOAP 消息捆绑在 DTO 中,
SoapResponseHolder
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) { // 1. use CompletableFuture & executorService
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
SoapRequestRunner<ResponseT> requestRunner
) {
return () -> {
SoapApiWrapper api = null;
try {
api = soapApiWrapperQueue.get(); // 2. fetch an API Wrapper
var response = requestRunner.sendRequest(api.connection); // 3&4. request & response
var messages = api.soapMessageHandler.collectMessages(); // 5. extract raw SOAP messages
return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (api != null) {
soapApiWrapperQueue.offer(api);
}
}
};
}
示例用法
public class Main {
public static void main(String[] args) {
SoapApiManager apiManager = new SoapApiManager();
apiManager
.submitRequest((soapApi) -> soapApi.add(5, 4))
.thenAccept(response -> {
// we can get the SOAP API response
var sum = response.getResponse();
// and also the intercepted messages!
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml).collect(Collectors.joining("\n---\n"));
System.out.println("sum: " + sum + ",\n" + allXml);
});
}
public static String getRawXml(SOAPMessageContext context) {
try {
ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
context.getMessage().writeTo(byteOS);
return byteOS.toString(StandardCharsets.UTF_8);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
}
输出
sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<Add xmlns="http://tempuri.org/">
<intA>73</intA>
<intB>32</intB>
</Add>
</soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<AddResponse xmlns="http://tempuri.org/">
<AddResult>105</AddResult>
</AddResponse>
</soap:Body>
</soap:Envelope>
所有代码
这是一个工作示例,完成了对响应的验证。
它会创建大量 (REQUESTS_COUNT) 请求并将它们全部提交给 SoapApiManager。
每个请求都会打印出线程的名称,以及 JAX-WS 代理的哈希码(我想检查
它们被重用),以及基本的输入/输出(例如-9 - 99 = -108)。
验证以确保每个 SoapResponseHolder 具有正确的结果和原始 SOAP
消息,并且发送了正确数量的请求。
Main.java
import com.github.underscore.lodash.Xml;
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class Main implements AutoCloseable {
private final SoapApiManager apiManager = new SoapApiManager();
private static final int THREAD_COUNT = 4;
private static final int REQUESTS_COUNT = 500;
private final AtomicInteger i = new AtomicInteger();
public static void main(String[] args)
throws InterruptedException {
try (var m = new Main()) {
m.run();
}
}
private void run() throws InterruptedException {
var executor = Executors.newFixedThreadPool(THREAD_COUNT);
var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt()))
.limit(REQUESTS_COUNT)
.map(intA -> (Callable<Boolean>) () -> {
sendAndValidateRequest(intA.getKey(), intA.getValue());
i.incrementAndGet();
return true;
})
.collect(Collectors.toList());
executor.invokeAll(tasks);
var waiter = Executors.newSingleThreadScheduledExecutor();
waiter.scheduleWithFixedDelay(
() -> {
var size = i.get();
System.out.println(">waiting... (size " + size + ")");
if (size >= REQUESTS_COUNT) {
System.out.println(">finished waiting! " + size);
waiter.shutdownNow();
}
},
3, 3, TimeUnit.SECONDS
);
System.out.println("Finished sending tasks " + waiter.awaitTermination(10, TimeUnit.SECONDS));
waiter.shutdownNow();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
executor.shutdown();
System.out.println(
"executor.awaitTermination " + executor.awaitTermination(10, TimeUnit.SECONDS));
if (!executor.isTerminated()) {
System.out.println("executor.shutdownNow " + executor.shutdownNow());
}
if (i.get() != REQUESTS_COUNT) {
throw new RuntimeException(
"Test did not execute " + REQUESTS_COUNT + " times, actual: " + i.get()
);
}
}
private int randomInt() {
return ThreadLocalRandom.current().nextInt(-100, 100);
}
private void sendAndValidateRequest(int a, int b) {
apiManager
.submitRequest((soapApi) -> {
var response = soapApi.add(a, b);
System.out.printf(
"[%-12s / %-18s] %4d %s %3d = %4d\n",
soapApi.hashCode(),
Thread.currentThread().getName(),
a,
(b >= 0 ? "+" : "-"),
Math.abs(b),
response
);
return response;
})
.thenAcceptAsync(response -> {
var sum = response.getResponse();
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml)
.collect(Collectors.joining("\n---\n"));
if (sum != a + b) {
throw new RuntimeException(
"Bad sum, sent " + a + " + " + b + ", result: " + sum + ", xml: " + allXml
);
}
if (messages.size() != 2) {
throw new RuntimeException(
"Bad messages, expected 1 request and 1 response, but got " + messages.size()
+ ", xml: " + allXml
);
}
if (!allXml.contains("<AddResult>" + (a + b) + "</AddResult>")) {
throw new RuntimeException(
"Bad result, did not contain AddResult=" + (a + b) + ", actual: " + allXml
);
}
});
}
public static String getRawXml(SOAPMessageContext context) {
try (var byteOS = new ByteArrayOutputStream()) {
context.getMessage().writeTo(byteOS);
var rawSoap = byteOS.toString(StandardCharsets.UTF_8);
return Xml.formatXml(rawSoap, Step.TWO_SPACES);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
apiManager.close();
}
}
SoapApiManager.java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Supplier;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;
public class SoapApiManager implements AutoCloseable {
private static final int THREAD_LIMIT = Math.min(Runtime.getRuntime().availableProcessors(), 5);
private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
@Override
public void close() {
executorService.shutdown();
}
private static class SoapApiWrapper {
private final CalculatorSoap connection;
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
public SoapApiWrapper() {
var factoryBean = new JaxWsProxyFactoryBean();
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class);
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
connection = factoryBean.create(CalculatorSoap.class);
}
}
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
SoapRequestRunner<ResponseT> requestRunner
) {
return () -> {
SoapApiWrapper api = soapApiWrapper.get();
var response = requestRunner.sendRequest(api.connection);
var messages = api.soapMessageHandler.collectMessages();
return new SoapResponseHolder<>(response, messages);
};
}
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
public static class SoapResponseHolder<ResponseT> {
private final List<SOAPMessageContext> messages;
private final ResponseT response;
SoapResponseHolder(
ResponseT response,
List<SOAPMessageContext> messages
) {
this.response = response;
this.messages = messages;
}
public ResponseT getResponse() {
return response;
}
public List<SOAPMessageContext> getMessages() {
return messages;
}
}
}
SoapMessageHandler.java
package org.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
private final List<SOAPMessageContext> messages = new ArrayList<>();
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages);
messages.clear();
return m;
}
@Override
public Set<QName> getHeaders() {
return Collections.emptySet();
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public boolean handleFault(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public void close(MessageContext context) {
}
}
build.gradle.kts
plugins {
java
id("com.github.bjornvester.wsdl2java") version "1.2"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4"))
implementation("org.apache.cxf:cxf-core")
implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
implementation("org.apache.cxf:cxf-rt-transports-http")
implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
// implementation("org.apache.cxf:cxf-rt-transports-http-jetty")
implementation("org.apache.cxf:cxf-rt-transports-http-hc")
implementation("com.sun.activation:javax.activation:1.2.0")
implementation("javax.annotation:javax.annotation-api:1.3.2")
implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")
implementation("com.github.javadev:underscore:1.68")
//<editor-fold desc="JAXB">
implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1")
xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
//</editor-fold>
//<editor-fold desc="Test">
testImplementation(enforcedPlatform("org.junit:junit-bom:5.7.2")) // JUnit 5 BOM
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
//</editor-fold>
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11))
}
}
wsdl2java {
cxfVersion.set("3.4.4")
options.addAll("-xjc-Xequals", "-xjc-XhashCode")
}
tasks.test {
useJUnitPlatform()
}