【问题标题】:Spark is running queries into database multiple timesSpark 多次在数据库中运行查询
【发布时间】:2018-01-19 20:52:43
【问题描述】:

我正在尝试使用以下代码加载数据集以触发:

Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties)));
dataset = dataset.cache();
Long numberOfRowsProcessed = dataset.count();

因此,在这 6 次会话访问我的数据库并提取数据集并计算行数之后,我不再需要访问数据库。但运行以下代码后:

dataset.createOrReplaceTempView("temp");
Dataset<Row> base =  spark.sql(new StringBuilder()
.append("select ")
.append("TRANSACTION ")
.append("from temp ")
.append("where PAYMENT_METHOD in (1,2,3,4) ")
.append("and   TRANSACTION_STATUS in ('A','B') ")
.toString()
);
base.createOrReplaceTempView("base");

但是,我实际上看到的是 spark 再次运行查询,但这一次,附加了我在定义 Dataset&lt;Row&gt; base 时通过的过滤器。如您所见,我已经缓存了数据,但没有任何效果。

问题:是否可以在 spark 中加载内存中的所有内容并使用缓存的数据、查询 spark 而不再是数据库?

从我的关系数据库中获取数据的成本很高,而且需要一段时间。

更新

我注意到 spark 在尝试执行时正在向数据库发送新查询

from base a 
left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE 

这是附加到查询的字符串 spark(从数据库中捕获):

WHERE ("IDT_TRANSACTION_STATUS" IS NOT NULL) AND ("NUM_BIN_CARD" IS NOT NULL)

在日志中出现:

18/01/16 14:22:20 INFO DAGScheduler:ShuffleMapStage 12(显示在 RelatorioBinTransacao.java:496) 在 13,046 秒内完成 18/01/16 14:22:20 INFO DAGScheduler:寻找新的可运行阶段 18/01/16 14:22:20 信息 DAGScheduler:运行:设置(ShuffleMapStage 9)18/01/16 14:22:20 信息 DAGScheduler: 等待: Set(ShuffleMapStage 13, ShuffleMapStage 10,ResultStage 14,ShuffleMapStage 11) 18/01/16 14:22:20 信息 DAGScheduler: 失败: Set()

我不确定我是否明白要说的内容,但我认为内存中缺少某些内容。

如果我只是在左侧加入 cmets,如下所示:

from base a 
//left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE 

它工作得很好,它不再进入数据库。

【问题讨论】:

    标签: java apache-spark batch-processing


    【解决方案1】:

    这听起来您可能没有足够的内存来在集群上存储联合结果。在Long numberOfRowsProcessed = dataset.count(); 之后,请查看 Spark UI 的存储选项卡,以查看整个数据集是否已完全缓存。如果不是,那么您需要更多内存(和/或磁盘空间)。

    如果您确认数据集确实已缓存,请发布查询计划(例如base.explain())。

    【讨论】:

    • 我尝试在我的 spark.read().jdbc 的每一行添加.persist(StorageLevel.MEMORY_AND_DISK_2(),但仍在执行在数据库上附加过滤器的查询。我会尝试查看“存储”选项卡并尽快回复您。
    • 我正在使用.setMaster('local[*]')运行
    • 那么你是在本地模式下运行的吗?您将什么设置为驱动程序/执行程序内存?
    • 我构建了一个带有 spark 库的 java 应用程序并运行 java -Xmx25g -cp myjar.jar org.pack.MyClass
    • 刚刚发现哪个 spark sql 正在向数据库 .append("from base a ") .append("left join (select distinct date, idt_transaction from base where idt_transaction_status = 9 and num_bin_card is not null) b ") .append("on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE ") 发送查询,当我离开连接库时,这是我已经创建的 TempView,它再次向数据库发送查询。如果我把它作为左连接,它就不再发送查询并且工作正常。问题是:如果我已经缓存了基表并且已经访问了它,为什么?
    【解决方案2】:

    我想出了一个解决问题的方法。我必须在向数据库发送查询的每一行添加cache() 指令。所以它看起来像这样:

    Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, fake, rppDBProperties);
            dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties).cache());
            dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties).cache());
            dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties).cache());
            dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties).cache());
            dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties).cache());
            dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties).cache());
            dataset = dataset.cache();
    

    我不得不添加fake sql的第一行,因为无论我做什么,spark似乎都没有考虑缓存第一个查询,所以我一直看到第一个查询被发送到数据库。

    底线,我不明白为什么我必须在每行添加cache() 指令,如果我已经在最后做了。但是,它奏效了。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-05-20
      • 2021-07-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-10-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多