【问题标题】:Is it possible to read all rows of Cassandra partition in one Spark worker?是否可以在一个 Spark 工作人员中读取所有 Cassandra 分区行?
【发布时间】:2016-04-01 04:21:49
【问题描述】:

我正在尝试通过最小化 reduceByKey 的步骤数来优化 Zipkin Dependencies Spark job 以在更少的阶段运行。数据从下表中读取:

CREATE TABLE IF NOT EXISTS zipkin.traces (
    trace_id  bigint,
    ts        timestamp,
    span_name text,
    span      blob,
    PRIMARY KEY (trace_id, ts, span_name)
)

在那里,单个分区trace_id 包含完整的跟踪,并且包含从几行到几百行不等的任何地方。但是,整个分区由 Spark 作业转换为非常简单的RDD[((String, String), Long)],将条目数量从数十亿减少到数百。

不幸的是,当前代码是通过独立读取所有行来完成的

sc.cassandraTable(keyspace, "traces")

并使用两个reduceByKey 步骤得出RDD[((String, String), Long)]。如果有一种方法可以一次性读取整个分区,在一个 Spark 工作进程中,并在内存中进行处理,这将是一个巨大的速度提升,无需存储/流式传输来自当前的大量数据集第一阶段。

-- 编辑--

为了澄清,作业必须从表中读取所有数据,数十亿个分区。

【问题讨论】:

    标签: apache-spark cassandra spark-cassandra-connector zipkin


    【解决方案1】:

    将所有分区数据保留在同一个 spark worker 上而不进行 shuffle 的关键是使用spanByKey

    https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

    CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts));
    
    sc.cassandraTable("test", "events")
      .spanBy(row => (row.getInt("year"), row.getInt("month")))
    
    sc.cassandraTable("test", "events")
      .keyBy(row => (row.getInt("year"), row.getInt("month")))
      .spanByKey
    

    如果没有 shuffle,那么所有修改都将就地完成并作为迭代器一起流水线化。

    请务必注意警告:

    注意:这仅适用于顺序排序的数据。因为数据是 在 Cassandra 中按集群键排序,所有可行的跨度必须 遵循自然的聚类键顺序。

    【讨论】:

    猜你喜欢
    • 2017-04-13
    • 2014-12-07
    • 1970-01-01
    • 1970-01-01
    • 2021-10-24
    • 2016-11-07
    • 2023-03-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多