【问题标题】:Structured Streaming - Foreach Sink结构化流 - Foreach Sink
【发布时间】:2018-10-27 20:49:54
【问题描述】:

我基本上是从 Kafka 源读取信息,并将每条消息转储到我的 foreach 处理器(感谢 Jacek 的页面提供的简单示例)。

如果这确实有效,我将在此处的process 方法中实际执行一些业务逻辑,但是,这不起作用。我相信println 不起作用,因为它在执行程序上运行,并且无法将这些日志返回给驱动程序。但是,这个insert into 临时表至少应该可以工作,并向我展示消息实际上已被消费并处理到接收器。

我在这里错过了什么?

真的在寻找第二双眼睛来检查我的努力:

 val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker")) 
      .option("subscribe", src_topic) 
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long) = {
        true
      }
      override def process(value: Row) = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
      }
      override def close(errorOrNull: Throwable) = {
        scon.closeConnection
      }
    }


    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()

【问题讨论】:

  • 什么是 SConConnection?编写器被序列化并发送给执行器,如果您的 scon 不可序列化,您应该将构造放在 open 方法中。你可以尝试用--master local[*] 运行它并检查你的printlns 在那里吗?
  • 是的@Harald,我确实尝试过。我没有看到任何 printlns 或我在进程调用中放入的任何内容。相比之下,我稍微调整了传入消息并将结果数据帧转储到控制台,该控制台接收器工作得很好..
  • 我的错,在本地模式下,prinltln 确实显示在驱动程序日志中。但是,我仍在等待成功尝试调用任何业务逻辑方法..
  • 但是如果 println 被调用,你也可以把你的业务逻辑放在那里。你到底是什么问题?
  • 我意识到我不能从流程方法调用我自己的方法。如果我在进程方法中转储我的内联,它会被调用。但是,不能序列化的 hbaseconnection 仍然是有问题的。即使我在进程调用之外打开连接,并在关闭调用中关闭.. :(

标签: scala apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

感谢 Harald 和其他人的 cmets,我发现了一些事情,这使我实现了正常的处理行为 -

  1. 本地模式下测试代码,yarn不是调试最大的帮助
  2. 由于某种原因,foreach sink 的 process 方法不允许调用其他方法。当我将业务逻辑直接放在那里时,它就可以工作了。

希望对其他人有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-10-22
    • 2020-04-28
    • 1970-01-01
    • 2020-02-27
    • 2019-09-05
    • 2019-12-10
    • 2017-05-04
    • 1970-01-01
    相关资源
    最近更新 更多