【问题标题】:How to transfer data from Kafka to Cassandra using Nifi?如何使用 Nifi 将数据从 Kafka 传输到 Cassandra?
【发布时间】:2019-05-22 13:19:53
【问题描述】:

我想使用Cassandra 中的NifiKafka 收集数据。我为此创建了一个这样的流程。

我的数据库连接配置是这样的:

这是我的 ConvertJsonToSQL 处理器的配置:

我在 ConvertJsonToSQL 处理器上遇到以下错误。

ConvertJSONToSQL[id=d25a7e27-0167-1000-2d9a-2c969b33482a] ConvertJSONToSQL[id=d25a7e27-0167-1000-2d9a-2c969b33482a] 由于 null 无法处理会话;处理器在管理上产生了 1 秒:java.lang.NullPointerException

注意:我将dbschema驱动程序jar添加到Nifi库中。

你认为我应该怎么做才能解决这个问题?

【问题讨论】:

  • Datastax 最近发布了一个 Kafka Connect 插件。 github.com/datastax/kafka-examples 来自 Landoop 的那个也可以...重点是 Kafka Connect 内置在 Kafka 框架中,所以你确定你需要 Nifi 吗?
  • @cricket_007 我正在寻找这个错误的原因。我不需要其他工具。
  • 好吧,如果没有 NPE 的整个堆栈跟踪,并且可能会进入一些 Nifi 源代码以查看究竟是什么,这很难说是 null
  • @cricket_007 我猜你是对的,我只是想在这里问你,说可能是遇到了。
  • 除非您共享错误的整个堆栈跟踪,否则我们将无法提供帮助。话虽如此,如果您正在寻找一种将 Kafka 事件摄取到 Cassandra 中的方法,那么您编写的流程可能并不完美,您可以尝试ConsumeKafkaRecord -> PutCassandraRecord

标签: cassandra nullpointerexception apache-kafka apache-nifi


【解决方案1】:

根据可用信息很难解决错误,ConvertJSONToSQL 失败的最可能原因是 JSON 无效。只差一点documentation

传入的 FlowFile 应该是“扁平”JSON 消息,这意味着它由单个 JSON 元素组成,每个字段映射到一个简单类型。

我看不到你在 AttributesToJSON 处理器中做了什么,但我相信 twitter 通常会返回一个嵌套的 JSON,而且你可能还没有将它展平。


解决此问题的一种简单通用方法是从顶部启动处理器,并检查每个处理器之前/之后的队列,直到看到您不期望的内容。

有了这个,您应该能够准确地查明问题,如果需要,您可以使用以这种方式发现的信息来创建一个可重现的示例并提出更详细的问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-22
    • 2019-08-10
    • 2023-01-25
    • 1970-01-01
    • 2015-11-03
    相关资源
    最近更新 更多