【问题标题】:Implementing Request Response in Apache Kafka java在 Apache Kafka java 中实现请求响应
【发布时间】:2020-09-03 14:49:20
【问题描述】:

请找到我们需要实现的用例。

首先,我们需要调用 Kafka 生产者一条消息作为休息服务,他们将在另一个主题中处理并返回响应。

对我们来说,这是一个请求-回复主题,我们需要回复同一个请求的响应,使用 replykafka 模板工作正常,但我们可以设置 co-relation id 在标题中。

作为主题消息元数据,有发送属性,有没有办法将关联ID与请求主题消息和回复主题消息映射。

给你解释清楚。

一个微服务期望负载如下所示,负载中包含 correlationId

{
  "operationDate": "2020-09-16T11:58:25",
  "correlationId": "-5544538377183901824042719876882142227",
  "birthDate": "2013-12-12",
  "firstNameEn": "boby",
  "firstNameAr": "الشيخ",
}

微服务将处理payload,并在另一个主题中给出响应。

{
  "correlationId": -5544538377183901824042719876882142227,
  "consumerId": null,
  "userid": 123456,
  "statusCode": "SUCCESS",
  "errors": null
}

现在我们需要使用 spring ReplyingKafkaTemplate 来实现。

由于 ReplyingKafkaTemplate 仅适用于标头中的 correlationId

【问题讨论】:

  • 抱歉,不清楚在请求ProducerRecord 中携带相关ID 的想法是什么?您不能为此使用标题吗?为什么?
  • 看我的回答;使用correlationIdStrategyFunction
  • @ArtemBilan 你说得对,我们应该或者可以在标头中使用,但是我们调用的微服务只允许在有效负载中而不是在标头中。这也是我们需要在有效负载中挂钩相同的关联 id 的问题。
  • @GaryRussell 我们需要一种方法,以便我们可以附加有效负载,并且他们将处理主题中的消息并放入另一个具有相同关联 id 的 kafka 主题
  • 更清晰。请参阅我的答案的编辑。

标签: spring-boot apache-kafka spring-kafka request-response


【解决方案1】:

假设您的意思是要在相关 ID 中包含主题,请参阅

/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {

您可以根据ProducerRecord(具有topic())创建自己的关联ID。

您只需要确保它是独一无二的。如果您手动设置KafkaHeaders.REPLY_TOPIC,它将对策略可见。

编辑

使用负载中的相关 ID,使用 setCorrelationIdStrategy 从负载中提取相关 ID,并添加 RecordInterceptor 在回复端执行相同操作。

【讨论】:

  • 我认为他的观点是他们不能依赖这个headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));。听起来另一部分无法读取 Kafka 标头...
  • 我同意;问题不明确;让我们等待澄清:)
  • 您好,问题是使用spring replaykafka 默认Kafka模板会在消息头中添加correlationId,如果replaycontainer中出现相同的correlationId,它将映射到相同的请求。
【解决方案2】:

感谢您的提示。

我已经用 Kafka 标头相关 ID 覆盖了有效负载。

@Override
    protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
        if(producerRecord.value()!=null){
           // i have appeneded the header correlationId in th payload
        }
        return super.doSend(producerRecord);
    }

在 Replay onMessage 中,我已将响应负载相关性 ID 填充到标题中。

@Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(
            krConsumerRecord -> //update each record header
        );
        super.onMessage(data);
    }

通过这种方式成功地将 request-response 语义与correlationId 集成到请求和响应负载中。

【讨论】:

    猜你喜欢
    • 2021-11-19
    • 2021-11-01
    • 2014-04-11
    • 2013-08-22
    • 1970-01-01
    • 2021-11-19
    • 1970-01-01
    • 1970-01-01
    • 2011-03-04
    相关资源
    最近更新 更多