【问题标题】:Is it possible to refer or update memcache/reddis/DB data from spark jobs?是否可以从 spark 作业中引用或更新 memcached/redis/DB 数据?
【发布时间】:2017-05-16 16:57:30
【问题描述】:

我们在缓存/数据库中有一些业务数据。我们每天处理大量日志数据并更新我们的数据缓存/数据库。其中一些更新是近乎实时的,一些是批量更新的。我们有火花工作做很多转变。我们将 Spark 作业的结果存储在文本文件中,然后运行另一个顺序作业将它们放入我们的缓存/DB 中。

我考虑过使用连接器(mongoDB-spark 连接器、redis-spark 连接器),并将整个数据作为 RDD 并对其进行处理。但是与我们所做的日志文件和每日更新相比,我们的业务数据量确实很大。所以,放弃了。

问题:

  1. 我们能否从缓存/数据库上的执行程序直接更新,以避免最后一步?
  2. 任何其他建议或替代方法以获得更好的性能?
  3. 您在这里看到任何反模式吗?

【问题讨论】:

  • 不清楚你的问题是什么。是的,您可以从 Spark 直接写入数据库或其他任何具有 Java/scala 连接器的东西。
  • 同意你说的,绝对有可能。但我想知道每次记录转换打开和关闭连接的性能。还有其他更好的方法来共享@ executors 级别的连接吗?我猜连接的广播不会有帮助。

标签: mongodb caching apache-spark pyspark memcached


【解决方案1】:

如果您对数据库的写入很简单,您可以使用以下命令直接写入数据库:

myDF.write
    .mode("overwrite") // Choose the mode you want from org.apache.spark.sql.SaveMode
    .jdbc(url, "my_table", props)

如果查询不仅仅是一个简单的插入(例如,我的查询中有一个带有 on duplicate key update 部分的查询),您需要自己做。

您可以使用mapPartitions() 在分区之间分配写入。

myDF.mapPartitions(rows => {
  val connection = DriverManager.getConnection(URL, properties)

  rows.foreach(bulk => {
      val statement = connection.prepareStatement(myQuery)
      bulk.foreach(row => {
        statement.setString(1, row.getString(0))
        statement.setInt(2, row.getInt(1))
        ...
        statement.addBatch()
      })

      statement.executeLargeBatch().iterator
    })
  rows
}).count //An action here is required, to trigger the mapPartitions()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-11-20
    • 2013-10-02
    • 1970-01-01
    • 1970-01-01
    • 2011-06-03
    • 1970-01-01
    • 1970-01-01
    • 2016-04-27
    相关资源
    最近更新 更多