【问题标题】:How to read Azure CosmosDb Collection in Databricks and write to a Spark DataFrame如何在 Databricks 中读取 Azure CosmosDb 集合并写入 Spark DataFrame
【发布时间】:2019-09-20 02:36:49
【问题描述】:

我正在查询 CosmosDb 集合,并且可以打印结果。当我尝试将结果存储到 Spark DataFrame 时,它​​失败了。

以本站为例:

How to read data from Azure's CosmosDB in python

按照上面链接中的确切步骤操作。此外,尝试以下方法

 df = spark.createDataFrame(dataset)

这会引发此错误:

ValueError: 部分类型推断后无法确定

值错误 回溯(最近一次通话最后一次)
在 ()
25 打印(数据集)
26
---> 27 df = spark.createDataFrame(dataset)
28 df.show()
29

/databricks/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
808 rdd,架构 = self._createFromRDD(data.map(准备),架构,采样率)
809 其他:
--> 810 rdd,schema = self._createFromLocal(map(prepare, data), schema)
第811章 = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
812 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(),schema.json())

/databricks/spark/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
440 写入临时文件。
第441章 --> 442 个数据,模式 = self._wrap_data_schema(数据,模式)
443返回self._sc.parallelize(数据),架构

但是,希望将其保存为 Spark DataFrame

任何帮助将不胜感激。谢谢!!!>

【问题讨论】:

标签: python azure cosmos


【解决方案1】:

为了推断字段类型,PySpark 会查看每个字段中的非无记录。如果一个字段只有 None 记录,PySpark 无法推断类型并会引发该错误。

手动定义架构将解决问题

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("foo", StringType(), True)])
>>> df = spark.createDataFrame([[None]], schema=schema)
>>> df.show()
+----+
|foo |
+----+
|null|
+----+

希望对你有帮助。

【讨论】:

    【解决方案2】:

    我看到您使用旧的 Python SDK for DocumentDB 来查询 CosmosDB 文档以创建 PySpark DataFrame 对象,从而遵循我之前的回答。但是你不能直接将client.ReadDocuments方法的结果docs作为参数data传递给函数SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True),因为数据类型不同,如下所示。

    函数createDataFrame需要一个参数data,它必须是RDDlistpandas.DataFrame

    但是,我从https://pypi.org/project/pydocumentdb/#files下载了pydocumentdb-2.3.3.tar.gz的源代码,并查看了document_client.pyquery_iterable.py的代码文件。

    # from document_client.py
    def ReadDocuments(self, collection_link, feed_options=None):
        """Reads all documents in a collection.
    
        :param str collection_link:
            The link to the document collection.
        :param dict feed_options:
    
        :return:
            Query Iterable of Documents.
        :rtype:
            query_iterable.QueryIterable
    
        """
        if feed_options is None:
            feed_options = {}
    
        return self.QueryDocuments(collection_link, None, feed_options)
    
    # query_iterable.py
    class QueryIterable(object):
        """Represents an iterable object of the query results.
        QueryIterable is a wrapper for query execution context.
        """
    

    所以要解决您的问题,您必须首先通过从ReadDocuments 方法迭代结果Query Iterable of Documents 创建一个pandas.DataFrame 对象,然后通过spark.createDataFrame(pandas_df) 创建一个PySpark DataFrame 对象。

    【讨论】:

      猜你喜欢
      • 2017-04-03
      • 2019-03-14
      • 2019-03-08
      • 2023-02-01
      • 2020-02-29
      • 1970-01-01
      • 2018-05-23
      • 1970-01-01
      • 2019-02-07
      相关资源
      最近更新 更多