请查看我在Karaf的OSGi中构建普通CXF服务(不使用Camel)的其他文章 。
这是有关如何
- 创建一个CXF REST服务
- 使用骆驼多播(并并行化)传入的请求
- 来自两个不同服务的源数据
- 汇总响应并
- 最后将合并结果作为JSON返回给最终用户。
简单来说,此应用程序可以做什么
此服务的预期结果是一个硬编码的响应,看起来像
从图像中可以看到,响应的顶部来自一个名为NameEmailService的服务,响应的第二部分来自一个名为AgePhoneService的服务。 同时充实数据的调用和合并结果实体– ConsolidatedSearchResult被填充。
项目结构如下所示:
步骤1有两个婴儿步骤。
步骤1.a –创建CXF REST服务
您可能已经猜到了,这一步没有什么复杂的。 只是一个接口和一个实现。
接口
@Path("rest")
public interface RestService {
@GET
@Path("query/{queryString}")
@Produces(MediaType.APPLICATION_JSON)
public String sourceResultsFromTwoSources(@PathParam("queryString") String queryString);
}
实作
public class RestServiceImpl implements RestService {
private static Logger logger= LoggerFactory.getLogger(AgePhoneServiceImpl.class);
private NameEmailService nameEmailService;
private AgePhoneService agePhoneService;
public RestServiceImpl(){
}
//Do nothing. Camel intercepts and routes the requests
public String sourceResultsFromTwoSources(String queryString) {
return null;
}
public NameEmailResult getNameEmailResult(String queryString){
logger.info("Invoking getNameEmailResult from RestServiceImpl");
return nameEmailService.getNameAndEmail(queryString);
}
public AgePhoneResult getAgePhoneResult(String queryString){
logger.info("Invoking getAgePhoneResult from RestServiceImpl");
return agePhoneService.getAgePhoneResult(queryString);
}
public NameEmailService getNameEmailService() {
return nameEmailService;
}
public AgePhoneService getAgePhoneService() {
return agePhoneService;
}
public void setNameEmailService(NameEmailService nameEmailService) {
this.nameEmailService = nameEmailService;
}
public void setAgePhoneService(AgePhoneService agePhoneService) {
this.agePhoneService = agePhoneService;
}
}
请注意,方法实现sourceResultsFromTwoSources返回null。 事实是,进行REST调用时甚至不会调用此方法。 骆驼拦截所有对URL的请求并将其路由到各个端点(在我们的例子中,调用两个方法– getNameEmailResult()和getAgePhoneResult() )。
步骤1.b –创建服务实现
NameEmailService和AgePhoneService的Kiddish实现如下:
NameEmailServiceImpl
public class NameEmailServiceImpl implements NameEmailService {
public NameEmailResult getNameAndEmail(String queryString){
return new NameEmailResult("Arun", "[email protected]");
}
}
AgePhoneServiceImpl
public class AgePhoneServiceImpl implements AgePhoneService {
public AgePhoneResult getAgePhoneResult(String queryString){
return new AgePhoneResult(32, "111-222-333");
}
}
第2、3、4和5步
好吧,当我说2、3、4和5是4个步骤时,我撒了谎。 使用Camel路由及其企业集成模式实现,所有这些操作都只需一步即可完成。
RestToBeanRouter
public class RestToBeanRouter extends RouteBuilder {
@Override
public void configure() throws Exception {
from ("cxfrs://bean://rsServer")
.multicast()
.parallelProcessing()
.aggregationStrategy(new ResultAggregator())
.beanRef("restServiceImpl", "getNameEmailResult")
.beanRef("restServiceImpl", "getAgePhoneResult")
.end()
.marshal().json(JsonLibrary.Jackson)
.to("log://camelLogger?level=DEBUG");
}
}
我们的路由说明
简而言之,routerbuilder所做的就是
1) from ("cxfrs://bean://rsServer")拦截对在rest-blueprint.xml定义的JAX-RS服务器端点的所有请求,如下所示:
rest-blueprint.xml
<cxf:rsServer id="rsServer" address="/karafcxfcamel"
serviceClass="me.rerun.karafcxfcamel.rest.RestServiceImpl"
loggingFeatureEnabled="true" />
2) .multicast()将未更改的原始请求转发到
1. `getNameEmailResult` & 2. `getAgePhoneResult` methods in `RestServiceImpl`
3) .parallelProcessing()并发调用这些方法。
4) .aggregationStrategy(new ResultAggregator())指定如何汇总来自各种多播源的结果。
我们的聚合器看起来像:
结果聚合器
public class ResultAggregator implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
ConsolidatedSearchResult consolidatedSearchResult=null;
if (oldExchange==null){
consolidatedSearchResult=new ConsolidatedSearchResult();
}
else{
consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult.class);
}
NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult.class);
AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult.class);
if (nameEmailResult!=null){
consolidatedSearchResult.setNameEmailResult(nameEmailResult);
}
if (agePhoneResult!=null){
consolidatedSearchResult.setAgePhoneResult(agePhoneResult);
}
newExchange.getIn().setBody(consolidatedSearchResult);
return newExchange;
}
}
我们的聚合器说明
我们的ResultAggregator中的aggregate方法虽然有些粗糙,但是可以完成工作。
- 每当所有多播端点完成时,都将调用
aggregate方法。 - 因此,第一次,oldExchange将为null。 我们以此为契机来构建我们想要响应用户的最终合并结果实体。
- 我们检查传入的newExchange是调用NameEmailService还是AgePhoneService的结果,并相应地填充合并的实体。
- 最后,我们返回合并的实体–返还完成两项工作。
- 合并实体以oldExchange的形式出现,以供下一次调用
aggregate方法。 (更像是链接–实体最后返回的对象是作为下一个调用的传入交换传入的对象) - 如果它是
aggregate的最后一个调用(所有多播端点调用已完成),则将其返回给用户。
- 合并实体以oldExchange的形式出现,以供下一次调用