事实证明,spark 有一种精确控制分区逻辑的方法。这就是spark.read.jdbc 中的predicates 选项。
我最终想出的如下:
(为了这个例子,假设我们有一个商店的购买记录,我们需要根据userId和productId对它进行分区,以便将一个实体的所有记录一起保存在同一台机器,我们可以在这些实体上执行聚合而不用打乱)
- 首先,生成要分区的每一列的直方图(每个值的计数):
| userId |
count |
| 123456 |
1640 |
| 789012 |
932 |
| 345678 |
1849 |
| 901234 |
11 |
| ... |
... |
| productId |
count |
| 123456789 |
5435 |
| 523485447 |
254 |
| 363478326 |
2343 |
| 326484642 |
905 |
| ... |
... |
| userId |
bin |
| 123456 |
1 |
| 789012 |
1 |
| 345678 |
1 |
| 901234 |
2 |
| ... |
... |
| productId |
bin |
| 123456789 |
1 |
| 523485447 |
2 |
| 363478326 |
2 |
| 326484642 |
3 |
| ... |
... |
url = 'jdbc:oracle:thin:username/password@address:port:dbname'
query = ```
(SELECT
MY_TABLE.*,
USER_PARTITION.BIN as USER_BIN,
PRODUCT_PARTITION.BIN AS PRODUCT_BIN
FROM MY_TABLE
LEFT JOIN USER_PARTITION
ON my_table.USER_ID = USER_PARTITION.USER_ID
LEFT JOIN PRODUCT_PARTITION
ON my_table.PRODUCT_ID = PRODUCT_PARTITION.PRODUCT_ID) MY_QUERY```
df = spark.read\
.option('driver', 'oracle.jdbc.driver.OracleDriver')\
jdbc(url=url, table=query, predicates=predicates)
predicates = [
'USER_BIN = 1 OR PRODUCT_BIN = 1',
'USER_BIN = 2 OR PRODUCT_BIN = 2',
'USER_BIN = 3 OR PRODUCT_BIN = 3',
...
'USER_BIN = n OR PRODUCT_BIN = n',
]
谓词作为WHERE子句添加到查询中,这意味着分区1中用户的所有记录都到同一台机器上。此外,分区 1 中产品的所有记录也都在同一台机器上。
请注意,这里的用户和产品之间没有任何关系。我们不在乎哪些产品位于哪个分区或发送到哪台机器。
但是由于我们想要对用户和产品(分别)执行一些聚合,我们需要将一个实体(用户或产品)的所有记录保存在一起。使用这种方法,我们可以在没有任何洗牌的情况下实现这一目标。
另外,请注意,如果有一些用户或产品的记录不适合工人的记忆,那么您需要进行子分区。这意味着您应该首先向您的数据添加一个新的随机数字列(介于 0 和一些 chunk_size 之间,例如 10000 或其他东西),然后根据该数字和原始 ID(例如 userId)的组合进行分区。这会导致每个实体被分成固定大小的块(即 10000),以确保它适合工作人员的记忆。
并且在聚合之后,您需要根据原始 ID 对数据进行分组,以将所有块聚合在一起,并使每个实体再次成为一个整体。
由于我们的内存限制和数据的性质,最后的洗牌是不可避免的,但这是实现预期结果的最有效方式。