【发布时间】:2017-04-04 18:58:50
【问题描述】:
我正在尝试使用如下代码将 Spark SQL Row 值插入数据库:
final Broadcast<String> jdbcUrl = sc.broadcast(config.jdbcUrl());
df.foreachPartition((final Iterator<Row> it) -> {
final Sql2o sql2o = new Sql2o(jdbcUrl.value(), null, null, new NoQuirks());
try (final Connection conn = sql2o.beginTransaction()) {
final String sql = "INSERT INTO Table (Id, Value) VALUES (:id, :value)";
final Query query = conn.createQuery(sql, false);
int batchSize = 0;
while (it.hasNext()) {
final Row row = it.next();
query.addParameter("id", row.getLong(0))
.addParameter("value", row.get(1));
.addToBatch();
if (++batchSize == 1000) {
query.executeBatch();
conn.commit();
batchSize = 0;
}
}
query.executeBatch();
conn.commit();
}
});
我收到主键违规错误:
java.sql.BatchUpdateException:违反主键约束 'PK_Table'。无法在对象“表”中插入重复键。这 重复键值为 42。
我添加了一些调试日志记录代码,并验证了两个不同的执行程序试图插入相同的行(具有相同的 id 和值)。
在 Spark SQL 开始插入 Row 值之前,该表为空。此外,我尝试在 DataFrame 上调用 foreachPartition() 之前调用 distinct() 和 persist(),但我仍然遇到问题。
同一个DataFrame的不同分区不应该有不同的数据吗?分区器不总是保证吗?
编辑:
我在 DataFrame 上运行 df.groupBy(df.col("id")).count().filter(col("count").gt(1)).show(); 并且没有分组到多个行的 id:
+--+-----+
|id|count|
+--+-----+
+--+-----+
据我所知,看起来同一个分区正在不同的执行程序中同时迭代。怎么会?
【问题讨论】:
-
如果这不是 SQL Server 问题,为什么还要标记 Microsoft SQL-Server?
-
@pmbAustin,很公平。我删除了标签。
-
正如您的错误代码所说:42。这就是答案;)好的,现在真正的建议:
distinct()生成不同的行。这意味着,您可以有 2 个相同的 ID,但第二个值会不同。尝试做groupBy('id).count -
@T.Gawęda,并将其与
df.count进行比较? -
否,将其过滤为大于 1 的值
标签: java apache-spark apache-spark-sql