【问题标题】:How to use accumulators with spark 2.3.1 api如何在 spark 2.3.1 api 中使用累加器
【发布时间】:2019-04-03 01:23:38
【问题描述】:

我正在使用带有 Cassandra 3.x 的 spark-sql_2.11-2.3.1 版本。 我需要提供一个具有

的验证功能
   column_family_name text,
    oracle_count bigint,
    cassandra_count bigint,
    create_timestamp timestamp,
    last_update_timestamp timestamp,
    update_user text

同样,我需要计算成功插入的记录数,即要填充的 cassandra_count,因为我想使用 spark 累加器。但不幸的是,我无法使用 spark-sql_2.11-2.3.1 版本找到所需的 API 示例。

以下是我保存到 cassandra sn-p 的内容

 o_model_df.write.format("org.apache.spark.sql.cassandra")
    .options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
    .mode(SaveMode.Append)
    .save()

这里如何为成功保存到 Cassandra 中的每一行实现累加器增量 ...

任何帮助将不胜感激。

【问题讨论】:

    标签: apache-spark cassandra apache-spark-sql databricks spark-cassandra-connector


    【解决方案1】:

    Spark 的累加器通常用于您编写的转换中,不要指望 spark cassandra 连接器会为您提供类似的东西。

    但总的来说 - 如果您的工作没有错误地完成,那么这意味着数据已正确写入数据库。

    如果你想检查数据库中真正有多少行,那么你需要统计数据库中的数据——你可以使用 spark cassandra 连接器的cassandraCount 方法。主要原因 - 您的 DataFrame 中可能有多行可以映射到单个 Cassandra 行(例如,如果您错误地定义了主键,那么多行就有它)。

    【讨论】:

    • 先生,我有点卡在这里......不确定它与一个查询/处理器是否正常工作,但不是两个或更多?任何线索我在这里做错了什么stackoverflow.com/questions/53042545/…
    • 我得到了可以调试的源代码,给我一些可以检查的线索。
    • 先生,我调试了它,表名是空字符串,我找到了...非常感谢您的时间。
    • 先生关于 cassandraCount ,它获取所有表计数而不是最近插入的行计数,但我需要在 cassandra 中获取最近插入的计数......我应该如何实现该逻辑?......即在 ETL 中,我们会拒绝记录并插入记录,对吗?类似的方式我想实现....我应该如何处理它的设计?
    • 先生,有个小疑问,在 CQL 中有一种叫做“计数器”类型的东西……它在我上面的情况下有用吗?你认为无论如何我可以在我的用例中使用它?
    猜你喜欢
    • 2014-03-10
    • 1970-01-01
    • 2016-09-30
    • 2016-06-27
    • 2016-12-09
    • 1970-01-01
    • 2019-08-07
    • 2016-05-11
    • 1970-01-01
    相关资源
    最近更新 更多