【发布时间】:2016-10-12 17:26:53
【问题描述】:
这是我的 python-spark 代码的一部分,其中部分运行速度太慢,无法满足我的需要。 尤其是这部分代码,我真的很想提高它的速度,但不知道怎么做。目前处理 6000 万行数据大约需要 1 分钟,我希望将其改进到 10 秒以下。
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
我的 spark 应用的更多上下文:
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
.map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
.filter(lambda x:len(x[1])>=2) \
.map(lambda x:x[1][-1]) \
.map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))
非常感谢您的建议。
编辑:
count占用大部分时间(50s)不加入
我也尝试过增加并行度,但效果不明显:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
和
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
【问题讨论】:
-
你确定是负载,还是连接?连接很昂贵...
-
count 占用了大部分时间不加入,见上面我的更新。谢谢
-
这个问题和this有什么不同?
标签: python apache-spark cassandra pyspark datastax-enterprise