【发布时间】:2018-07-15 15:27:58
【问题描述】:
我正在尝试使用 Google Cloud Services 实施实时推荐系统。我已经使用 Kafka、Apache Storm 和 Cassandra 构建了引擎,但我想使用 Cloud Pub/Sub、Cloud Dataflow 和 Cloud Bigtable 在 Scala 中创建相同的引擎。
到目前为止,在 Cassandra 中,由于我在 Apache Storm 螺栓操作期间多次读写,因此我实现了以下连接器 MyDatabase.scala,它启动与数据库的单例连接并使用它使用来自 Kafka spout 的流数据读取和更新用户表。我为 Cassandra 使用了 Phantom Scala API 驱动程序。
MyDatabase.scala
import scala.concurrent.Await
import scala.concurrent.duration._
import com.websudos.phantom.dsl._
object CustomConnector {
val hosts = Seq("localhost")
val connector = ContactPoints(hosts).keySpace(""my_keyspace")
}
class MyDatabase(val keyspace: KeySpaceDef) extends Database(keyspace) {
object users extends Users with keyspace.Connector
}
object MyDatabase extends MyDatabase(CustomConnector.connector) {
Await.result(MyDatabase.autocreate.future(), 5.seconds)
}
Users.scala
import com.websudos.phantom.CassandraTable
import com.websudos.phantom.dsl._
import scala.concurrent.Future
case class User(
id: String,
items: Map[String, Int]
)
class UsersTable extends CassandraTable[Users, User] {
object id extends StringColumn(this) with PartitionKey[String]
object items extends MapColumn[String, Int](this)
def fromRow(row: Row): User = {
User(
id(row),
items(row)
)
}
}
abstract class Users extends UsersTable with RootConnector {
def store(user: User): Future[ResultSet] = {
insert.value(_.id, user.id).value(_.items, user.items)
.consistencyLevel_=(ConsistencyLevel.ALL)
.future()
}
def getById(id: String): Future[Option[User]] = {
select.where(_.id eqs id).one()
}
}
Dataflow 管道将如下所示:
- 从 Pub/Sub 提取流数据。
- 在单个 parDo 中实现逻辑 我们将使用一些新的更新 Bigtable 中的多个表 从 Pub/Sub 提取的数据生成的值。
当您使用 Phantom DSL 时,创建与 Cassandra 的连接非常简单。我的问题是,是否有任何类似的库,例如 Google Cloud Bigtable 的 Phantom,或者使用 Google Cloud API 和 Scio 实现它的正确方法是什么(因为我将使用 Scala 实现 Dataflow 管道)。似乎我找不到任何相关示例来建立与 Bigtable 的连接并在 Scala 的 Dataflow 管道中使用此连接。
谢谢
【问题讨论】:
-
Scio 0.4.7 提供了一个内置的
BigtableDoFn抽象类,您可以将其子类化以获得对BigtableSession对象的访问权限,以便对 Bigtable 进行异步查找。我还没有确认你是否真的可以用它来写回 Bigtable。理想情况下,您将使用BigtableSCollection类的.saveAsBigtable()方法写入Bigtable,这样您就可以在出错时自动重试。在内部,这只是使用 Apache Beam 的BigtableIO类。参考:spotify.github.io/scio/api/com/spotify/scio/bigtable/…. -
@Andrew 非常感谢。 BigtableDoFn 是我的问题的解决方案。你也可以读/写。
-
顺便说一句,您是否知道 Scio 是否有类似的 DoFn 功能可以与 Datastore 建立单一连接并从中异步读取/写入?
-
从 Scio 0.4.7 开始,Cloud Datastore 没有类似于
BigtableDoFn的内置接口。在 0.4.7 发布之前,我通常只会继承ScalaAsyncDoFn或DoFnWithResource并将 DoFn 的资源设置为某个连接对象。我想你可以为 Datastore 实现类似的东西。参考:spotify.github.io/scio/api/com/spotify/scio/transforms/….
标签: scala cassandra google-cloud-dataflow google-cloud-bigtable spotify-scio