【发布时间】:2017-11-16 08:43:16
【问题描述】:
美好的一天
我正在通过 flink/kafka 接收数据(流数据)。 我要连接的端口与我需要回写消息的端口相同
TCP/IP -> Flink/Kafka Consumer -> 处理数据 -> 将结果发送到 kafka 主题并返回 TCP/IP 连接
// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, '\n')
// 2. Processing Data
.....
// 3. Write result to kafka topic
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))
// 4. Send result back to connected url ie.(Ref Step 1 URL) (url+port)
(This is where I need Assistance)
连接到 URL 和端口有效。我接收并处理数据 我写的主题 现在我还需要写回我连接的同一个url和端口。{因为Url和端口可以同时发送和接收数据}
我让它写到另一个端口
// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())
这行得通...问题是试图写入同一个端口。当我使用我正在读取的相同端口时... flink 作业失败
任何想法
问候
【问题讨论】:
-
或者我应该创建一个具有相同 url 的 kafka 生产者。问题是 TCP 连接只允许这么多连接,所以想法是使用相同的连接来接收数据
-
如果您想重用来自
SocketTextStreamFunction的输入TCP 连接,那么这并不容易,因为SocketTextStreamFunction不会暴露套接字。但是,打开一个新连接并写回数据应该可以工作。 Flink 失败的异常是什么? -
在 Flink Job Submission with same port for (Read and Write) 这些错误弹出: 1) org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. 2)原因:org.apache.flink.runtime.client.JobExecutionException:作业执行失败 3)原因:java.net.ConnectException:连接被拒绝(连接被拒绝)
-
使用不同端口进行读写时不会出现这些错误...适用于不同端口
-
感谢您的回复 :) @TillRohrmann
标签: scala apache-kafka apache-flink