【问题标题】:Kafka Streams JDBC Source Long IncompatibilityKafka Streams JDBC Source Long不兼容
【发布时间】:2018-07-08 08:51:07
【问题描述】:

问题:在使用带有 Avro 序列化器和反序列化器的 Kafka Connect JDBC 源设置 Kafka 管道后,一旦我尝试使用 Kafka Streams Java 应用程序将该数据读入 KStream,就会收到以下错误。

org.apache.kafka.common.errors.SerializationException:数据大小 LongDeserializer 收到的不是 8

我已尝试尽可能地遵循现有示例,但有些事情没有意义。我将在下面包含所有代码/附加信息,但这里有几个问题......

  1. 我目前理解的最大差距之一是 Avro 记录的“KEY”使用的是什么?我(在运行时)出错的行与我告诉 KStream 密钥是 LONG 的事实有关,但是当检索 Avro 记录时,长度小于 8(预期长度为 LONG类型)。
    当我设置我的 JDBC 源时,那里没有任何东西可以识别密钥是什么——而且我在文档中没有看到任何东西可以让我相信我可以指定密钥,尽管我已经尝试过:

    curl -X POST \
      -H "Content-Type: application/json" \
      --data 'see next code block for formatted data'  \
    http://localhost:8083/connectors
    
    // This is the data chunk used above but in a string - broke it apart for readability here
    {
        "name": "source-jdbc-ldw_applications",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": 1,
            "connection.url": "jdbc:sqlserver://dbserver;databaseName=dbname;user=kafkareader;password=kafkareader;",
            "mode": "incrementing",
            "incrementing.column.name": "ApplicationID",
            "topic.prefix": "source-jdbc-",
            "poll.interval.ms": 30000,
            "table.whitelist": "LDW_Applications",
            "transforms": "setSchema",
            "transforms.setSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
            "transforms.setSchema.schema.name": "com.mycompany.avro.Application",
            "transforms.setSchema.schema.version": "1"
        }
    }
    

通过上述操作,我得到以下运行报告的架构:

curl http://localhost:8081/subjects/source-jdbc-LDW_Applications-value/versions/1 |jq

这是它的输出:

{
    "subject": "source-jdbc-LDW_Applications-value",
    "version": 1,
    "id": 9,
    "schema": "{\"type\":\"record\",\"name\":\"Application\",\"namespace\":\"com.baydynamics.avro\",\"fields\":[{\"name\":\"ApplicationID\",\"type\":\"long\"},{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Group\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"OwnerUserID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"RiskScore\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"RiskRating\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ServiceLevelTierID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"LossPotentialID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ConfidentialityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"IntegrityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"AvailabilityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ApplicationCategoryID\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"com.baydynamics.avro.Application\"}"
}

让这个架构更漂亮一点:

{
"type":"record",
"name":"Application",
"namespace":"com.baydynamics.avro",
"fields":[
    {
        "name":"ApplicationID",
        "type":"long"
    },
    {
        "name":"Name",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Description",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Group",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"OwnerUserID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    },
    {
        "name":"RiskScore",
        "type":[
            "null",
            {
            "type":"int",
            "connect.type":"int16"
            }
        ],
        "default":null
    },
    {
        "name":"RiskRating",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"ServiceLevelTierID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"LossPotentialID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ConfidentialityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"IntegrityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"AvailabilityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ApplicationCategoryID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    }
],
"connect.version":1,
"connect.name":"com.baydynamics.avro.Application"
}

同样,我没有看到任何表明任何特定字段将成为记录的关键的任何内容。

然后我进入 Kafka Streams,并尝试将这些数据带入 KStream……然后它就爆炸了……

final KStream<Long, Application> applicationStream = builder.stream(Serdes.Long(), applicationSerde, VULNERABILITY_TOPIC);

所以,事情就是这样,因为我知道存储在后台的数据是 SQL Server 中的 BIGINT,并且映射到 Java 中的 LONG,所以我将 KStream 的键类型设为 Long,然后使用 Serdes.Long( ) 用于 KStream 构建器的参数的反序列化器。

调试时,我看到原始记录的长度为 7,这就是它抛出错误的原因。显然 Avro 以一种更好地压缩的方式序列化事物?我不知道。无论如何,问题是我什至不知道它认为它实际上使用的是什么键?!那么谁知道 - 也许我对 Long 的假设是不正确的,因为它实际上并没有使用 ApplicationID 作为密钥?为什么我什至会假设它是?!

对此的任何帮助将不胜感激。我知道上面有很多信息,但简而言之..

  1. 使用 JDBC Kafka 连接将数据推送到主题中
  2. 数据正在进入主题 - 我可以通过控制台看到它
  3. 尝试将数据推送到流中,以便我可以对数据做一些很棒的事情,但由于 Serdes 与 Avro Record 不兼容,因此尝试填充流时失败了

更新 1: 根据下面 Randall 的建议,我尝试了 SMT(单消息转换),现在我每条记录都有一个密钥,这是朝着正确方向迈出的极好一步,但由于某种原因,似乎没有强制转换为 Long (INT64) 有任何实际效果。我已经使用 SMT 截取了连接器配置的一些屏幕截图,生成的记录(现在有一个密钥!)以及我在 Kafka 流中看到的相同错误:

【问题讨论】:

    标签: apache-kafka apache-kafka-streams apache-kafka-connect confluent-schema-registry


    【解决方案1】:

    Confluent JDBC source connector 不会生成带有键的记录。添加此支持的 feature request 已被记录。

    与此同时,您可以使用单个消息转换从值中提取一些字段,从而实质上创建键。内置的ValueToKey transform 正是这样做的。 This blog post 有该 SMT 的示例。

    【讨论】:

    • 非常感谢您的回复。我将看看我是否可以实现这一点并走得更远。您的陈述“Confluent JDBC 源连接器不生成带有键的记录”让我感觉好多了,因为我到处看了看,却无法弄清楚我认为应该发生的事情。我希望这在某处的在线文档中被提及。
    • 再次感谢您。为了进一步说明这一点,我遵循了链接文章中的建议(非常有帮助),但我仍然遇到兼容性错误。似乎即使将密钥强制为 INT64(长),我仍然在我的 kafka 流中得到“长度小于 8”。我附上了一张图片,因为它比我用文字更能说明问题:(imgur.com/a/VC6OO)
    • 您找到解决问题的方法了吗?
    猜你喜欢
    • 2020-09-05
    • 1970-01-01
    • 2018-11-19
    • 1970-01-01
    • 1970-01-01
    • 2021-01-22
    • 1970-01-01
    • 2012-06-07
    • 2021-01-25
    相关资源
    最近更新 更多