【问题标题】:How to send messages in order from Apache Spark to Kafka topic如何按顺序从 Apache Spark 向 Kafka 主题发送消息
【发布时间】:2018-02-08 07:41:43
【问题描述】:

我有一个用例,在 MySQL 中连续插入有关传感器的事件信息。我们需要每 1 或 2 分钟在 Kafka 主题中进行一些处理来发送此信息。

我正在使用 Spark 将此信息发送到 Kafka 主题并在 Phoenix 表中维护 CDC。我正在使用 Cron 作业每 1 分钟运行一次 spark 作业。

我目前面临的问题是消息排序,我需要以升序时间戳发送这些消息以结束系统 Kafka 主题(有 1 个分区)。但是由于 1 个以上的 spark DataFrame 分区同时向 Kafka 主题发送信息,导致大部分消息排序丢失。

目前,作为一种解决方法,我将我的 DataFrame 重新分区为 1,以保持消息的顺序,但这不是一个长期的解决方案,因为我正在失去 spark 分布式计算。

如果你们对此有更好的解决方案设计,请提出建议。

【问题讨论】:

  • 您能展示一下如何将数据插入 MySQL 吗?
  • @user8371915 数据由应用程序插入,这些应用程序的工作是捕获传感器事件并插入 mysql 数据库,这些应用程序不在我的控制范围内。
  • 那么 MySQL 是源,Kafka 是接收器?目前尚不清楚为什么顺序是相关的,但一般来说,您不能保证顺序和端到端并行性。
  • 是的 MySQL 是源,spark 用于从 MySQL 读取数据并应用业务逻辑,一旦处理完消息,它就会发送到 Kafka。

标签: hadoop apache-spark apache-kafka spark-dataframe kafka-producer-api


【解决方案1】:

通过使用键修复我的数据并在分区内应用排序,我能够在某种程度上按照升序时间戳实现消息排序。

val pairJdbcDF = jdbcTable.map(row => ((row.getInt(0), row.getString(4)), s"${row.getInt(0)},${row.getString(1)},${row.getLong(2)},${row. /*getDecimal*/ getString(3)},${row.getString(4)}"))
        .toDF("Asset", "Message")
val repartitionedDF = pairJdbcDF.repartition(getPartitionCount, $"Asset")
        .select($"Message")
        .select(expr("(split(Message, ','))[0]").cast("Int").as("Col1"),
          expr("(split(Message, ','))[1]").cast("String").as("TS"),
          expr("(split(Message, ','))[2]").cast("Long").as("Col3"),
          expr("(split(Message, ','))[3]").cast("String").as("Col4"),
          expr("(split(Message, ','))[4]").cast("String").as("Value"))
        .sortWithinPartitions($"TS", $"Value")

【讨论】:

    猜你喜欢
    • 2019-10-18
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 1970-01-01
    • 2019-10-13
    • 2017-12-15
    • 2019-02-14
    相关资源
    最近更新 更多