【问题标题】:Spark Streaming with a dynamic lookup table带有动态查找表的 Spark Streaming
【发布时间】:2015-03-31 15:14:43
【问题描述】:

我目前正在研究使用 Spark Streaming 来获取类似日志文件的条目,并出于统计原因对它们进行一些计算。

HDFS 上保存了一些数据集,现在可以从 HBase 和 Hive 访问,需要这些数据集来查找一些数据并对其进行转换,例如 IP 与机器名称和机器所有者之间的映射。

预计 Spark 应用程序将在我们的集群上日复一日地运行数周而无需重新启动。但是,这些参考表每隔几个小时就会更新一次。

如果使用的数据稍微旧一点也没关系,但数据超过两周就不行了。因此,我想知道如何在 map 和 reduce 阶段查找数据以进行转换和扩充。我有几个想法。

  1. 广播变量可以读取数据集并有效地传递它。但是,一旦设置了广播变量,就无法更改它,并且在驱动程序类中再次获取数据,取消持久化和广播新的数据将不起作用,因为工作人员的指针都指向旧数据集。我不知道有没有办法解决这个问题。

  2. 可以进行 HBase get() 查询。如果根据查找的键将数据定向到reducer,则每个reducer 可以保存整个数据集子集的缓存,并且可以保存自己的本地缓存。 HBase 在获取单个记录时应该有最小的延迟。

  3. 还有别的吗?

【问题讨论】:

    标签: hadoop hbase apache-spark spark-streaming


    【解决方案1】:

    这里有两个选择。

    首先是在 DStream 之上使用foreachRDD 转换。 foreachRDD 在驱动端执行,这意味着你可以在那里创建任何新的 RDD。您可以存储时间计数器并每 10-15 分钟从 HDFS 重新读取文件

    其次是通过DStream读取transform转换中的一些文件并将其结果保存在内存中。使用这种方法,您必须读取每个执行程序的整个查找表,效率不高

    我建议您使用第一种方法。更准确地说,您可以将上次更新数据时的标志存储在某处,并将其存储在您的 Spark 应用程序中。在每次迭代中,您检查此标志的值(例如,存储在 HBase 或 Zookeeper 中)并将其与本地存储的值进行比较 - 如果不同,则重新读取查找表,否则 - 执行操作旧的

    【讨论】:

    • 我有一个相关的问题。我的查找表大约有 200 万行并且是静态的。键是一个大约 100 个字符的字符串,值是一个大约 10 个字符的字符串。现在,我将这些数据存储在索引的 mongo db 集合中,并在转换步骤中进行查找。我批量拨打电话,所以每次转换只招致一次点击,但它仍然是网络调用。将这么大的查找表作为 Spark 广播变量是否有意义?
    • 200 万条记录,每条 110 字节只有 220MB 的数据——对于广播变量来说并不算多。每个硬件节点有 1 个执行程序应确保在整个集群中存储的这 220MB 的副本所需的最少数量。如果它是静态的,您可以在处理开始时将其加载到内存中,然后再使用它。我不建议您使用像 MongoDB 这样的集中式东西,因为随着集群的增长,它将成为您的瓶颈。如果数据是完全静态的,您可以考虑将数据存储在每个节点上的文件中或每个节点上的本地存储中(例如 redis)
    猜你喜欢
    • 2021-02-12
    • 2018-10-19
    • 2017-05-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-16
    • 2019-03-01
    • 2020-03-19
    相关资源
    最近更新 更多