【问题标题】:Writing dataframe to SQL Server with df.write.jdbc() produces error: Column has a data type that cannot participate in a columnstore index使用 df.write.jdbc() 将数据帧写入 SQL Server 会产生错误:列具有无法参与列存储索引的数据类型
【发布时间】:2018-11-23 07:22:39
【问题描述】:

我在 20 个节点的集群中使用 pyspark 和 spark 2.2 和 python 2.7。我正在使用 df = spark.read.jdbc(...) 将数据从云 blob 存储加载到数据框中,然后我尝试使用 df.write.jdbc(...) 将其写入我的 SQL Server 数据库。但是,在写入过程中,我收到以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.jdbc. : com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'my_col' has a data type that cannot participate in a columnstore index.

df 的架构如下:

root |-- my_col: string (nullable = true) |-- my_other_col: string (nullable = true) ...

This post 让我相信df.write.jdbc(...) 可能会在写入时尝试为 df 中的所有列创建列存储索引。不幸的是,我不知道如何阻止 spark 这样做,所以我可以缓解这个问题。

我的代码的摘要版本如下所示:

spark = (pyspark.sql.SparkSession.builder
             .appName('my-app').getOrCreate())

df = (self.spark.read.format('com.databricks.spark.avro')
                      .options(inferSchema=True, header=True)
                      .load(blob_storage_path)).repartition(self.num_partitions)

df.write.jdbc(url=self.jdbc_url, table=table, mode='overwrite',
                          properties=self.jdbc_properties)

这是完整的堆栈跟踪:

py4j.protocol.Py4JJavaError: An error occurred while calling o68.jdbc. 
:  com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'my_col' has a data type that cannot participate in a columnstore index.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:703)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:805)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

提前感谢您提供的任何帮助!

【问题讨论】:

    标签: sql-server apache-spark pyspark


    【解决方案1】:

    您有什么 SQL Server 版本?在 2017 年之前的版本中,可能在列存储索引中的数据类型有一些限制。阅读本文的限制部分https://docs.microsoft.com/en-us/sql/t-sql/statements/create-columnstore-index-transact-sql 我猜你可以做的事情是删除列存储索引或迁移到 Sql Server 2017。

    【讨论】:

    • 我使用的是 SQL Server 2008 SP3,确切的版本号是 10.0.9397.48。不幸的是,迁移到 SQL Server 2017 不是一种选择。虽然我不想删除我尝试上传到数据库的任何数据列,但我可以不创建任何列存储索引。我认为这里的问题是我无法找到如何更改 df.write.jdbc(...) 调用来创建列存储索引。
    猜你喜欢
    • 2018-08-28
    • 1970-01-01
    • 1970-01-01
    • 2016-08-13
    • 2018-07-20
    • 2016-02-22
    • 2017-10-03
    • 2021-03-02
    相关资源
    最近更新 更多