【问题标题】:How to pull the data from remote database in Apache Kafka?如何从 Apache Kafka 中的远程数据库中提取数据?
【发布时间】:2019-02-02 11:13:35
【问题描述】:

我想在 Apache Kafka 中制作实时数据管道。我有位于远程位置的数据库,并且该数据库不断更新。任何人都可以使用哪个 Kafka 连接 API 从数据库中提取数据并实时摄取到 Kafka 代理中吗?稍后我将使用 kafka 流和 KSQL 运行临时查询来执行指标。

任何帮助将不胜感激!

【问题讨论】:

  • 抱歉,“数据库”太模糊了。你能澄清一下吗?您找到并尝试过哪些 Kafka Connect?尝试运行它们时遇到什么错误?
  • 如果您的数据库是 hdfs 的关系数据库,您可以使用 confluents 捆绑连接器。 docs.confluent.io/current/connect/connectors.html如果你有其他数据库需要开发自己的kafka连接插件:docs.confluent.io/current/connect/devguide.html
  • 我有 MYSQL 数据库,它位于远程位置,我有 ip、主机 ID 等。我不明白应该使用哪个连接器将数据从 mysql 拉到 kafka 并且数据库在不断更新。我正在尝试制作实时数据管道。

标签: mysql apache-kafka apache-kafka-connect


【解决方案1】:

如果您想创建实时数据管道,您需要使用能够从 MySQL 流式传输更改的更改数据捕获 (CDC) 工具。我建议Debezium 这是一个开源分布式平台,用于捕获变更数据。

捕获插入

当向表中添加新记录时,将生成类似于以下的 JSON:

{  
   "payload":{  
      "before":null,
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}

before 对象为空,after 对象包含新插入的值。注意op 属性是c,表明这是一个 CREATE 事件。

捕获更新

假设email属性已经更新,会产生一个类似下面的JSON:

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369929,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"u",
      "ts_ms":1500369929464
   }
}

注意 op 现在是 u,表明这是一个 UPDATE 事件。 before 对象显示更新前的行状态,after 对象捕获更新行的当前状态。

捕获删除

现在假设该行已被删除;

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "after":null,
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500370394,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1025,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"d",
      "ts_ms":1500370394589
   }
}

opnew 等于d,表示 DELETE 事件。 after 属性将为空,before 对象包含被删除之前的行。

您还可以查看他们网站上提供的extensive tutorial

编辑: Example configuration 用于 MySQL 数据库

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "3306", (4)
    "database.user": "debezium", (5)
    "database.password": "dbz", (6)
    "database.server.id": "184054", (7)
    "database.server.name": "fullfillment", (8)
    "database.whitelist": "inventory", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
    "include.schema.changes": "true" (12)
  }
}

1 当我们向 Kafka Connect 注册连接器时的名称 服务。
2 此 MySQL 连接器类的名称。
3 地址 MySQL 服务器。
4 MySQL 服务器的端口号。
5 名称 具有所需权限的 MySQL 用户。
6 密码 具有所需权限的 MySQL 用户。
7 连接器的 在 MySQL 集群中必须是唯一的标识符,并且类似于 MySQL 的 server-id 配置属性。
8 逻辑名称 MySQL server/cluster,形成一个命名空间,用于所有的 连接器写入的 Kafka 主题的名称,Kafka 连接模式名称和相应 Avro 的命名空间 使用 Avro 连接器时的模式。
9 所有数据库的列表 由此连接器将监视的此服务器托管。这是 可选的,还有其他属性用于列出数据库和 要包括或排除在监视之外的表。
10 卡夫卡列表 此连接器将用于写入和恢复 DDL 的代理 数据库历史主题的语句。
11 数据库名称 连接器将写入和恢复 DDL 的历史主题 陈述。本主题仅供内部使用,请勿使用 由消费者。
12 指定连接器应 在名为 fullfillment events 的架构更改主题上生成 可供消费者使用的 DDL 更改。

【讨论】:

  • 我读过它只是想知道我需要在哪里提及远程数据库凭据大多数博主都展示了本地数据库的示例。
  • @PSP 而不是localhost,只需使用你想要的ip。
  • @PSP 我已经编辑了我的答案以包含 MySQL 的示例配置。
【解决方案2】:

如果您从 MySQL 数据库中读取数据,请使用 Confluent 的 JDBC Source 连接器。 https://github.com/confluentinc/kafka-connect-jdbc/ 您还需要下载 MYSQL 驱动程序并将其与 kafka jar 一起放入:https://dev.mysql.com/downloads/connector/j/5.1.html

【讨论】:

  • 你能分享示例代码,以便我更清楚。我仍然想知道我需要在哪里配置远程数据库凭据,例如主机名、IP、用户名、密码等。
猜你喜欢
  • 2019-02-07
  • 1970-01-01
  • 1970-01-01
  • 2013-12-19
  • 2016-07-10
  • 2011-10-14
  • 1970-01-01
  • 1970-01-01
  • 2018-06-16
相关资源
最近更新 更多