【问题标题】:AWS Glue Dynamic Filtering - Filter one dynamic frame using another dynamic frameAWS Glue 动态过滤 - 使用另一个动态框架过滤一个动态框架
【发布时间】:2020-05-04 23:59:04
【问题描述】:

我正在尝试根据驻留在另一个动态框架中的数据过滤动态过滤,我正在处理 join and relational example,在此代码中,人员和成员资格动态框架由 id 加入,但我想根据 id 过滤人员出现在会员 DF 中,下面是我放置静态值的代码

    import sys
from awsglue.transforms import Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"



# Create dynamic frames from the source tables 
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)

persons = persons.drop_fields(['links', 'identifiers','other_names', 'images','contact_details'])


# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers','links'])


fileredPersons = Filter.apply(frame = persons,
                              f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print "Filtered record count:  ", fileredPersons.count()

下面是过滤逻辑

 fileredPersons = Filter.apply(frame = persons,
                                  f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])

我想将会员 DF 中的 person_id 列传递到过滤函数条件中,基本上过滤具有会员资格的人,任何帮助将不胜感激。

【问题讨论】:

  • 如果你想使用 pyspark 代替胶水,我不确定是否可以使用胶水
  • 能否请您详细说明如何使用 pyspark 进行操作

标签: amazon-web-services pyspark filtering aws-glue


【解决方案1】:

您可以简单地执行内部连接,而不是像过滤一样

persons_filtered = persons.alias('persons').join(memberships, persons.id==memberships.id).select('persons.*')

这只会为您提供过滤后的值。 如果您的会员 df 很小或需要查找,那么您甚至可以广播它以获得更快的结果

from pyspark.sql.functions import broadcast
persons_filtered = persons.alias('persons').join(broadcast(memberships), persons.id==memberships.id).select('persons.*')

希望对你有帮助。

【讨论】:

  • 我得到“persons_filtered = people.join(memberships, people.id==memberships.id).select(persons.*)”的无效语法,这有什么问题吗
  • 我将 Glue Dyanmic Frame 转换为 Dataframe 并应用了您提供的代码
猜你喜欢
  • 1970-01-01
  • 2018-05-26
  • 1970-01-01
  • 2021-07-19
  • 1970-01-01
  • 2021-10-04
  • 2023-04-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多