【发布时间】:2015-09-01 21:16:35
【问题描述】:
数据可以动态加载还是已经预加载到 RDD/DataFrame 中?
假设我有一个 SQL 数据库,我使用 JDBC 源将 1,000,000 条记录加载到 RDD 中。例如,如果数据库中有一条新记录,我是否可以编写一份作业,将那条新记录添加到 RDD/Dataframe 中以使其成为 1,000,001?还是需要重新构建整个 RDD/DataFrame?
【问题讨论】:
标签: apache-spark
数据可以动态加载还是已经预加载到 RDD/DataFrame 中?
假设我有一个 SQL 数据库,我使用 JDBC 源将 1,000,000 条记录加载到 RDD 中。例如,如果数据库中有一条新记录,我是否可以编写一份作业,将那条新记录添加到 RDD/Dataframe 中以使其成为 1,000,001?还是需要重新构建整个 RDD/DataFrame?
【问题讨论】:
标签: apache-spark
我想这取决于你所说的添加(...)记录和重建是什么意思。可以使用SparkContext.union 或RDD.union 合并RDD,DataFrame.unionAll 合并DataFrame。
只要合并的 RDD 使用相同的序列化器,就不需要重新序列化,但如果两者使用相同的分区器,则需要重新分区。
以JDBC源码为例:
import org.apache.spark.sql.functions.{max, lit}
val pMap = Map("url" -> "jdbc:..", "dbtable" -> "test")
// Load first batch
val df1 = sqlContext.load("jdbc", pMap).cache
// Get max id and trigger cache
val maxId = df1.select(max($"id")).first().getInt(0)
// Some inserts here...
// Get new records
val dfDiff = sqlContext.load("jdbc", pMap).where($"id" > lit(maxId))
// Combine - only dfDiff has to be fetched
// Should be cached as before
df1.unionAll(dfDiff)
如果您需要可更新的数据结构 IndexedRDD 在 Spark 上实现键值存储。
【讨论】: