【发布时间】:2018-03-27 17:34:18
【问题描述】:
我正在尝试对我的 RDD 进行一些转换,为此,我正在使用 map 调用一个函数。但是,这个函数没有被调用。有人请让我知道我在这里做错了什么?
我可以看到 test 函数被调用但不是 store_past_info
def store_past_info(row):
print "------------------- store_past_info ------------------------------"
if row["transactiontype"] == "Return":
global prv_transaction_number
prv_transaction_number = row["transnumber"]
global return_occured
return_occured = True
global group_id
group_id.append(row["transnumber"])
if row["transactiontype"] == "Purchase":
if return_occured:
global group_id
group_id.append(prv_transaction_number)
else:
global group_id
group_id.append(row["transnumber"])
print group_id
def test(rdd):
print "------------------- test ------------------------------"
rdd.map(store_past_info).collect()
print group_id
这是它在商店中的运作方式:
- 如果购买了某些商品,则会生成一个 ID。
-
如果您想退回购买的几件商品,则输入了两个条目
- 使用新 id 退回所有产品的退货条目,
org_id作为您要退回的采购订单的id - 新的购买条目与您上次购买的 ID 相同的
id用于您想要保留的东西
- 使用新 id 退回所有产品的退货条目,
输入
Date Type Id org_id
25-03-2018 Purchase 111
25-03-2018 Purchase 112
26-03-2018 Return 113 111
26-03-2018 Purchase 111
输出 我想添加一个新列 group_id,它将显示退货和退货后发生的相应购买的相同 id(客户不进行此购买,这是系统为每次退货保留条目的方式)步骤 2.1
Date Type Id org_id group_id
25-03-2018 Purchase 111 111
25-03-2018 Purchase 112 112
26-03-2018 Return 113 111 113
26-03-2018 Purchase 111 113
【问题讨论】:
-
我认为你不能像在 pySpark 中那样使用全局变量。你想做什么?你能提供一些示例输入/所需的输出吗?
-
@pault 我已经更新了这个问题。但是,我也没有看到函数被调用。
-
您能否详细说明如何从输入到输出?我认为你更适合使用数据框和窗口函数来解决这个问题,但我仍然不是 100% 清楚你的逻辑。
-
@pault 请检查问题现在是否更清楚。
标签: apache-spark hadoop pyspark