【问题标题】:Spark-Streaming from an Actor来自 Actor 的 Spark-Streaming
【发布时间】:2017-07-10 18:56:09
【问题描述】:

我想让消费者 Actor 订阅 Kafka 主题并流式传输数据,以便在消费者外部使用 Spark Streaming 进行进一步处理。为什么是演员?因为我读到它的主管策略将是处理 Kafka 故障(例如,在故障时重新启动)的好方法。

我找到了两个选择:

  • Java KafkaConsumer 类:其poll() 方法返回Map[String, Object]。我希望像 KafkaUtils.createDirectStream 一样返回 DStream,但我不知道如何从演员外部获取流。
  • 扩展ActorHelper 特征并使用actorStream(),如example 所示。后一个选项不显示与主题的连接,而是与套接字的连接。

谁能指出我正确的方向?

【问题讨论】:

    标签: scala apache-kafka spark-streaming actor


    【解决方案1】:

    为了处理 Kafka 故障,我使用了 Apache Curator 框架和以下解决方法:

    val client: CuratorFramework = ... // see docs
    val zk: CuratorZookeeperClient = client.getZookeeperClient
    
    /**
      * This method returns false if kafka or zookeeper is down.
      */ 
    def isKafkaAvailable:Boolean = 
       Try {
          if (zk.isConnected) {
            val xs = client.getChildren.forPath("/brokers/ids")
            xs.size() > 0
          }
          else false
        }.getOrElse(false)
    

    为了消费 Kafka 主题,我使用了 com.softwaremill.reactivekafka 库。例如:

    class KafkaConsumerActor extends Actor {
       val kafka = new ReactiveKafka()
       val config: ConsumerProperties[Array[Byte], Any] = ... // see docs
    
       override def preStart(): Unit = {
          super.preStart()
    
          val publisher = kafka.consume(config)
          Source.fromPublisher(publisher)
                .map(handleKafkaRecord)
                .to(Sink.ignore).run()
       }
    
       /**
         * This method will be invoked when any kafka records will happen.
         */
       def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
          // handle record
       }
    }
    

    【讨论】:

      猜你喜欢
      • 2016-11-03
      • 1970-01-01
      • 2016-11-28
      • 1970-01-01
      • 1970-01-01
      • 2020-03-19
      • 2016-05-01
      • 2017-02-26
      • 1970-01-01
      相关资源
      最近更新 更多