【发布时间】:2018-01-22 14:19:17
【问题描述】:
我最近开始使用 pyspark,遇到了一些我想更好地理解和避免的行为。
考虑以下代码:
query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50"
s1 = spark.sql(query1)
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()
query2 = "SELECT * FROM B" + " where Y in " + '(' + ','.join([str(x) for x in X_vals]) + ')'
s2 = spark.sql(query2)
s1.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/A.csv')
s2.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/B.csv')
从A,我从一个范围中获取50条记录的样本,并将X的值存储在X_vals中。然后我从表B 中获取相同的记录(X_vals 中的Y)。
稍后,我将两个表都写入csv 文件。在生成的csv 文件中,A 中的X 不再与B 中的Y 匹配。
我认为这是可以解释的行为,是由惰性评估引起的; collect() 语句中选择的记录与.csv 语句中的记录不同。然而,我对 Spark 的理解还不足以解释为什么会发生这种情况。
所以;为什么会发生这种情况,有没有办法强制查询两次返回相同的结果(不加入表)?
谢谢,
弗洛里安
【问题讨论】:
-
使用
order by强制query1 到特定顺序怎么样? -
@Bala,我猜
ORDER BY could确实有效,尽管我不确定。我对这种行为的直观解释是,上述情况是随机的,因为它取决于哪个执行器首先完成。对于order by,是先收集20条记录再排序,还是先排序所有记录再收集?
标签: apache-spark pyspark apache-spark-sql pyspark-sql