【问题标题】:Messages saved Kafka Topic not saving correctly via Kafka Connector消息已保存 Kafka 主题未通过 Kafka 连接器正确保存
【发布时间】:2019-07-26 08:37:38
【问题描述】:

所以我设置了一个 Confluent Kafka JDBC 连接器。首先,我启动了一个模式注册表,例如

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

这是 schema-registery.properties 文件

listeners=http://0.0.0.0:8081
kafkastore.connection.url=zookeeperhost:2181
kafkastore.bootstrap.servers=PLAINTEXT://kafkahost:9092
kafkastore.topic=_schemas
debug=false

接下来我像这样启动一个独立的连接器

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./jdbc-source.properties

connect-avro-standalone.properties 是

bootstrap.servers=kafkahost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java

jdbc-source.properties 是

name=jdbc_source_oracle
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=host)(PORT=port))(CONNECT_DATA=(SERVER=dedicated)(SID=server)))
connection.user=xxx
connection.password=xxx
table.whitelist=table1, table2
mode=bulk
topic.prefix=my_topic
query=select * from table1 t1 join table1 t2 on t2.id = t1.id where t2.entereddate >='19-FEB-2019' and t2.entereddate <= '23-FEB-2019'

我使用的查询仅用于测试目的,我要使用的真正查询将实现递增模式并且不包含 where 子句。

现在这可以将数据发布到主题中,但会发生一些奇怪的事情。首先,ID 以不可读的格式保存。只是空旷的广场。其次,数据库中填充的一些字段在主题中保存为 null。第三,每当我尝试更改 JDBC 源文件中查询的日期时,什么都没有发生。它仍然包含我第一次运行时发布的相同消息,因为 kafka 主题中的任何内容都没有更新,没有多少次我更改查询。

谁能帮帮我?

编辑

我想做的是通过 pyspark 代码使用数据。这是我如何做的代码

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

我还使用 kafka-avro-console-consumer 使用此命令使用数据

./bin/kafka-avro-console-consumer \
--bootstrap-server kafkahost:9092 \
--property print.key=true \
--from-beginning \
--topic my_topic

这两个都给了我奇怪的结果

这是 pyspark 代码给我的

这就是使用 avro 控制台给我的结果

屏蔽一些可能包含公司敏感信息的字段和文本。

【问题讨论】:

  • 不清楚您是如何使用数据的。您需要使用kafka-avro-console-consumer。另外,我相信时间戳递增模式至少需要第二次精度,而不是字符串日期列
  • 顺便说一句,如果您使用的是query,则不需要table.whitelist
  • @cricket_007,我更新了我的帖子,提供了我如何从主题中消费的信息,在这种情况下我也没有使用递增模式。我正在使用批量模式,以便我可以测试功能,这就是我使用字符串数据列的原因。这有关系吗?
  • @RobinMoffatt 我之前试过不使用 table.whitelist 但我得到了这个异常java.sql.SQLException: Invalid column type: getTimestamp not implemented for class oracle.jdbc.driver.T4CClobAccessor 我也查看了那个链接,但找不到任何可以帮助我的东西

标签: jdbc apache-kafka apache-kafka-connect confluent-platform confluent-schema-registry


【解决方案1】:

如果您使用 Spark 的 Avro,则需要使用 correct deserializer

您会在控制台中看到 Avro 数据中的字节,然后是小数/数字的处理,as detailed here

您可以阅读更多关于 Kafka Connect 和 Avro 的序列化替代方案(包括 JSON)here

【讨论】:

  • 这适用于 Scala,不确定它是否适用于 python。有没有办法让我在使用 Kafka 连接器时不使用 Avro?还是一般的架构?
  • 是的,您可以使用 JsonConverter(甚至 StringConverter,尽管您为什么要这样做超出了我的理解)。我添加了一个指向我的答案的链接,可以帮助您解释这一点
  • 请注意,您可以使用 Python 使用 Avro 序列化消息:docs.confluent.io/5.0.0/clients/confluent-kafka-python/… / github.com/confluentinc/confluent-kafka-python
  • @anonuser1234 您可能想尝试将 Spark 输出到控制台 github.com/AbsaOSS/ABRiS
  • @cricket_007,谢谢,但这看起来只适用于 scala。我正在使用 PySpark
猜你喜欢
  • 2022-07-25
  • 2019-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-05-27
  • 1970-01-01
  • 2021-04-06
  • 1970-01-01
相关资源
最近更新 更多