Posted: 2019-07-22 09:36:22
Question:
I have two tables, for example:
ID  Name   Age
1   Alex   20
2   Sarah  21
... and so on

ID  Name   Marks
1   Alex   80
2   Sarah  78
... and so on
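I want to join these two tables on multiple keys (join conditions) using Cloud Dataflow (Apache Beam), i.e. ID and Name are both common columns. How can I do that?
I tried joining on a single key (one common column), but I don't know how to do it with multiple keys.
I used this code as a reference: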
import logging
import traceback

import apache_beam as beam
from apache_beam.pvalue import AsDict


class JoinTables:
    def add_key_details(self, row, key_details):
        # Merge in the table1 row that shares this row's name, if there is one.
        result = row.copy()
        try:
            result.update(key_details[row['name']])
        except KeyError as err:
            traceback.print_exc()
            logging.error("Name Not Found error: %s", err)
        return result


def run(argv=None):
    jointables = JoinTables()
    with beam.Pipeline(argv=argv) as p:
        table1 = (p
            | 'Read table1 details from BigQuery ' >> beam.io.Read(
                beam.io.BigQuerySource(
                    query='SELECT * FROM `dataset.table1`',
                    use_standard_sql=True
                )
            )
            # Key each table1 row by name so it can serve as a dict side input.
            | 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
        )
        table2 = (p
            | 'Read table2 details from BigQuery ' >> beam.io.Read(
                beam.io.BigQuerySource(
                    query='SELECT * FROM `dataset.table2`',
                    use_standard_sql=True
                )
            )
            # Look up each table2 row's name in the table1 side input.
            | 'Join data with side input 1' >> beam.Map(jointables.add_key_details, AsDict(table1))
        )
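One direction that might work for the multi-key case is to keep the same side-input pattern but key each row by a tuple of the common columns instead of a single column. The sketch below is only an illustration under assumptions: the lowercase column names `id` and `name`, the `JOIN_FIELDS` constant, and the helpers `composite_key` and `join_tables` are hypothetical and not part of the reference code. The plain dict at the bottom stands in for what `AsDict` would hand to the function.

```python
import logging

# Hypothetical choice of join columns; adjust to the real schema.
JOIN_FIELDS = ('id', 'name')


def composite_key(row, fields=JOIN_FIELDS):
    """Build a hashable tuple key from several columns of a row dict."""
    return tuple(row[field] for field in fields)


def add_key_details(row, key_details, fields=JOIN_FIELDS):
    """Merge the row stored under the same tuple key in key_details into row."""
    result = row.copy()
    try:
        result.update(key_details[composite_key(row, fields)])
    except KeyError as err:
        logging.error("Key not found: %s", err)
    return result


def join_tables(table1, table2):
    """Wire the helpers into Beam; table1 and table2 are PCollections of dicts."""
    import apache_beam as beam  # imported here so the helpers above stay Beam-free

    keyed = table1 | 'Key table1' >> beam.Map(lambda row: (composite_key(row), row))
    return table2 | 'Join on id and name' >> beam.Map(
        add_key_details, beam.pvalue.AsDict(keyed))


# Local check: a plain dict plays the role of the AsDict side input.
table1_rows = [
    {'id': 1, 'name': 'Alex', 'age': 20},
    {'id': 2, 'name': 'Sarah', 'age': 21},
]
lookup = {composite_key(r): r for r in table1_rows}
joined = add_key_details({'id': 1, 'name': 'Alex', 'marks': 80}, lookup)
```

Because a tuple is hashable, it can be used as the dictionary key just like the single `name` value was. A row with no match on both columns is passed through unchanged, which mirrors the behaviour of the single-key version.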
Comments:
Tags: python join google-cloud-dataflow apache-beam