【发布时间】:2018-12-21 20:33:12
【问题描述】:
我需要从 Cassandra 获取一些数据,以获取 Kafka-Streams 流应用程序中的条目。我需要在 ID 上执行连接。我想设置一个缓存以节省用于查询的时间。
表很简单:
id | name
---|-----
1 |Mike
我的计划很简单:从数据库中查询表,然后存储到Map[Int, String]。
主要问题是——表中的数据可能会发生变化,需要定期更新,所以需要时不时的去查询一下。
到目前为止,我已经想出了一个这样的线程解决方案:
// local database mirror
class Mirror(user: String, password: String) extends Runnable {
var database: Map[Int, String] = Map[Int, String]() withDefaultValue "undefined"
def run(): Unit = {
update()
}
//
def update(): Unit = {
println("update")
database.synchronized {
println("sync-update")
// val c = Driver.getConnection(...)
// database = c.execute(select id, name from table). ...
database += (1 -> "one")
Thread.sleep(100)
// c.close()
}
}
def get(k: Int): Option[String] = {
println("get")
database.synchronized {
println("sync-get")
if (! (database contains k)) {
update()
database.get(k)
} else {
database.get(k)
}
}
}
}
主要看起来像这样:
def main(args: Array[String]): Unit = {
val db = new Mirror("u", "p")
val ex = new ScheduledThreadPoolExecutor(1)
val f = ex.scheduleAtFixedRate(db, 100, 100, TimeUnit.SECONDS)
while(true) { // simulate stream
val res = db.get(1)
println(res)
Thread.sleep(10000)
}
}
它似乎运行良好。但是我的代码中有什么陷阱吗?特别是我对update 和get 函数的线程安全性没有信心。
【问题讨论】:
标签: scala concurrency cassandra