【发布时间】:2018-11-07 10:46:35
【问题描述】:
我正在尝试使用 FlinkKafkaProducer010 生成元素,但是当我打开消费者控制台窗口时,元素似乎无序到达。
我使用以下方法创建了主题: kafka-topics.bat --create --topic mytopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
消费者是使用以下方法创建的:kafka-console-consumer.bat --zookeeper localhost:2181 --topic mytopic
我使用的 Kafka Producer 代码是:
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.getNumberOfParameters() < 2) {
System.out.println("Missing parameters!");
System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
return;
}
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
DataStream<String> messageStream = env.addSource(getSourceFunction());
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
messageStream.addSink(producer);
env.execute("Kafka Producer");
}
public static SourceFunction<String> getSourceFunction() {
return new SourceFunction<String>() {
private static final long serialVersionUID = 6369260225318862378L;
public boolean running = true;
@Override
public void run(SourceContext<String> ctx) {
int counter = 0;
while (this.running && counter < 500) {
String data = "item " + Integer.toString(counter);
ctx.collect(data);
counter++;
}
}
@Override
public void cancel() {
running = false;
}
};
}
当我查看 Kafka 日志文件时,我看到一个 .log 文件,其中的元素也乱序。元素的排序使大约 10 个值的跳跃。在我的用例中,必须有正确的顺序。我一直在寻找如何确保元素按顺序到达,但到目前为止没有任何运气。 有什么我错过的东西可以修复排序吗?
提前感谢您的帮助!
【问题讨论】:
-
您已保证在分区中排序。你能以编程方式创建消费者并检查吗?
-
我已经在使用 FlinkKafkaConsumer010 并将 TimeCharacteristic 设置为 EventTime 来使用 Flink 中的元素。然而,这也有乱序到达,所以我回到了检查的基础,只检查 Kafka 和一个简单的 Kafka 控制台消费者。
标签: apache-kafka apache-flink kafka-producer-api