【问题标题】:Why Apache Spark is performing the filters on client为什么 Apache Spark 在客户端执行过滤器
【发布时间】:2015-09-17 11:16:38
【问题描述】:

作为 apache spark 的新手,在 Spark 上获取 Cassandra 数据时遇到一些问题。

List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
                    cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
                    where("Id=? and date IN ?","Open",dates);

此查询未过滤 cassandra 服务器上的数据。虽然这个 java 语句正在执行它的内存并最终抛出 spark java.lang.OutOfMemoryError 异常。查询应该过滤掉 cassandra 服务器上的数据,而不是 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md 中提到的客户端。

当我在 cassandra cqlsh 上使用过滤器执行查询时,它的性能很好,但是在没有过滤器(where 子句)的情况下执行查询会给出预期的超时。所以很明显spark没有在客户端应用过滤器。

SparkConf conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[8]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15")

为什么在客户端应用过滤器以及如何改进它以在服务器端应用过滤器。

我们如何在 windows 平台上的 cassandra 集群之上配置 spark 集群??

【问题讨论】:

    标签: java apache-spark out-of-memory cassandra-2.0 spark-cassandra-connector


    【解决方案1】:

    没有使用 Cassandra 和 Spark,通过阅读您提供的部分(谢谢)我看到:

    注意:虽然 ALLOW FILTERING 子句隐式添加到 生成的 CQL 查询,并非所有谓词当前都被 卡桑德拉引擎。这个限制将在 未来的 Cassandra 版本。目前, ALLOW FILTERING 与 由二级索引或聚簇列索引的列。

    我很确定(但尚未测试)不支持“IN”谓词:请参阅https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

    因此,您可以尝试将 where 子句限制为 Id(假设有一个二级索引)并对日期范围使用 spark 过滤。

    【讨论】:

    • 感谢您为我提供了该链接,为我节省了一天的时间。
    【解决方案2】:

    我建议将表格作为 DataFrame 而不是 RDD 读取。这些在 Spark 1.3 及更高版本中可用。然后您可以将 CQL 查询指定为这样的字符串:

    CassandraSQLContext sqlContext = new CassandraSQLContext(sc);
    
    String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
    DataFrame resultsFrame = sqlContext.sql(query);
    
    System.out.println(resultsFrame.count());
    

    所以尝试一下,看看它是否更适合你。

    在 DataFrame 中获得数据后,您可以在其上运行 Spark SQL 操作。如果你想要RDD中的数据,你可以将DataFrame转换为RDD。

    【讨论】:

    • 我会试试 DataFrame
    • 你知道如何在windows平台的cassandra集群之上配置spark集群吗??
    【解决方案3】:

    在 SparkConfing 中设置 spark.cassandra.input.split.size_in_mb 解决了这个问题。

    conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[4]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15").
            set("spark.executor.memory", "2g").
            set("spark.cassandra.input.split.size_in_mb", "67108864");
    

    Spark-cassnadra-connector 读取 spark.cassandra.input.split.size_in_mb 的错误值,因此在 SparkConf 中覆盖该值即可。现在 IN 子句也运行良好。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-12-21
      相关资源
      最近更新 更多