【问题标题】:Apache Flink - Unable to get data From TwitterApache Flink - 无法从 Twitter 获取数据
【发布时间】:2017-07-08 22:52:03
【问题描述】:

我正在尝试使用 Apache Flink 通过 Twitter Streaming API 获取一些消息。

但是,我的代码没有在输出文件中写入任何内容。我正在尝试计算特定单词的输入数据。

请查看我的示例:

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.twitter._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.JavaConverters._


//////////////////////////////////////////////////////
// Create an Endpoint to Track our terms
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
  @Override
  def createEndpoint(): StreamingEndpoint = {
    //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
    val endpoint = new StatusesFilterEndpoint()
    //endpoint.locations(List(chicago).asJava)
    endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
    endpoint
  }
}

object Connection {
  def main(args: Array[String]): Unit = {

    val props = new Properties()

    val params: ParameterTool = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setGlobalJobParameters(params)
    env.setParallelism(params.getInt("parallelism", 1))

    props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
    props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
    props.setProperty(TwitterSource.TOKEN, params.get("token"))
    props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))

    val source = new TwitterSource(props)
    val epInit = new myFilterEndpoint()

    source.setCustomEndpointInitializer(epInit)

    val streamSource = env.addSource(source)

    streamSource.map(s => (0, 1))
      .keyBy(0)
      .timeWindow(Time.minutes(2), Time.seconds(30))
      .sum(1)
      .map(t => t._2)
      .writeAsText(params.get("output"))

    env.execute("Twitter Count")
  }
}

关键是,我没有错误消息,我可以在我的仪表板上看到。我的来源正在向我的 TriggerWindow 发送数据。但它没有收到任何数据:

我一次有两个问题。

第一:为什么我的源在没有收到任何东西的情况下向我的 TriggerWindow 发送字节?

第二点:我的代码有问题,我无法从 twitter 获取数据吗?

【问题讨论】:

  • 第一个结果应在 2 分钟后写入(即窗口的长度)。你等了那么久吗? TriggerWindow 已经接收到数据,但是 43s 之后肯定不会有任何内容写入文件。你的代码看起来不错。
  • 嗨@DawidWysakowicz,是的,我等了那么久。 Accually 我运行这段代码 2 小时。我拿了这个问题的印刷品。但是 Flink 没有输出 :(

标签: scala twitter streaming twitter4j apache-flink


【解决方案1】:

您的应用程序源没有将实际记录发送到您可以通过查看发送的记录列看到的窗口。发送的字节属于 Flink 在任务之间时不时发送的控制消息。更具体地说,它是 LatencyMarker 消息,用于测量 Flink 作业的端到端延迟。

代码对我来说看起来不错。我什至试用了您的代码并为我工作。因此,我得出结论,Twitter 连接凭据一定有问题。请重新检查您是否输入了正确的凭据。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-22
    • 1970-01-01
    • 2018-05-13
    • 1970-01-01
    • 1970-01-01
    • 2014-09-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多