【问题标题】:Delete from cassandra Table in Spark从 Spark 中的 cassandra 表中删除
【发布时间】:2018-02-23 08:45:27
【问题描述】:

我将 Spark 与 cassandra 一起使用。我正在从我的表中读取一些行,以便使用 PrimaryKey 删除主题。这是我的代码:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
    session.execute(delete)
    session.close()
})

但是这种方法为每一行创建一个会话,并且需要很多时间。那么是否可以使用 sc.CassandraTable 或其他比我的更好的解决方案删除我的行。

谢谢

【问题讨论】:

    标签: scala apache-spark cassandra-2.0


    【解决方案1】:

    我认为 Cassandra 连接器目前不支持 delete。为了分摊连接设置的成本,推荐的方法是将操作应用于每个分区。

    所以您的代码将如下所示:

    lines.foreachPartition(partition => {
        val session: Session = connector.openSession //once per partition
        partition.foreach{elem => 
            val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where     channel='"+elem._1 +"' and ctid='"+elem._2+"'and cvid='"+elem._3+"';"
            session.execute(delete)
        }
        session.close()
    })
    

    您还可以考虑使用DELETE FROM ... WHERE pk IN (list) 并使用类似的方法为每个分区构建list。这将更加高效,但可能会因非常大的分区而中断,因为列表将变得很长。在应用此功能之前重新分区您的目标 RDD 会有所帮助。

    【讨论】:

      【解决方案2】:

      您很久以前就问过这个问题,所以您可能已经找到了答案。 :P 只是为了分享,这是我在 Java 中所做的。这段代码非常适合我的本地 Cassandra 实例。但它不适用于我们的 BETA 或 PRODUCTION 实例,因为我怀疑那里有多个 Cassandra 数据库实例,并且删除仅对 1 个实例有效,并且数据立即被复制回来。 :(

      如果您能够让它在您的 Cassandra 生产环境中运行,并运行多个实例,请分享!

      public static void deleteFromCassandraTable(Dataset myData, SparkConf sparkConf){
          CassandraConnector connector = CassandraConnector.apply(sparkConf);
          myData.foreachPartition(partition -> {
              Session session = connector.openSession();
      
              while(partition.hasNext()) {
                  Row row = (Row) partition.next();
                  boolean isTested = (boolean) row.get(0);
                  String product = (String) row.get(1);
                  long reportDateInMillSeconds = ((Timestamp) row.get(2)).getTime();
                  String id = (String) row.get(3);
      
                  String deleteMyData = "DELETE FROM test.my_table"
                          + " WHERE is_tested=" + isTested
                          + " AND product='" + product + "'"
                          + " AND report_date=" + reportDateInMillSeconds
                          + " AND id=" + id + ";";
      
                  System.out.println("%%% " + deleteMyData);
                  ResultSet deleteResult = session.execute(deleteMyData);
                  boolean result = deleteResult.wasApplied();
                  System.out.println("%%% deleteResult =" + result);
              }
              session.close();
          });
      }
      

      【讨论】:

      • 你找到解决这个问题的方法了吗?
      猜你喜欢
      • 2019-02-15
      • 2021-08-17
      • 2020-12-06
      • 1970-01-01
      • 1970-01-01
      • 2017-05-18
      • 2011-07-30
      • 1970-01-01
      • 2015-11-29
      相关资源
      最近更新 更多