【问题标题】:Spark refuse to create empty dataframe when using pyarrowSpark在使用pyarrow时拒绝创建空数据框
【发布时间】:2020-01-20 16:02:09
【问题描述】:

我想从现有的 spark 数据框中创建一个空数据框。我使用 pyarrow 支持(在 spark conf 中启用)。当我尝试从空 RDD 和与现有数据帧相同的架构中创建一个空数据帧时,我得到了 java.lang.NegativeArraySizeException。这是重现错误的完整代码

spark = SparkSession.builder \
                    .config("spark.sql.execution.arrow.enabled", "true") \
                    .getOrCreate()
df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
empty_pandas_df = empty_df.toPandas()

这是完整的堆栈跟踪:

/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
  An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
    at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
    at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)

  warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
   2120                         _check_dataframe_localize_timestamps
   2121                     import pyarrow
-> 2122                     batches = self._collectAsArrow()
   2123                     if len(batches) > 0:
   2124                         table = pyarrow.Table.from_batches(batches)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
   2182                 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
   2183             finally:
-> 2184                 jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
   2185
   2186     ##########################################################################################

/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

当我用 pyarrow 禁用时,错误消失了

spark.conf.set("spark.sql.execution.arrow.enabled","false")

这是 pyspark 的已知问题还是与 pyarrow 相关?

注意:仅 pyspark>=2.4.4 可重现此错误。

【问题讨论】:

    标签: python apache-spark pyspark pyarrow


    【解决方案1】:

    从结果中收集 RDD 并创建 pandas 数据帧的问题的解决方法如下: 您的代码中的其他问题是使用 ':' 替换为 ','

    from pyspark.sql import SparkSession
    import pyarrow as pa
    import pandas as pd
    
    
    spark = SparkSession.builder.config("spark.sql.execution.arrow.enabled", "true").getOrCreate()
    
    df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
    
    empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema, verifySchema=True)
    empty_pandas_df = empty_df.collect()
    empty_pandas_df = pd.DataFrame(empty_pandas_df)
    
    print(empty_pandas_df)
    df.show()
    

    输出

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    19/09/22 11:08:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Empty DataFrame
    Columns: []
    Index: []
    [Stage 2:>                                                          (0 + 3) / 3]+---+
    |age|
                                                                                    +---+
    | 10|
    | 11|
    | 13|
    +---+
    

    【讨论】:

    • 是的,你是对的“:”,这是写问题时的错字。
    • 它没有解决问题,因为 empty_pandas_df 模式是空的:empty_pandas_df.dtypes 给出 Series([], dtype: object)
    • 它对我有用,我用过:pd.DataFrame(empty_df.collect(), columns=df.columns)
    • 您将获得一个带有损坏 Schema 的空 Dataframe,因为您将丢失类型(并且 int 类型成为对象),而原生 toPandas 并非如此。
    猜你喜欢
    • 2020-11-03
    • 2021-02-04
    • 1970-01-01
    • 1970-01-01
    • 2022-01-18
    • 2017-04-13
    • 1970-01-01
    • 2021-03-03
    • 2017-03-23
    相关资源
    最近更新 更多