【问题标题】:Realtime Database streaming Using Apache Spark and kfaka使用 Apache Spark 和 kafka 进行实时数据库流式传输
【发布时间】:2017-01-31 16:08:50
【问题描述】:

我正在使用 Kafka 设计一个 spark 流应用程序。我有几个问题如下: 我正在将 RDBMS 表中的数据流式传输到 kafka,并使用 Spark 消费者使用 Spark - SQL 来消费消息和处理

问题: 1. 我将数据从表中流式传输到 kafka(键作为表名,值作为 JSON 记录形式的表数据)——这是正确的架构吗?

  1. 在 spark 消费者中,我正在尝试使用 DStream.foreachRDD(x => 转换为 x RDD) 来使用数据——我遇到了这个问题(它说转换中的转换错误不允许......我正在尝试在 foreachRDD 函数中提取键以获取表名并使用 map 函数转换 x.values 以从 JSON 转换回普通字符串,然后将每条记录保存到 Spark-sql )

这种用于数据库流式传输的架构和设计是否可行?如何解决转换问题中的转换?

问候, 皮尤什·坎萨尔

【问题讨论】:

  • 在您遇到错误的地方分享您的代码将有助于更好地理解错误。如果可能,请分享代码 sn-p

标签: apache-spark apache-kafka apache-spark-sql spark-streaming


【解决方案1】:

我有一个类似的用例。

我使用 Nifi 从 RDBMS 视图中获取数据并放入 Kafka 主题。 我为具有多个分区的 Oracle 数据库中的每个视图都有一个主题。 使用 Nifi 将数据转换为 JSON 格式并放入 Kafka。

是否有任何要求对所有表数据使用相同的 kafka 主题?

以下代码将用于将数据持久保存到 Cassandra。

> val msg = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
>   
>     /* Process Records for each RDD */   Holder.log.info("Spark foreach Starts")
>        val data = msg.map(_._2) 
>        data.foreachRDD(rdd =>{
>        if(rdd.toLocalIterator.nonEmpty)    {
>     
>     
>       val messageDfRdd = sqlContext.read.json(rdd)
var data2=messageDfRdd .map(p => employee(p.getLong(1),p.getString(4),p.getString(0),p.getString(2),p.getString(3),p.getString(5)));

>  //code to save to Cassandra.   
>            }

【讨论】:

  • 我只是在构建一个 POC。所以我不确定我遵循的设计是否正确。我只是想根据一些键在 Dstream 的每个RDD 中处理一些数据。
  • code val ssc = new StreamingContext(sc, Seconds(10)) val sqlContext = new SQLContext(sc) val records:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,Map("MozartComposerDB" -> 5),StorageLevel.DISK_ONLY) val recordsKey = records.transform(x => x.groupByKey().keys) val arr = sc.accumulator( "") // 新的 ArrayBuffer[String](); recordsKey.foreachRDD{key => // 根据每个 recordsKey 过滤记录 Dstream 中的数据 } ssc.start() ssc.awaitTermination() } }
  • 你可以直接使用reduceByKey或者GroupByKey。不需要转换。 val data = msg.groupByKey() val key = data.map(._1) val values = data.map(._2)
猜你喜欢
  • 2015-12-12
  • 1970-01-01
  • 1970-01-01
  • 2016-11-22
  • 2016-10-03
  • 2019-05-13
  • 2016-10-10
  • 2018-09-18
  • 2020-10-10
相关资源
最近更新 更多