【问题标题】:Reading and Writing From Same Socket in Flink / Kafka在 Flink / Kafka 中从同一个套接字读取和写入
【发布时间】:2017-11-16 08:43:16
【问题描述】:

美好的一天

我正在通过 flink/kafka 接收数据(流数据)。 我要连接的端口与我需要回写消息的端口相同

TCP/IP -> Flink/Kafka Consumer -> 处理数据 -> 将结果发送到 kafka 主题并返回 TCP/IP 连接

// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, '\n')

// 2. Processing Data
.....

// 3. Write result to kafka topic 
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))

// 4. Send result back to connected url ie.(Ref Step 1 URL) (url+port)
(This is where I need Assistance)

连接到 URL 和端口有效。我接收并处理数据 我写的主题 现在我还需要写回我连接的同一个url和端口。{因为Url和端口可以同时发送和接收数据}

我让它写到另一个端口

// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())

这行得通...问题是试图写入同一个端口。当我使用我正在读取的相同端口时... flink 作业失败

任何想法

问候

【问题讨论】:

  • 或者我应该创建一个具有相同 url 的 kafka 生产者。问题是 TCP 连接只允许这么多连接,所以想法是使用相同的连接来接收数据
  • 如果您想重用来自SocketTextStreamFunction 的输入TCP 连接,那么这并不容易,因为SocketTextStreamFunction 不会暴露套接字。但是,打开一个新连接并写回数据应该可以工作。 Flink 失败的异常是什么?
  • 在 Flink Job Submission with same port for (Read and Write) 这些错误弹出: 1) org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. 2)原因:org.apache.flink.runtime.client.JobExecutionException:作业执行失败 3)原因:java.net.ConnectException:连接被拒绝(连接被拒绝)
  • 使用不同端口进行读写时不会出现这些错误...适用于不同端口
  • 感谢您的回复 :) @TillRohrmann

标签: scala apache-kafka apache-flink


【解决方案1】:

您可以使用自定义的SinkFunction 将数据写回 URL。

stream.addSink(new SinkFunction<String>() {
    // initialise the client to send the data
    public void invoke(String value) throws Exception {
        // send here.               
    }
}

SocketClientSink

env.socketTextStream("localhost", 5555).map(x => { println(x); x }).addSink(new SocketClientSink[String]("localhost", 5555, new SimpleStringSchema))

【讨论】:

  • 感谢您的回复,我已经更新了上面的代码(问题的最后一部分) - 我现在可以写入不同的端口,但我似乎无法写入同一个端口......您能否提供一个更详细的代码示例,以写入相同的端口?非常感谢
  • @SubZero 根据评论,我认为可能不是 Flink 引起的。尝试编写一个简单的套接字程序并将数据写回该 URL/PORT。
  • 我创建了一个基本的应用程序,它使用 scala 读取和写入到同一端口并且它确实有效。
  • @SubZero 我在本地使用套接字服务器和 Flink Streaming 作业进行了测试。它运行良好......这是示例代码:env.socketTextStream("localhost", 5555).map(x =&gt; { println(x); x }).addSink(new SocketClientSink[String]("localhost", 5555, new SimpleStringSchema))
  • 非常感谢@David -> 在接收器中添加 SocketClientSink 而不仅仅是写入套接字。非常感谢您的帮助
猜你喜欢
  • 1970-01-01
  • 2011-01-05
  • 2012-10-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-08-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多