【发布时间】:2016-06-21 13:33:03
【问题描述】:
我正在尝试衡量必须将 dataframe 从 scala 复制到 python 并返回到大型管道中的性能影响。为此,我创建了这个相当人工的转换器:
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import random
class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__self(self, inputCol=None, outputCol=None, bogusarg=None):
super(RandomColAdderTransformer, self).__init__()
self.bogusarg = None
self._setDefault(bogusarg=set())
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None):
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
cur_col = self.getInputCol()
def randGet(col): # UDF crashes with no arguments
a = col*random.random() # Ensure we are reading and copying to python space
return a # It runs only once?
sparktype = FloatType()
return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))
这个转换器的目标是确保有一些从 python 生成的数字,它访问dataframe 并进行乘法(在 python 中),然后对于管道的下一个阶段,它必须添加dataframe的一列
但是我有些奇怪。在测试我的代码时,会为所有列生成相同的随机数:
df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
然后transformedDF.show() 的连续调用实际上改变了值!?
transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 2.9191132|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2| x3| randFloat|
+---+---+-----+----------+
| 1| a| 23.0| 16.033003|
| 3| B|-23.0|-2.9191132|
+---+---+-----+----------+
这是预期的行为吗? .show() 真的会触发计算开始吗? AFAIK 我应该使用单个节点,确定它们会在单个线程中运行,以便共享随机种子吗?我知道存在一个内置的 pyspark rng,但它不适合我的目的,因为它实际上不会从 python 空间生成数据。
【问题讨论】:
-
您希望在这里得到什么样的答案?解释给定代码发生了什么或完整的工作替代?
-
解释发生了什么。 AFAIK 代码执行我希望它执行的操作(python 空间中的值乘以
dataframe中的值,然后将该列附加到dataframe以进一步处理它。如果我错了,请纠正我,我我试图了解正在发生的事情并确保它会强制 py4j 将数据复制到 jvm 并返回。 -
您可以使用身份 (
lambda x: x),如果您的唯一目标是移动数据,它也可以正常工作。 -
@zero323 我不确定这是否真的会做任何事情,因为我认为评估是按需进行的,但感谢您的解释。
-
如果下游处理需要它,它将独立于版本执行。在 1.x 中,即使根本不使用它也会被计算出来。一般来说,您不仅应该考虑数据移动成本。对 GC 也有重大影响。更不用说 Python UDF 对执行计划特别不利。
标签: python apache-spark random pyspark apache-spark-sql