【问题标题】:Spark: How to configure writetime while saving to cassandraSpark:如何在保存到 cassandra 时配置写入时间
【发布时间】:2023-03-24 05:20:01
【问题描述】:

我有一个类似于 cassandra 表的实体。我正在使用 spark 将数据保存/更新到 Cassandra。这里的实体是Offer案例类

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String, writeTime: util.Date)

val offerDataset: Dataset[Offer] = ....

我将这些数据保存如下

offerDataset.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> cassandraKeyspace, "table" -> tableName))
      .mode(SaveMode.Append)
      .save()

cassandra 表的架构是

OFFER(offer_id, metadata_last_modified_source_time, product_type) 

问题是在保存/更新 cassandra 表时将 Offer 实体的 writeTime 字段配置为写入时间戳。此处在datastax中提到-https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md配置之类的

writetime=columnName

我无法理解的是语法应该是什么样子。

任何帮助都将不胜感激

【问题讨论】:

    标签: scala apache-spark cassandra timestamp spark-cassandra-connector


    【解决方案1】:

    本文档适用于 Spark Cassandra 连接器的 alpha 版本,因此请期待某些内容无法正常工作。正如文档中所指出的 - 这是一个表格选项,因此您可以通过 options 进行设置。您只需从 util.Date 切换到 TimestampLong 类型 - Spark SQL 不支持从 Date 类型进行编码。

    以下定义一切正常:

    import java.time.Instant
    import java.sql.Timestamp
    
    case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
      product_type: String, writeTime: Long)
    
    val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
      Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF
    

    Timestamp:

    case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
       product_type: String, writeTime: Timestamp)
    
    val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
      Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF
    

    如果我们使用下面的表结构:

    create table test.wrt_test (
      offer_id text,
      metadata_last_modified_source_time timestamp,
      product_type text,
      primary key(offer_id, metadata_last_modified_source_time));
    

    然后您可以将数据保存为如下(仅在 3.0-alpha 中!):

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.cassandra._
    offerDataset.write.cassandraFormat("wrt_test", "test")
        .option("writetime", "writeTime") // here you specify name of the column with time!
        .mode(SaveMode.Append).save()
    

    但如果您使用 RDD API,它在当前版本中也可以正常工作:

    import com.datastax.spark.connector.writer._
    offerDataset.rdd.saveToCassandra("test", "wrt_test", 
       writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
    

    在两种情况下都写完后,您会得到以下信息:

    cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;
    offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
    ----------+------------------------------------+--------------+-------------------------
          123 |    2020-04-16 07:28:38.905000+0000 |         test |              1243124234
          456 |    2020-04-16 07:28:38.905000+0000 |         test |             12431242366
    (2 rows)
    

    【讨论】:

    • 谢谢@AlexOtt。这个'option(“writetime”,“writeTime”)'失败,而这个工作'offerDataset.rdd.saveToCassandra(“test”,“wrt_test”,writeConf = WriteConf(timestamp = TimestampOption.perRow(“writeTime”)))'
    • 你用的是什么SCC版本?正如我所写 - 它仅在 SCC 的 alpha 版本中可用
    • 我正在使用 2.0.3,由于我的项目限制,我无法更改
    • 那么只有使用RDD才能到达它
    • 我建议至少升级 2.0.latest - 有很多修复和性能优化
    猜你喜欢
    • 2015-04-14
    • 2015-09-20
    • 2015-03-06
    • 1970-01-01
    • 2018-10-05
    • 2016-01-30
    • 1970-01-01
    • 2018-03-16
    • 1970-01-01
    相关资源
    最近更新 更多