【问题标题】:How to make a cached from a finished Spark Job still accessible for the other job?如何使已完成的 Spark 作业中的缓存仍然可供其他作业访问?
【发布时间】:2016-09-12 03:19:38
【问题描述】:

我的项目是为用户实现交互查询以发现该数据。就像我们有一个列列表,用户可以选择然后用户添加到列表并按查看数据。 Cassandra 中的当前数据存储,我们使用 Spark SQL 从中查询。

数据流是我们有一个原始日志,经过 Spark 存储处理到 Cassandra 中。数据是具有 20 多列和 4 个指标的时间序列。目前我进行了测试,因为集群键有 20 多个维度,所以写入 Cassandra 很慢。

这里的想法是将所有数据从 Cassandra 加载到 Spark 中并将其缓存在内存中。向客户端提供 API 并在 Spark Cache 上运行查询。 但我不知道如何保持缓存的数据持续存在。我尝试使用他们有功能调用share object 的 spark-job-server。但不确定它是否有效。

我们可以提供具有 40 多个 CPU 内核和 100 GB RAM 的集群。我们估计要查询的数据约为 100 GB。

我已经尝试过的:

  • 尝试存储在Alluxio并从中加载到Spark,但加载时间很慢,因为当它加载4GB数据时,Spark需要做两件事首先从Alluxio读取需要1分钟以上,然后存储到磁盘(Spark随机播放)花费超过 2 或 3 分钟。这意味着超过我们设定的 1 分钟以内的时间。我们在 8 个 CPU 内核中测试了 1 个作业。
  • 尝试存储在 MemSQL 中,但有点昂贵。 1天它花费了2GB RAM。不确定当我们扩展时速度是否保持良好。
  • 尝试使用 Cassandra,但 Cassandra 不支持 GROUP BY。

所以,我真正想知道的是我的方向是对还是错?我可以改变什么来归档目标(查询,如 MySQL,有很多 group by、SUM、ORDER BY)通过 API 返回到客户端。

【问题讨论】:

    标签: caching apache-spark cassandra spark-jobserver


    【解决方案1】:

    如果您在 DataFrame 上显式调用 cachepersist,它将保存在内存(和/或磁盘,取决于您选择的存储级别)中,直到上下文关闭。这也适用于sqlContext.cacheTable

    因此,当您使用 Spark JobServer 时,您可以创建一个长时间运行的上下文(使用 REST 或在服务器启动时)并将其用于同一数据集上的多个查询,因为它将被缓存直到上下文或JobServer 服务关闭。但是,使用这种方法时,您应该确保有足够的内存可用于此上下文,否则 Spark 会将大部分数据保存在磁盘上,这会对性能产生一些影响。

    此外,JobServer 的命名对象功能对于在作业之间共享特定对象很有用,但如果您将数据注册为临时表 (df.registerTempTable("name")) 并缓存它 (sqlContext.cacheTable("name")),则不需要这样做,因为您将能够从多个作业中查询您的表(使用 sqlContext.sqlsqlContext.table),只要这些作业在相同的上下文中执行。

    【讨论】:

    • 谢谢。如果我们同时有多个请求会发生什么?是否需要在队列中等待执行?有什么解决方案可以在不停机的情况下更新缓存表?
    • @giaosudau 对同一个临时表的多个查询由 Spark 集群调度程序管理,您可以将其配置为 FIFO 或 FAIR(循环)。要更新缓存表,您可以先执行sqlContext.uncaceTable("name"),然后执行newDF.registerTempTable("name"),再执行新的sqlContext.cacheTable("name")
    • 在更新缓存表时仍然很困惑。如果我们先清除当前数据并更新一个新数据,这意味着我们之间有停机时间(等待将当前数据加载到缓存中)。我只想每小时更新 1 个少量数据,> 90 天前的那一天将从缓存中删除。有没有办法做到这一点?谢谢。
    • @giaosudau,你不需要休息。您可以构建一个查询 cassandra 的作业,例如 val df = cc.sql("select * from keyspace.cTable where day <= 90")。然后你每小时运行一次这个作业,但后面跟着df.persist().count(),这将强制数据持久化在内存中,但不会影响你之前的表。然后您可以重新创建您的临时表:cc.uncacheTable("table"),然后是df.registerTempTable("table")cc.cacheTable("table")。这将是瞬时的,因为df 已经在内存中。
    • @DanieldePaula,谢谢我正在搜索确切的内容,但就我而言,我没有使用 Jobserver。在这种情况下,如何缓存数据并使其可用于所有其他 Spark 作业,您能帮我吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-11-17
    • 1970-01-01
    • 2022-11-10
    • 2020-07-14
    • 1970-01-01
    • 2012-03-28
    • 2022-01-18
    相关资源
    最近更新 更多