【发布时间】:2023-03-12 22:34:01
【问题描述】:
我有 mysql 表存储在 dataframe_mysql 中
dataframe_mysql = sqlContext.read.format("jdbc").options(...
dataframe_mysql.registerTempTable('dataf')
groupedtbl=sqlContext.sql("""SELECT job_seq_id,first(job_dcr_content) as firststage,last(job_dcr_content) as laststage,
first(from_stage) as source, last(from_stage) as target , count(jid) as noofstages from dataf group by job_seq_id having count(jid)>1""" )
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
func1 = udf(fu1, StringType())
func2= udf(fu2, StringType())
res1=groupedtbl.withColumn('dcol',func1(groupedtbl.firststage,groupedtbl.lastage,groupedtbl.job_seq_id))
res2=res1.withColumn('lcol',func2(res1.dcol,res1.job_seq_id))
对于上面的代码,我看到即使我发出了一个限制命令:
lb=res2.limit(2).collect()
或以下命令仅获取一条记录的结果:
labels.filter(res2.job_seq_id==5843064)
它不仅在第一个查询中获得两个结果或在第二个查询中获得单个结果,它还对其他行进行了大量不必要的计算,即使只需要两行,也会浪费时间。我可以从内部日志中看到这一点,即使只是获取两行,其计算 100 行,然后从中检索两个结果行。我虽然 DAG 机制应该可以处理这个问题,但它似乎没有,我在这个观察中错了吗?
【问题讨论】:
标签: python apache-spark pyspark