【问题标题】:Cache Cassandra table in scala application在 scala 应用程序中缓存 Cassandra 表
【发布时间】:2018-12-21 20:33:12
【问题描述】:

我需要从 Cassandra 获取一些数据,以获取 Kafka-Streams 流应用程序中的条目。我需要在 ID 上执行连接。我想设置一个缓存以节省用于查询的时间。 表很简单: id | name ---|----- 1 |Mike

我的计划很简单:从数据库中查询表,然后存储到Map[Int, String]
主要问题是——表中的数据可能会发生变化,需要定期更新,所以需要时不时的去查询一下。

到目前为止,我已经想出了一个这样的线程解决方案:

    // local database mirror
    class Mirror(user: String, password: String) extends Runnable {

      var database: Map[Int, String] =  Map[Int, String]() withDefaultValue "undefined"

      def run(): Unit = {
        update()
      }


      // 
      def update(): Unit = {
        println("update")
        database.synchronized {
          println("sync-update")        
          // val c = Driver.getConnection(...)
          // database = c.execute(select id, name from table). ...
          database += (1 -> "one")
          Thread.sleep(100)
          // c.close()
        }
      }

      def get(k: Int): Option[String] = {
        println("get")
        database.synchronized {
          println("sync-get")
          if (! (database contains k)) {
            update()
            database.get(k)
          } else {
            database.get(k)
          }
        }      
      }
    }

主要看起来像这样:

    def main(args: Array[String]): Unit = {

      val db = new Mirror("u", "p")
      val ex = new ScheduledThreadPoolExecutor(1)
      val f = ex.scheduleAtFixedRate(db, 100, 100, TimeUnit.SECONDS)       

      while(true) { // simulate stream
        val res = db.get(1)
        println(res)
        Thread.sleep(10000)
      }       
    }

它似乎运行良好。但是我的代码中有什么陷阱吗?特别是我对updateget 函数的线程安全性没有信心。

【问题讨论】:

    标签: scala concurrency cassandra


    【解决方案1】:

    如果您不反对使用 Akka,我会查看 Akka Streams;特别是Alpakka 来做到这一点。如果不需要,就没有必要重新发明轮子。

    话虽如此,代码存在以下问题:

    1. 如果更新 Cassandra 中的条目,则对缓存的存在性检查将无济于事。只有当它们从您的缓存中丢失时才会有所帮助
    2. 如果您认为大多数情况下您的缓存将包含 当前 条目,请考虑使用 reentrant read write lock。如果您有多个线程调用您的镜像,这将有助于解决争用。

    再次,我强烈建议您查看 Akka Streams 和 Alpakka,因为您可以使用该工具做您想做的事情,而不必自己编写一堆代码。

    【讨论】:

    • 谢谢。现在限制使用纯 Kafka 解决方案。存在检查旨在以这种方式工作。如果缺少 id 来自流,它将强制从 Cassandra 更新内容。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-08-17
    • 2012-03-26
    • 2018-06-07
    • 2011-11-29
    • 1970-01-01
    • 2020-06-27
    • 1970-01-01
    相关资源
    最近更新 更多