【问题标题】:Can data be loaded in Apache Spark RDD/Dataframe on the fly?数据可以动态加载到 Apache Spark RDD/Dataframe 中吗?
【发布时间】:2015-09-01 21:16:35
【问题描述】:

数据可以动态加载还是已经预加载到 RDD/DataFrame 中?

假设我有一个 SQL 数据库,我使用 JDBC 源将 1,000,000 条记录加载到 RDD 中。例如,如果数据库中有一条新记录,我是否可以编写一份作业,将那条新记录添加到 RDD/Dataframe 中以使其成为 1,000,001?还是需要重新构建整个 RDD/DataFrame?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    我想这取决于你所说的添加(...)记录重建是什么意思。可以使用SparkContext.unionRDD.union 合并RDD,DataFrame.unionAll 合并DataFrame。

    只要合并的 RDD 使用相同的序列化器,就不需要重新序列化,但如果两者使用相同的分区器,则需要重新分区。

    以JDBC源码为例:

    import org.apache.spark.sql.functions.{max, lit}
    
    val pMap = Map("url" -> "jdbc:..", "dbtable" -> "test")
    
    // Load first batch
    val df1 = sqlContext.load("jdbc", pMap).cache
    
    // Get max id and trigger cache
    val maxId = df1.select(max($"id")).first().getInt(0)
    
    // Some inserts here...
    
    // Get new records
    val dfDiff = sqlContext.load("jdbc", pMap).where($"id" > lit(maxId))
    
    // Combine - only dfDiff has to be fetched
    // Should be cached as before
    df1.unionAll(dfDiff)
    

    如果您需要可更新的数据结构 IndexedRDD 在 Spark 上实现键值存储。

    【讨论】:

    • 如果我有一个已经加载了 N 条记录的 RDD。事后我可以向该 RDD 添加一条记录吗?或者更好地说,我有一个计划的工作来检查 SQL DB 中的最新记录,并不断用新记录更新 RDD。
    • 如上图,你可以联合差分数据帧,并使用缓存强制计算来获取数据,但除非你使用支持可以通过驱动程序推送的时间点查询的系统,否则你会必须自己负责提供正确的谓词。
    猜你喜欢
    • 1970-01-01
    • 2017-08-16
    • 2017-02-03
    • 2017-04-11
    • 1970-01-01
    • 1970-01-01
    • 2020-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多