这种方式是不可能的。需要使用表格,然后作为流程的最后一部分保存到文件中。
建议您按以下方式调整您的方法 - 这允许并行加载,但我真的怀疑并没有真正的好处。
-
按交付顺序加载所有文件,加载的每条记录都带有时间戳或文件序列号中的某些排序顺序以及记录类型。例如。例如,位置为 2 的文件 X 会使用 seqnum = 2 加载记录。如果在 SPARK 域内执行所有操作,您可以对正在处理的文件使用 DF 方法并附加到 Impala / Hive KUDU 表。
对于同一文件中的记录,如果同一文件中可以存在相同的键,则应用 monotonically_increasing_id() 以获取文件中的排序。见DataFrame-ified zipWithIndex。或者 zipWithIndex 通过 RDD 通过转换并返回到 DF。
然后发出一个 select 语句来获取具有最大值时间戳的键值,每个键的 seq_num。例如。如果在当前运行 3 个记录中,例如,对于 key=1,只需要处理一个 - 大概是具有最高值的那个。
另存为新文件。
相应地处理这个新文件。
-
或者:
绕过第 3 步,按 asc 顺序读取并相应地处理数据。
评论:
通常,我使用 LOAD 将此类数据加载到 HIVE / IMPALA,并通过从文件名中提取时间戳来设置分区键。需要一些 LINUX 脚本/处理。这是风格问题,不应成为真正的大数据瓶颈。
这是一个带有模拟输入的 sn-p,它说明了如何完成某些方面以允许针对 UPSerts 的键进行 MAX 选择。操作,DEL,ALT,无论您需要添加什么。尽管我认为您实际上可以根据我所看到的情况自己做到这一点:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
def dfSchema(columnNames: List[String]): StructType =
StructType(
Seq(
StructField(name = "key", dataType = StringType, nullable = false),
StructField(name = "file", dataType = StringType, nullable = false),
StructField(name = "ts", dataType = StringType, nullable = false),
StructField(name = "val", dataType = StringType, nullable = false),
StructField(name = "seq_val", dataType = LongType, nullable = false)
)
)
val newSchema = dfSchema(List("key", "file", "ts", "val", "seq_val"))
val df1 = Seq(
("A","F1", "ts1","1"),
("B","F1", "ts1","10"),
("A","F1", "ts2","2"),
("C","F2", "ts3","8"),
("A","F2", "ts3","3"),
("A","F0", "ts0","0")
).toDF("key", "file", "ts","val")
val rddWithId = df1.sort($"key", $"ts".asc).rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
dfZippedWithId.show
返回:
+---+----+---+---+-------+
|key|file| ts|val|seq_val|
+---+----+---+---+-------+
| A| F0|ts0| 0| 0|
| A| F1|ts1| 1| 1|
| A| F1|ts2| 2| 2|
| A| F2|ts3| 3| 3|
| B| F1|ts1| 10| 4|
| C| F2|ts3| 8| 5|
+---+----+---+---+-------+
准备好进行后续处理。