【问题标题】:How to decide if Spark application performance is close to maximum (for given cores and memory)?如何确定 Spark 应用程序性能是否接近最大值(对于给定的内核和内存)?
【发布时间】:2016-10-16 16:05:49
【问题描述】:

我们在 2 节点集群(每个节点 8 核和 16G 内存)中使用 Cassandra 3.5 和 Spark 1.6.1。

有如下Cassandra表

CREATE TABLE schema.trade (
symbol text,
date int,
trade_time timestamp,
reporting_venue text,
trade_id bigint,
ref_trade_id bigint,
action_type text,
price double,
quantity int,
condition_code text,
PRIMARY KEY ((symbol, date), trade_time, trade_id)
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};

我想计算交易量百分比:在按交易所和时间栏(1 或 5 分钟)分组的时间段内相关证券交易的所有交易量总和。 我创建了一个示例:

void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) {
    char MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000;
    LOG.info("start");
    JavaPairRDD<Tuple2, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade")
            .filter(row ->
                        row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) &&
                        row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
                        row.getDateTime("trade_time").getMillis() < timeTill.getTime())
            .mapToPair(row ->
                new Tuple2<>(
                    new Tuple2(
                            new Timestamp(
                                    (row.getDateTime("trade_time").getMillis() / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER
                            ),
                            row.getString("reporting_venue")),
                    row.getInt("quantity")
                )
            ).reduceByKey((a, b) -> a + b);
    LOG.info(counts.collect().toString());
    LOG.info("finish");
}

