【问题标题】:Improve speed of spark app提高火花应用程序的速度
【发布时间】:2016-10-12 17:26:53
【问题描述】:

这是我的 python-spark 代码的一部分,其中部分运行速度太慢,无法满足我的需要。 尤其是这部分代码,我真的很想提高它的速度,但不知道怎么做。目前处理 6000 万行数据大约需要 1 分钟,我希望将其改进到 10 秒以下。

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

我的 spark 应用的更多上下文:

article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
     .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
     .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
     .filter(lambda x:len(x[1])>=2) \
     .map(lambda x:x[1][-1]) \
     .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))    

非常感谢您的建议。

编辑:

count占用大部分时间(50s)不加入

我也尝试过增加并行度,但效果不明显:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()

【问题讨论】:

  • 你确定是负载,还是连接?连接很昂贵...
  • count 占用了大部分时间不加入,见上面我的更新。谢谢
  • 这个问题和this有什么不同?

标签: python apache-spark cassandra pyspark datastax-enterprise


【解决方案1】:

首先,您应该弄清楚实际花费最多的时间。

例如确定读取数据需要多长时间

axes = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(table="axes", keyspace=source)
  .load()
  .count()

增加并行度或并行读取器的数量可能会有所帮助,但前提是您没有最大化 Cassandra 集群的 IO。

其次,看看您是否可以使用 Dataframes api 完成所有操作。每次使用 python lambda 时,都会在 python 和 scala 类型之间产生序列化成本。

编辑:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

只有在加载完成后才会生效,所以这对你没有帮助。

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()

不是 Spark Cassandra 连接器的有效参数,因此不会执行任何操作。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters 输入拆分大小决定了在 Spark 分区中放入多少个 C* 分区。

【讨论】:

  • 我在上面添加了一些细节我尝试在上面的编辑中使用这两种方法增加 parralellsim 但它没有任何效果。您能否具体说明 dateframes API 中所有内容的含义?谢谢
  • @Peter 我为您提供了仅使用 DataFrames previous time 的方法的链接。
  • @zero323 我尝试只使用 dateframes,但 dateframe 似乎没有 keyby 和 reducebykey 方法,因此看来我需要重新使用 RDD。我在尝试时收到此错误消息:AttributeError: 'DataFrame' object has no attribute 'keyBy'。知道该怎么做吗?谢谢
  • @peter 相信我。如果您点击链接,您会找到实现相同结果所需的代码。
  • @zero323 我添加了您的日期框架建议,请您看看这个问题。非常感谢! stackoverflow.com/questions/37848388/…
猜你喜欢
  • 2018-03-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-12-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多