【问题标题】:Synchronous spring webflux call retaining the order of operation保持操作顺序的同步spring webflux调用
【发布时间】:2019-08-26 22:08:30
【问题描述】:

我有一个简单的用例,不知道该怎么做,因为我是 spring webflux 的新手。

我正在使用spring boot webflux starters。 我需要调用 2 个端点。比如说Endpoint1Endpoint2

每当Endpoint1 被请求击中时,我应该先用相同的请求击中Endpoint2,然后使用来自Endpoint2 的响应来丰富原始请求,然后再做一些事情。 在做任何事情之前,Endpoint1 的请求对象需要使用来自Endpoint2 的响应来丰富。如何使用 Spring webflux 强制执行此命令?在我的例子中,原始请求对象在进一步使用之前没有得到丰富。非常感谢您对此的任何帮助!!!

仅供参考 - 使用 webclient 完成对 Endpoint2 的调用

只是一个伪代码:

public Mono<Response1> endpoint1(Request1 request1){

  Flux<Response2> reponse2 = webclient.getEndpoint2(request1); // Returns a Flux

  //use the above reponse2 to enrich the request1

  return webclient.getSomething(request1); //Returns Mono<Response1>

}

实际代码:


 public Mono<ApplicationResponse> save(ApplicationRequest request) {

        return Mono.subscriberContext().flatMap(ctx -> {

            Mono blockingWrapper =  Mono.fromCallable(() ->
                    service.getId(request)
                            .subscriberContext(ctx)
                            .subscribe(id -> request.setId(id))
            ).subscribeOn(Schedulers.elastic());

            return blockingWrapper.flatMap(o -> authService.getAccessToken()
                    .flatMap(token -> post("/save", request,
                            token.getAccessToken(),
                            ctx)
                            .bodyToMono(ApplicationResponse.class))
                    .log());
        });
    }

【问题讨论】:

  • 在这种情况下您应该使用转换函数。但是,我看不出在这种情况下如何使用 response2 响应,因为它是 Flux。举个例子:Request2 返回一个 Flux: 1 -- 3 -- 7 -- End。您如何将它们用于 request1?
  • 请求中需要扩充的字段是一个列表。所以应该使用通量 response2 来填充该列表。如果不可能,请告诉我。

标签: spring-boot spring-webflux project-reactor reactor


【解决方案1】:

如果你确定你会有一个带有 getEndpoint2(request1) 的 Flux,在这种情况下,你可以使用 collectList():

return webclient.getEndpoint2(request1) // Flux<Response2>
         .collectList() // Mono<List<Response2>>
         .flatMap(list -> {
            // ... should handle empty list if needed
            finalRequest = createRequest(request1, list);
            return webclient.getSomething(finalRequest); // Mono<Response1>
         });

【讨论】:

  • 非常感谢您的帮助@htn。真的很有帮助!!!我看到一些有趣的事情正在发生。如果我从 Controller 类编排所有这些,它会按预期工作,而如果我从我的 Controller 类调用一个服务来编排这个流程,它似乎不会按预期工作。只是想知道我错过了什么?或者这就是它的意思?我会将工作代码和非工作代码添加到答案部分以供进一步参考。
【解决方案2】:

我看到一些有趣的事情正在发生。如果我从 Controller 类编排它,它会按预期工作,而如果我从我的 Controller 类调用一个服务来编排这个流,它似乎不会按预期工作。只是想知道我错过了什么?或者这就是它的工作方式?

这是工作代码:

@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;
    private final FeesService feesService;

    @PostMapping(value = "/save")
    public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {

        ApplicationRequest applicationRequest = requestMapper.apply(request);

        return Mono.subscriberContext()
                .flatMap(context -> feesService.calculateApplicationFees(applicationRequest)
                        .collectList())
                .map(feeItems -> applicationRequest.getFeeItems().addAll(feeItems))
                .flatMap(isRequestEnriched -> applicationService.saveApplication(applicationRequest)
                        .map(saveApplicationResponse -> {
                            Application application = new Application();
                            application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                            return application;
                        }))
                .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                        request.getLicenceId()),
                        throwable, true, false))
                .log();
    }
}


@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;  

         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                .flatMap(context -> authenticationService.getAccessToken()
                        .flatMap(token -> post("/save",
                                request,
                                token.getAccessToken(),
                                context)
                                .bodyToMono(SaveApplicationResponse.class))
                        .log());
    }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

如果我这样做就不起作用..意思是,请求永远不会丰富:



@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;

     @PostMapping(value = "/save")
        public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {
            return Mono.subscriberContext()
                    .flatMap(context -> applicationService.saveApplication(requestMapper.apply(request))
                            .map(saveApplicationResponse -> {
                                Application application = new Application();
                                application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                                return application;
                            }))
                    .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                            request.getLicenceId()),
                            throwable, true, false))
                    .log();
        }

}

@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;
     private final FeesService feesService;


         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                    .flatMap(context -> feesService.calculateApplicationFees(request)
                            .collectList())
                    .map(feeItems -> request.getFeeItems().addAll(feeItems))
                    .subscriberContext()
                    .flatMap(context -> authenticationService.getAccessToken()
                            .flatMap(token -> post("/save",
                                    request,
                                    token.getAccessToken(),
                                    context)
                                    .bodyToMono(SaveApplicationResponse.class))
                            .log());
        }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

【讨论】:

  • 你的问题来自第二个.subscriberContext()。它是一个静态方法,它创建一个新的Mono,这意味着它之前的代码永远不会执行,这就是为什么request 对象不会改变。无论如何,你的代码很乱。让它更简单。据我阅读您的代码,您根本不需要FluxfeesService.calculateApplicationFees(...) 应该返回 Mono&lt;List&lt;FeeItem&gt;&gt;。有太多不必要的.log()Mono.subscriberContext()。你甚至需要这里的上下文吗?
  • 感谢您的反馈@htn。感谢您提出代码中的问题。我有点考虑在上下文中设置一些属性,以便这些属性可以级联到所有调用,这就是有这些上下文的原因。只是想知道有没有其他方法可以做到这一点。
【解决方案3】:

您的问题来自第二个.subscriberContext()。它是一个静态方法,它创建一个新的Mono,这意味着它之前的代码永远不会执行,这就是为什么request对象不会改变。

不管怎样,你的代码很乱。让它更简单。据我阅读您的代码,您根本不需要FluxfeesService.calculateApplicationFees(...) 应该返回 Mono&lt;List&lt;FeeItem&gt;&gt;。有太多不必要的.log()Mono.subscriberContext()。你甚至需要这里的上下文吗?

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-26
    • 2018-02-14
    • 2012-06-22
    • 1970-01-01
    • 2020-06-05
    • 1970-01-01
    相关资源
    最近更新 更多