[2016-06-15 09:25:27.014] [INFO] [main] [EquityTCAAnalytics] 开始
[2016-06-15 09:25:28.000] [INFO] [main] [NettyUtil] 在类路径中找到 Netty 的原生 epoll 传输,使用它
[2016-06-15 09:25:28.518] [INFO] [main] [Cluster] 添加了新的 Cassandra 主机 /node1:9042
[2016-06-15 09:25:28.519] [INFO] [main] [LocalNodeFirstLoadBalancingPolicy] 添加了主机 node1 (datacenter1)
[2016-06-15 09:25:28.519] [INFO] [main] [Cluster] 添加了新的 Cassandra 主机 /node2:9042
[2016-06-15 09:25:28.520] [INFO] [main] [CassandraConnector] 连接到 Cassandra 集群:Cassandra
[2016-06-15 09:25:29.115] [INFO] [main] [SparkContext] 开始工作:收集 EquityTCAAnalytics.java:88
[2016-06-15 09:25:29.385] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 注册 RDD 2 (mapToPair at EquityTCAAnalytics.java:78)
[2016-06-15 09:25:29.388] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 有 5 个输出分区的作业 0(在 EquityTCAAnalytics.java:88 收集)
[2016-06-15 09:25:29.388] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 最后阶段:ResultStage 1(在 EquityTCAAnalytics.java:88 收集)
[2016-06-15 09:25:29.389] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 最后阶段的父母:List(ShuffleMapStage 0)
[2016-06-15 09:25:29.391] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 缺少父母:列表(ShuffleMapStage 0)
[2016-06-15 09:25:29.400] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 提交 ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78),没有丢失的父母
[2016-06-15 09:25:29.594] [INFO] [dag-scheduler-event-loop] [MemoryStore] 块广播_0 作为值存储在内存中(估计大小 10.8 KB,空闲 10.8 KB)
[2016-06-15 09:25:29.642] [INFO] [dag-scheduler-event-loop] [MemoryStore] 块 broadcast_0_piece0 以字节形式存储在内存中(估计大小 5.4 KB,空闲 16.3 KB)
[2016-06-15 09:25:29.647] [INFO] [dispatcher-event-loop-7] [BlockManagerInfo] 在 node2:44871 的内存中添加了 broadcast_0_piece0(大小:5.4 KB,免费:2.4 GB)
[2016-06-15 09:25:29.650] [INFO] [dag-scheduler-event-loop] [SparkContext] 从 DAGScheduler.scala:1006 的广播创建广播 0
[2016-06-15 09:25:29.658] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 从 ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78) 提交 5 个缺失任务
[2016-06-15 09:25:29.661] [INFO] [dag-scheduler-event-loop] [TaskSchedulerImpl] 添加任务集 0.0 和 5 个任务
[2016-06-15 09:25:30.006] [INFO] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] 注册执行程序 NettyRpcEndpointRef(null) (node1:41122),ID 为 0
[2016-06-15 09:25:30.040] [INFO] [dispatcher-event-loop-7] [TaskSetManager] 在阶段 0.0 中启动任务 0.0(TID 0,node1,分区 0,NODE_LOCAL,11725 字节)
[2016-06-15 09:25:30.051] [INFO] [dispatcher-event-loop-7] [TaskSetManager] 在阶段 0.0 启动任务 1.0(TID 1,node1,分区 1,NODE_LOCAL,11317 字节)
[2016-06-15 09:25:30.054] [INFO] [dispatcher-event-loop-7] [TaskSetManager] 在阶段 0.0 启动任务 2.0(TID 2,node1,分区 2,NODE_LOCAL,11929 字节)
[2016-06-15 09:25:30.057] [INFO] [dispatcher-event-loop-7] [TaskSetManager] 在阶段 0.0 启动任务 3.0(TID 3,node1,分区 3,NODE_LOCAL,11249 字节)
[2016-06-15 09:25:30.059] [INFO] [dispatcher-event-loop-7] [TaskSetManager] 在阶段 0.0 启动任务 4.0(TID 4,node1,分区 4,NODE_LOCAL,11560 字节)
[2016-06-15 09:25:30.077] [INFO] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] 注册执行程序 NettyRpcEndpointRef(null) (CassandraCH4.ehubprod.local:33668),ID 为 1
[2016-06-15 09:25:30.111] [INFO] [dispatcher-event-loop-4] [BlockManagerMasterEndpoint] 使用 511.1 MB RAM,BlockManagerId(0, node1, 36512) 注册块管理器 node1:36512
[2016-06-15 09:25:30.168] [INFO] [dispatcher-event-loop-3] [BlockManagerMasterEndpoint] 使用 511.1 MB RAM、BlockManagerId(1, CassandraCH4.ehubprod.本地, 33610)
[2016-06-15 09:25:30.818] [INFO] [dispatcher-event-loop-2] [BlockManagerInfo] 在 node1:36512 的内存中添加了 broadcast_0_piece0(大小:5.4 KB,免费:511.1 MB)
[2016-06-15 09:25:36.764] [INFO] [pool-21-thread-1] [CassandraConnector] 与 Cassandra 集群断开连接:Cassandra
[2016-06-15 09:25:48.914] [INFO] [task-result-getter-0] [TaskSetManager] 在节点 1 (1/5) 上的 18854 毫秒内完成阶段 0.0 (TID 4) 中的任务 4.0
[2016-06-15 09:25:55.541] [INFO] [task-result-getter-1] [TaskSetManager] 在节点 1 (2/5) 上的 25489 毫秒内完成阶段 0.0 (TID 2) 中的任务 2.0
[2016-06-15 09:25:57.837] [INFO] [task-result-getter-2] [TaskSetManager] 在节点 1 (3/5) 上的 27795 毫秒内完成阶段 0.0 (TID 1) 中的任务 1.0
[2016-06-15 09:25:57.931] [INFO] [task-result-getter-3] [TaskSetManager] 在节点 1 (4/5) 上的 27919 毫秒内完成阶段 0.0 (TID 0) 中的任务 0.0
[2016-06-15 09:26:01.357] [INFO] [task-result-getter-0] [TaskSetManager] 在节点 1 (5/5) 上的 31302 毫秒内完成阶段 0.0 (TID 3) 中的任务 3.0
[2016-06-15 09:26:01.358] [INFO] [dag-scheduler-event-loop] [DAGScheduler] ShuffleMapStage 0 (mapToPair at EquityTCAAnalytics.java:78) 在 31.602 秒内完成
[2016-06-15 09:26:01.360] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 寻找新的可运行阶段
[2016-06-15 09:26:01.360] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 运行:Set()
[2016-06-15 09:26:01.360] [INFO] [task-result-getter-0] [TaskSchedulerImpl] 从池中删除了任务已全部完成的 TaskSet 0.0
[2016-06-15 09:26:01.362] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 等待:Set(ResultStage 1)
[2016-06-15 09:26:01.362] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 失败:Set()
[2016-06-15 09:26:01.365] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 提交 ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87),没有丢失的父母
[2016-06-15 09:26:01.373] [INFO] [dag-scheduler-event-loop] [MemoryStore] 块 broadcast_1 作为值存储在内存中(估计大小 3.6 KB,空闲 19.9 KB)
[2016-06-15 09:26:01.382] [INFO] [dag-scheduler-event-loop] [MemoryStore] 块 broadcast_1_piece0 以字节形式存储在内存中(估计大小 2.1 KB,空闲 21.9 KB)
[2016-06-15 09:26:01.383] [INFO] [dispatcher-event-loop-1] [BlockManagerInfo] 在 node2:44871 的内存中添加了 broadcast_1_piece0(大小:2.1 KB,免费:2.4 GB)
[2016-06-15 09:26:01.384] [INFO] [dag-scheduler-event-loop] [SparkContext] 从 DAGScheduler.scala:1006 的广播创建广播 1
[2016-06-15 09:26:01.385] [INFO] [dag-scheduler-event-loop] [DAGScheduler] 从 ResultStage 1 提交 5 个缺失的任务(ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87)
[2016-06-15 09:26:01.386] [INFO] [dag-scheduler-event-loop] [TaskSchedulerImpl] 添加任务集 1.0 和 5 个任务
[2016-06-15 09:26:01.390] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 在阶段 1.0 中启动任务 0.0(TID 5,node1,分区 0,NODE_LOCAL,2786 字节)
[2016-06-15 09:26:01.390] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 在阶段 1.0 中启动任务 1.0(TID 6,node1,分区 1,NODE_LOCAL,2786 字节)
[2016-06-15 09:26:01.397] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 在阶段 1.0 中启动任务 2.0(TID 7,node1,分区 2,NODE_LOCAL,2786 字节)
[2016-06-15 09:26:01.398] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 在 1.0 阶段启动任务 3.0(TID 8,node1,分区 3,NODE_LOCAL,2786 字节)
[2016-06-15 09:26:01.406] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 在阶段 1.0 中启动任务 4.0(TID 9,node1,分区 4,NODE_LOCAL,2786 字节)
[2016-06-15 09:26:01.429] [INFO] [dispatcher-event-loop-4] [BlockManagerInfo] 在 node1:36512 的内存中添加了 broadcast_1_piece0(大小:2.1 KB,免费:511.1 MB)
[2016-06-15 09:26:01.452] [INFO] [dispatcher-event-loop-6] [MapOutputTrackerMasterEndpoint] 要求将 shuffle 0 的地图输出位置发送到 node1:41122
[2016-06-15 09:26:01.456] [INFO] [dispatcher-event-loop-6] [MapOutputTrackerMaster] shuffle 0 的输出状态大小为 161 字节
[2016-06-15 09:26:01.526] [INFO] [task-result-getter-1] [TaskSetManager] 在节点 1 (1/5) 上的 128 毫秒内完成阶段 1.0 (TID 9) 中的任务 4.0 (1/5)
[2016-06-15 09:26:01.575] [INFO] [task-result-getter-3] [TaskSetManager] 在节点 1 (2/5) 上的 184 毫秒内完成阶段 1.0 (TID 7) 中的任务 2.0 (2/5)
[2016-06-15 09:26:01.580] [INFO] [task-result-getter-2] [TaskSetManager] 在节点 1 (3/5) 上的 193 毫秒内完成阶段 1.0 (TID 5) 中的任务 0.0 (3/5)
[2016-06-15 09:26:01.589] [INFO] [task-result-getter-3] [TaskSetManager] 在节点 1 (4/5) 上的 199 毫秒内完成阶段 1.0 (TID 6) 中的任务 1.0
[2016-06-15 09:26:01.599] [INFO] [task-result-getter-2] [TaskSetManager] 在节点 1 (5/5) 上的 200 毫秒内完成阶段 1.0 (TID 8) 中的任务 3.0
[2016-06-15 09:26:01.599] [INFO] [task-result-getter-2] [TaskSchedulerImpl] 从池中删除了任务已全部完成的 TaskSet 1.0
[2016-06-15 09:26:01.599] [INFO] [dag-scheduler-event-loop] [DAGScheduler] ResultStage 1(在 EquityTCAAnalytics.java:88 收集)在 0.202 秒内完成
[2016-06-15 09:26:01.612] [INFO] [main] [DAGScheduler] 工作 0 完成:在 EquityTCAAnalytics.java:88 收集,耗时 32.496470 秒
[2016-06-15 09:26:01.634] [INFO] [main] [EquityTCAAnalytics] [((2016-06-10 13:45:00.0,DA),6944), ((2016-06-10 14: 25:00.0,B),5241), ..., ((2016-06-10 10:55:00.0,QD),109080), ((2016-06-10 14:55:00.0,A),1300 )]
[2016-06-15 09:26:01.641] [INFO] [main] [EquityTCAAnalytics] 完成

32.5s正常吗?

【问题讨论】:

  • 正常与您要完成的任务以及您正在处理的数据量有关。也可能是网络 IO 的瓶颈。所以实际上你的问题不能用给定的信息来回答。

标签: java apache-spark spark-cassandra-connector


【解决方案1】:

我会说 %% 的 CPU 和/或内存使用率将是一个起点。如果您的核心未得到充分利用,这可能意味着您的流程不够并行。内存大小取决于您的数据,但通常是使用更多的 RAM 而不是返回 IO。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-23
    • 2014-12-09
    • 1970-01-01
    • 1970-01-01
    • 2012-06-19
    • 2012-02-14
    • 2017-03-22
    • 2020-03-10
    相关资源
    最近更新 更多