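This applies to the alpha version of the Spark Cassandra Connector, so expect that some things may not work. As the documentation points out, this is a table option, so you can set it via options. You just need to switch from util.Date to the Timestamp or Long type: Spark SQL doesn't support encoding from the Date type.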
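If your existing code already holds java.util.Date values, the switch can be sketched roughly as below. The object and helper names are hypothetical (they are not part of Spark or the connector), and the Long variant assumes the value will be used as a Cassandra write time, which Cassandra expresses in microseconds since the Unix epoch.

```scala
import java.sql.Timestamp
import java.util.Date

object DateConversion {
  // Hypothetical helper: java.util.Date -> java.sql.Timestamp (same instant, millisecond precision).
  def toTimestamp(d: Date): Timestamp = new Timestamp(d.getTime)

  // Hypothetical helper: java.util.Date -> Long in microseconds since the epoch.
  // Assumption: the Long is meant to be used as a Cassandra write time.
  def toWriteTimeMicros(d: Date): Long = d.getTime * 1000L

  def main(args: Array[String]): Unit = {
    val legacy = new Date(1587022118905L) // arbitrary epoch milliseconds
    println(toTimestamp(legacy).getTime)
    println(toWriteTimeMicros(legacy))
  }
}
```

Either result can then be used as the field value in a case class like the ones shown next.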
With the following definitions everything works:
import java.time.Instant
import java.sql.Timestamp
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp,
  product_type: String, writeTime: Long)
val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
  Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF
Or with Timestamp:
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp,
  product_type: String, writeTime: Timestamp)
val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
  Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF
If we use the following table structure:
create table test.wrt_test (
  offer_id text,
  metadata_last_modified_source_time timestamp,
  product_type text,
  primary key(offer_id, metadata_last_modified_source_time));
then you can save the data as follows (only in 3.0-alpha!):
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
offerDataset.write.cassandraFormat("wrt_test", "test")
  .option("writetime", "writeTime") // here you specify name of the column with time!
  .mode(SaveMode.Append).save()
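But if you're using the RDD API, it works fine in the current versions as well:
import com.datastax.spark.connector._ // provides saveToCassandra on RDDs
import com.datastax.spark.connector.writer._
offerDataset.rdd.saveToCassandra("test", "wrt_test",
  writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))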
After writing the data in either case, you get the following:
cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;

 offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
----------+------------------------------------+--------------+-------------------------
      123 |    2020-04-16 07:28:38.905000+0000 |         test |              1243124234
      456 |    2020-04-16 07:28:38.905000+0000 |         test |             12431242366

(2 rows)
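To sanity-check values like these, it can help to decode them. Cassandra reports writetime() in microseconds since the Unix epoch, so a minimal sketch for turning the numbers above into readable instants could look like this (the object and helper names are hypothetical, used only for illustration):

```scala
import java.time.Instant

object WriteTimeDecoding {
  // Hypothetical helper: interprets a writetime() value as microseconds since the Unix epoch.
  def microsToInstant(micros: Long): Instant =
    Instant.ofEpochSecond(
      Math.floorDiv(micros, 1000000L),         // whole seconds
      Math.floorMod(micros, 1000000L) * 1000L  // leftover microseconds, as nanoseconds
    )

  def main(args: Array[String]): Unit = {
    // The two write times from the query output above.
    Seq(1243124234L, 12431242366L).foreach { wt =>
      println(s"$wt -> ${microsToInstant(wt)}")
    }
    // prints:
    // 1243124234 -> 1970-01-01T00:20:43.124234Z
    // 12431242366 -> 1970-01-01T03:27:11.242366Z
  }
}
```

Both sample values land within the first hours of 1970, which is expected here because they are small test numbers rather than real timestamps.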