【问题标题】:Establishing singleton connection with Google Cloud Bigtable in Scala similar to Cassandra在 Scala 中建立与 Google Cloud Bigtable 的单例连接,类似于 Cassandra
【发布时间】:2018-07-15 15:27:58
【问题描述】:

我正在尝试使用 Google Cloud Services 实施实时推荐系统。我已经使用 Kafka、Apache Storm 和 Cassandra 构建了引擎,但我想使用 Cloud Pub/Sub、Cloud Dataflow 和 Cloud Bigtable 在 Scala 中创建相同的引擎。

到目前为止,在 Cassandra 中,由于我在 Apache Storm 螺栓操作期间多次读写,因此我实现了以下连接器 MyDatabase.scala,它启动与数据库的单例连接并使用它使用来自 Kafka spout 的流数据读取和更新用户表。我为 Cassandra 使用了 Phantom Scala API 驱动程序。

MyDatabase.scala

import scala.concurrent.Await
import scala.concurrent.duration._
import com.websudos.phantom.dsl._


object CustomConnector {

  val hosts = Seq("localhost")

  val connector = ContactPoints(hosts).keySpace(""my_keyspace")

}

class MyDatabase(val keyspace: KeySpaceDef) extends Database(keyspace) {
  object users extends Users with keyspace.Connector
}

object MyDatabase extends MyDatabase(CustomConnector.connector) {
  Await.result(MyDatabase.autocreate.future(), 5.seconds)
}

Users.scala

import com.websudos.phantom.CassandraTable
import com.websudos.phantom.dsl._

import scala.concurrent.Future

case class User(
                 id: String,
                 items: Map[String, Int]
               )

class UsersTable extends CassandraTable[Users, User] {

  object id extends StringColumn(this) with PartitionKey[String]
  object items extends MapColumn[String, Int](this)

  def fromRow(row: Row): User = {
    User(
      id(row),
      items(row)
    )
  }
}

abstract class Users extends UsersTable with RootConnector {

  def store(user: User): Future[ResultSet] = {
    insert.value(_.id, user.id).value(_.items, user.items)
      .consistencyLevel_=(ConsistencyLevel.ALL)
      .future()
  }

  def getById(id: String): Future[Option[User]] = {
    select.where(_.id eqs id).one()
  }
}

Dataflow 管道将如下所示:

  1. 从 Pub/Sub 提取流数据。
  2. 在单个 parDo 中实现逻辑 我们将使用一些新的更新 Bigtable 中的多个表 从 Pub/Sub 提取的数据生成的值。

当您使用 Phantom DSL 时,创建与 Cassandra 的连接非常简单。我的问题是,是否有任何类似的库,例如 Google Cloud Bigtable 的 Phantom,或者使用 Google Cloud API 和 Scio 实现它的正确方法是什么(因为我将使用 Scala 实现 Dataflow 管道)。似乎我找不到任何相关示例来建立与 Bigtable 的连接并在 Scala 的 Dataflow 管道中使用此连接。

谢谢

【问题讨论】:

  • Scio 0.4.7 提供了一个内置的 BigtableDoFn 抽象类,您可以将其子类化以获得对 BigtableSession 对象的访问权限,以便对 Bigtable 进行异步查找。我还没有确认你是否真的可以用它来写回 Bigtable。理想情况下,您将使用BigtableSCollection 类的.saveAsBigtable() 方法写入Bigtable,这样您就可以在出错时自动重试。在内部,这只是使用 Apache Beam 的 BigtableIO 类。参考:spotify.github.io/scio/api/com/spotify/scio/bigtable/….
  • @Andrew 非常感谢。 BigtableDoFn 是我的问题的解决方案。你也可以读/写。
  • 顺便说一句,您是否知道 Scio 是否有类似的 DoFn 功能可以与 Datastore 建立单一连接并从中异步读取/写入?
  • 从 Scio 0.4.7 开始,Cloud Datastore 没有类似于 BigtableDoFn 的内置接口。在 0.4.7 发布之前,我通常只会继承 ScalaAsyncDoFnDoFnWithResource 并将 DoFn 的资源设置为某个连接对象。我想你可以为 Datastore 实现类似的东西。参考:spotify.github.io/scio/api/com/spotify/scio/transforms/….

标签: scala cassandra google-cloud-dataflow google-cloud-bigtable spotify-scio


【解决方案1】:

DoFn 中处理多个元素之间共享数据库连接的Beam 方法是使用@Setup@Teardown 方法。有关示例,请参见 source code of the Beam Cassandra connector

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-16
    • 2016-06-13
    • 2017-12-24
    • 2015-09-15
    • 2018-03-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多