【问题标题】:Dispatch and initiate Spark Jobs on Kafka message在 Kafka 消息上调度和启动 Spark 作业
【发布时间】:2019-12-07 03:14:15
【问题描述】:

我有一个通过 Kafka 发送数据的外部数据源。

事实上,这不是真实数据,而是指向数据的链接。

"type": "job_type_1"
"urls": [
  "://some_file"
  "://some_file"
]

只有一个主题,但它包含 type 字段,基于该字段我需要执行一项作业。

数据不是连续的,更像是作业——它包含一组数据,应该在一个批次中处理。下一个主题是独立的。同一类型的所有主题应同步处理。

选项:

  1. 使用 Spark Streaming。

    看起来这不是适合我的方案的解决方案。并且没有将value 视为不是数据而是作为路径列表的内置功能

  2. 创建一个中间服务,它将分派请求并启动一个具体的工作。在这种情况下,将 20Kb+ 数据传递给作业的最佳方法是什么,因为 spark-submit 可能不需要太多争论

  3. 创建一个长时间运行的 Spark 应用程序,其中将包含纯 Kafka 消费者,并在每条消息上创建 Spark Session 并执行作业。

不确定这是否能正常工作,如何停止等等。

  1. ???

更新

到目前为止,我的解决方案是创建一个长时间运行的 Spark 作业,它将使用 Kafka API(不是 Spark 的 API)连接到 Kafka、订阅、检索 URL 列表、调度作业类型,然后执行 spark 作业使用 urls 所以 spark 应用程序将使用标准的 spark.read().load(urls) api

【问题讨论】:

  • github.com/spark-jobserver/spark-jobserver 可能很有用。编写一个 kafka 消费者,它将从 kafka 读取作业信息记录并使用 SparkJobServer 启动相应的作业。
  • 谢谢。最终,我不是在寻找当前基础设施的替代品,因为它几乎是不可能的。
  • 出于好奇,您如何管理作业处理确认。处理工作两次是否值得关注?
  • 消息在 spark 作业完成后被认为是只读的。应用程序以 1 个 1 个方式提取消息

标签: apache-spark spark-streaming spark-streaming-kafka


【解决方案1】:

您可以在一个 spark 会话中运行多个 spark 作业。在传入流上启动火花流作业。将结果收集到主节点并并行触发查询。比如……

class KafkaStreamingExample {

  val conf = new SparkConf().setAppName("Spark Pi")
  def main(args:Array[String]):Unit =  {
    val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topicA", "topicB")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD((rdd,time) =>{

      val queriesToRun = rdd.map(_.value()).collect()

      queriesToRun.par.foreach(query => {
        spark.sql(query)
      })
    })
  }
}

【讨论】:

  • 最后,我使用了 DataFrame API。但据我所知,这个示例将读取并收集主题中的所有数据,而我在主题中只有 URL
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-10-24
  • 2018-04-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-25
相关资源
最近更新 更多