【问题标题】:How Spark-cassandra-connector determines the range to query on Cassandra?Spark-cassandra-connector 如何确定在 Cassandra 上查询的范围?
【发布时间】:2021-09-26 13:12:01
【问题描述】:

我有一个三节点 Cassandra 集群,每个节点上运行 Spark 执行器。我知道要扫描 Cassandra 数据库,SCC(Spark-Cassandra-Connector) 使用范围查询将令牌放在 where 子句中。在不同节点上运行的 SCC 实例如何能够选择与在其他节点上运行的其他 SCC 实例不同的范围。例如,node1 上的 SCC 实例 A 选择了一个范围 RangeA,那么 SCC 实例 BC 如何决定不使用相同的范围 RangeA

他们会互相交流吗?

【问题讨论】:

  • 我已经尝试通过更多指向源代码的指针来回答您的问题

标签: apache-spark cassandra spark-cassandra-connector


【解决方案1】:

Spark-cassandra-connector 基础知识

spark-cassandra-connector 的内部结构相当复杂,但最重要的部分(过于简化)如下:

  • 连接器自然更喜欢query locally。例如,为了避开网络并让 spark executor 查询其本地 cassandra 节点
  • 为此,驱动程序需要了解 Cassandra 拓扑以及您需要查询的令牌范围在哪里(驱动程序完成了初始 ring describe,因此在此之后有一个完整的了解在哪里可以找到什么代币的一部分)
  • 在了解令牌范围的位置并将每个令牌映射到 IP 之后,连接器会以这样一种方式分散工作,即每个本地 spark 执行器都可以查询其本地范围的那部分

更多详细信息

它比这要复杂一些,但简而言之就是这样。我认为来自 Datastax 的 video 解释得更好。

您可能还想考虑reading 这个问题(诚然,答案很模糊)。

如何构建数据对于解决这个问题很重要盒子

请注意,以驱动程序可以尝试执行此操作的方式构建数据和查询需要一些技能/知识。

实际上,最常见的性能问题类型通常源于结构不良的数据或导致非本地执行的查询。 datastax java 驱动程序和 spark-cassandra-connector 在内部尽最大努力使查询本地化,但您需要还遵循最佳实践来构建数据。如果您还没有这样做,我建议您阅读/通过 DataStax 的 Data Modeling By Example 文章中描述的培训。

编辑:没有位置的查询

正如您所提到的,有时执行程序与节点不在同一主机上。不过,原理是一样的:

当您有查询时,它是在某个令牌范围内。此查询的一些数据将归node A“所有”,一些数据将归node B“所有”,还有一些归node C“所有”。

ring describe 操作告诉驱动程序,对于某个范围,它的哪一部分在node A,哪一部分在node B,哪一部分在node C。然后,驱动程序基本上将查询拆分为 3 个子查询,并从拥有特定范围的适当节点请求它。

每个节点都用自己的部分进行响应,最后由驱动程序聚合。

你可能注意到local or not,原理完全一样:

仅向每个节点询问其拥有的特定范围,驱动程序之前通过使用ring describe 操作了解到该范围

希望它更清楚一点。

【讨论】:

  • 如果无法获取位置(与 Ca​​ssandra 节点位于同一主机上的 spark 执行器)怎么办?例如在 K8S 环境中,我无法让 executor 和 Cassandra 节点在同一个 pod/container 上运行。不同的 SCC 实例将如何识别它们的范围?
  • 如果不能进行本地执行,那么 spark 执行器的每个实例都会获得令牌范围的“一块”,并针对选定的节点执行它。这里的原理(除了局部性)是相同的:每个执行器获取自己的范围片段并将其连接到该片段所在的 cassandra 节点(通过最初描述令牌范围来知道)。
  • 我不确定我是否理解得足够好。根据我的理解,您是说 SCC 是 Cassandra 拓扑感知的,并且会选择或尝试选择它本地的那些范围。但是如果不能实现本地化,那么 SCC 将如何选择它的令牌范围。
  • 本地与否并不重要(尽管如果可能,本地是首选)。它是拓扑感知的事实在这里是相关的,这意味着对于任何给定的查询(对于 3 个节点的集群),它可以分为 3 个子查询:每个节点 1 个。然后将每个子查询“委托”给不同的执行程序,该执行程序依次访问该子查询的数据所在的 Cassandra 节点。
  • 这 3 个查询不是必需的 :-)
【解决方案2】:

驱动程序在执行操作时正在生成分区列表,然后将映射到 Spark 分区并在工作节点之间分发。分区的生成确实取决于多种因素(您可以查看ScanHelper.getPartitionGenerator function):

  • WHERE 条件是否包含分区键
  • 是否已指定分区计数

基于此,它返回一个CassandraPartitionGenerator class 的实例,该实例使用partitions function 执行真正的分区生成,从集群中获取令牌范围列表,如有必要,将这些令牌范围拆分为较小的令牌范围,按它们所属的节点等对它们进行分组。

CassandraPartitionGenerator 的实例随后被 DataFrame 或 RDD API 用于获取将由 Spark 调度执行的 Spark 分区列表。最后这些分区由CqlTokenRange class 转换为CQL where 子句。

附: Russel Spitzer 还写了一篇关于 Spark 数据本地化和 Spark Cassandra 连接器的 blog post - 这也有助于理解。

【讨论】:

    猜你喜欢
    • 2019-04-10
    • 2012-01-09
    • 2020-06-03
    • 2016-07-21
    • 2016-09-02
    • 1970-01-01
    • 2015-03-10
    • 2016-05-08
    • 1970-01-01
    相关资源
    最近更新 更多