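【Posted】: 2020-05-04 23:59:04
【Question】:
I am trying to filter a dynamic frame based on data that resides in another dynamic frame. I am working through the join and relational example, in which the persons and memberships dynamic frames are joined by id. Instead of joining, I would like to filter persons down to the ids that appear in the memberships DF. Below is my code, where I have hard-coded static values for now: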
import sys
from awsglue.transforms import Filter, Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"
# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)
# Keep only the fields we need
persons = persons.drop_fields(['links', 'identifiers', 'other_names', 'images', 'contact_details'])
orgs = orgs.drop_fields(['other_names', 'identifiers', 'links'])
# Filter persons against a hard-coded list of ids (static values for now)
fileredPersons = Filter.apply(frame = persons,
                              f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print("Filtered record count: {}".format(fileredPersons.count()))
This is the filtering logic:
fileredPersons = Filter.apply(frame = persons,
                              f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
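I want to pass the person_id column from the memberships DF into the filter condition, in other words keep only the persons who have a membership. Any help would be greatly appreciated.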
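One way the static list could be replaced, sketched under assumptions rather than offered as a confirmed solution: collect the distinct person_id values from memberships into a Python set on the driver, then test each person's id against that set inside Filter.apply. The helper names collect_ids, has_membership and filter_persons are hypothetical, introduced only for this illustration. The sketch also assumes the set of ids is small enough to fit in driver memory.

```python
def collect_ids(rows, key="person_id"):
    """Return the distinct non-null values of `key` from row-like objects."""
    return {row[key] for row in rows if row[key] is not None}


def has_membership(member_ids):
    """Build a predicate that keeps records whose "id" is in member_ids."""
    return lambda rec: rec["id"] in member_ids


def filter_persons(persons, memberships):
    """Keep only the persons that appear in memberships (runs in a Glue job)."""
    # Assumption: this import only resolves inside the AWS Glue runtime.
    from awsglue.transforms import Filter

    # Pull the distinct person_id values to the driver as a plain Python set.
    rows = memberships.toDF().select("person_id").distinct().collect()
    return Filter.apply(frame=persons, f=has_membership(collect_ids(rows)))
```

Inside the job above this would be called as fileredPersons = filter_persons(persons, memberships). A set is used instead of a list so that each membership test is a constant-time lookup.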
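【Comments】:
-
Would you be open to using PySpark instead of Glue? I'm not sure whether this can be done with Glue alone.
-
Could you please elaborate on how to do this with PySpark?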
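A rough idea of what the PySpark route might look like, assuming the dynamic frames are converted with toDF() and the result is wrapped back with DynamicFrame.fromDF: a left semi join keeps the rows of persons whose id matches a person_id in memberships, without adding any membership columns. The function names persons_with_membership and filter_dynamic_frame, and the frame name passed to fromDF, are hypothetical and chosen only for this sketch.

```python
def persons_with_membership(persons_df, memberships_df):
    """Return the rows of persons_df whose id appears as a person_id in memberships_df.

    Both arguments are expected to be Spark DataFrames.
    """
    # Rename person_id to id so the join can use a single shared column name.
    member_ids = (
        memberships_df
        .select("person_id")
        .withColumnRenamed("person_id", "id")
    )
    # A left semi join keeps matching rows from the left side only and never
    # duplicates a person that has several memberships.
    return persons_df.join(member_ids, on="id", how="left_semi")


def filter_dynamic_frame(persons, memberships, glue_context):
    """Apply the semi join to Glue dynamic frames (runs in a Glue job)."""
    # Assumption: this import only resolves inside the AWS Glue runtime.
    from awsglue.dynamicframe import DynamicFrame

    df = persons_with_membership(persons.toDF(), memberships.toDF())
    return DynamicFrame.fromDF(df, glue_context, "persons_with_membership")
```

Unlike collecting ids to the driver, the join stays distributed, so it should cope better when memberships is large.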
Tags: amazon-web-services pyspark filtering aws-glue