【问题标题】:Is there a way to use Kafka Connect with REST Proxy?有没有办法将 Kafka Connect 与 REST 代理一起使用?
【发布时间】:2021-09-08 19:12:06
【问题描述】:

Kafka Connect 源和接收器连接器提供了几乎理想的功能集,无需编写任何代码即可配置数据管道。就我而言,我想用它来集成来自公共 Internet 上的多个数据库服务器(生产者)的数据。

但是,一些生产者无法直接访问 Kafka 代理,因为他们的网络/防火墙配置仅允许到特定主机(端口 443)的流量。不幸的是,我无法真正更改这些设置。

我的想法是使用 Confluent REST 代理,但我了解到 Kafka Connect 使用 KafkaProducer API,因此它需要直接访问代理。

我找到了几个可能的解决方法,但没有一个是完美的:

  1. SSH 隧道 - 如下所述:Consume from a Kafka Cluster through SSH Tunnel
  2. 使用 REST 代理,但用自定义生产者替换 Kafka Connect,在 How can we configure kafka producer behind a firewall/proxy? 中提到
  3. 使用 SSHL 解复用器将流量路由到代理(但只有一个代理)

有没有人遇到过类似的挑战?你是怎么解决的?

【问题讨论】:

    标签: apache-kafka apache-kafka-connect kafka-rest


    【解决方案1】:

    接收器连接器(写入外部系统的连接器)不使用生产者 API。

    话虽如此,您可以使用一些向 REST 代理端点发出 POST 请求的 HTTP 接收器连接器。这并不理想,但可以解决问题。注意:这意味着您有两个集群 - 一个用于通过 Connect 发出 HTTP 请求,另一个位于代理后面。


    总的来说,我看不出这个问题对于 Connect 有什么独特之处,因为您在通过唯一开放的 HTTPS 端口将数据写入 Kafka 的任何其他尝试中都会遇到类似的问题。

    【讨论】:

    • 嗯,这是一个有两个集群的有趣方法。有点过于复杂,但事实是它回答了我的限制,同时仍然给了我 SSH 隧道不具备的灵活性。非常感谢。
    • 在 github 上有另一个名为 kafkaproxy 的项目,它是一个纯 TCP 代理,如果您可能想研究一下
    • 注意:HTTP Sink 连接器是使用商业许可证 (confluent.io/hub/confluentinc/kafka-connect-http) 发布的。在某些情况下,这可能是一个限制因素。
    • 我发现使用 REST 代理配置 Conflent HTTP Sink 连接器有点棘手。似乎连接器发送的 JSON 结构与 REST 代理所期望的不兼容:docs.confluent.io/platform/current/kafka-rest/… 您是否有一个用于 REST 代理的 HTTP Sink 连接器配置脚本示例?
    • 我从来没有说过要使用 Confluent,我说过要使用 一些 HTTP 接收器。 IE。你可以自己写一个发送你需要的格式
    【解决方案2】:

    正如@OneCricketeer 推荐的那样,我尝试了一个带有 REST 代理的 HTTP Sink 连接器方法。 我设法配置了 Confluent HTTP Sink 连接器以及替代连接器 (github.com/llofberg/kafka-connect-rest) 以使用 Confluent REST 代理。

    我正在添加连接器配置,以防任何尝试这种方法的人节省一些时间。

    Confluent HTTP Sink 连接器

        {
        "name": "connector-sink-rest",
        "config": {
            "topics": "test",
            "tasks.max": "1",
            "connector.class": "io.confluent.connect.http.HttpSinkConnector",
            "headers": "Content-Type:application/vnd.kafka.json.v2+json",
            "http.api.url": "http://rest:8082/topics/test",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter.schemas.enable": "false",
            "batch.prefix": "{\"records\":[",
            "batch.suffix": "]}",
            "batch.max.size": "1",
            "regex.patterns":"^~$",
            "regex.replacements":"{\"value\":~}",
            "regex.separator":"~",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1"
        }
    }
    

    Kafka Connect REST 连接器

    {
        "name": "connector-sink-rest-v2",
        "config": {
            "connector.class": "com.tm.kafka.connect.rest.RestSinkConnector",
            "tasks.max": "1",
            "topics": "test",
            "rest.sink.url": "http://rest:8082/topics/test",
            "rest.sink.method": "POST",
            "rest.sink.headers": "Content-Type:application/vnd.kafka.json.v2+json",        
            "transforms": "velocityEval",        
            "transforms.velocityEval.type": "org.apache.kafka.connect.transforms.VelocityEval$Value",
            "transforms.velocityEval.template": "{\"records\":[{\"value\":$value}]}",
            "transforms.velocityEval.context": "{}"        
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2021-12-25
      • 2011-10-06
      • 2018-03-15
      • 2016-11-01
      • 1970-01-01
      • 2018-09-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多