【问题标题】:Deleting specific column in Cassandra from Spark从 Spark 中删除 Cassandra 中的特定列
【发布时间】:2019-02-15 09:39:23
【问题描述】:

我能够使用 RDD API 删除特定列 -

sc.cassandraTable("books_ks", "books")
  .deleteFromCassandra("books_ks", "books",SomeColumns("book_price"))

我正在努力使用 Dataframe API 做到这一点。

有人可以分享一个例子吗?

【问题讨论】:

    标签: apache-spark cassandra datastax


    【解决方案1】:

    您不能通过 DF API 删除,而且通过 RDD api 是不自然的。 RDDs 和 DFs 是不可变的,这意味着没有修改。您可以过滤它们以减少它们,但这会生成一个新的 RDD / DF。

    话虽如此,您可以做的是过滤掉您希望删除的行,然后构建一个 C* 客户端来执行该删除:

    // Spark 和 C* 连接的导入 导入 org.apache.spark.sql.cassandra._ 导入 com.datastax.spark.connector.cql.CassandraConnectorConf

    spark.setCassandraConf("Test Cluster", CassandraConnectorConf.ConnectionHostParam.option("localhost"))
    val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "books_ks", "table" -> "books")).load()
    val dfToDelete = df.filter($"price" < 3).select($"price");
    dfToDelete.show();
    
    
    // import for C* client
    import com.datastax.driver.core._
    
    // build a C* client (part of the dependency of the scala driver)
    val clusterBuilder = Cluster.builder().addContactPoints("127.0.0.1");
    val cluster  = clusterBuilder.build();
    val session = cluster.connect();
    
    // loop over everything that you filtered in the DF and delete specified row.
    for(price <- dfToDelete.collect())
        session.execute("DELETE FROM books_ks.books WHERE price=" + price.get(0).toString);
    

    很少警告如果您尝试删除大部分行,这将无法正常工作。在这里使用 collect 意味着这项工作将在 Spark 的驱动程序中完成,即 SPOF 和瓶颈。

    执行此操作的更好方法是 a) 定义一个 DF UDF 来执行删除,好处是您可以获得并行化。选项 b) 到 RDD 级别,只删除如上所示。

    故事的寓意,仅仅因为它可以做到,并不意味着它应该做到。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-07-02
      • 2018-03-01
      • 2017-05-18
      • 2015-07-26
      • 1970-01-01
      • 1970-01-01
      • 2022-01-22
      • 2020-06-25
      相关资源
      最近更新 更多