【问题标题】:Apache Beam Combine grouped valuesApache Beam 组合分组值
【发布时间】:2017-05-26 18:21:12
【问题描述】:

我正在尝试寻找一种方法来重新排序我的 Kafka 消息,并使用 Apache BeamGoogle DataFlow 将有序消息发送到新主题。

我有发送以下格式的字符串消息的 Kafka 发布者: {system_timestamp}-{event_name}?{parameters}

例如:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

我想做的是根据消息的 {system-timestamp} 部分并在 5 秒的窗口内重新排序事件,因为我们的发布者不保证消息会在符合 {system-timestamp} 值。

我编写了一个模拟排序器函数,用于对从 Kafka 接收到的事件进行排序(使用 KafkaIO 源):

static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> {

   @ProcessElement
   public void processElement(ProcessContext c) {
       KV<String, Iterable<String>> element = c.element();

       System.out.println("");
       System.out.print("key: " + element.getKey() + ";");

       Iterator<String> it = element.getValue().iterator();
       List<String> list = new ArrayList<>();
       while (it.hasNext()) {
           String val = it.next();
           System.out.print("value: " + val);
           list.add(val);
       }
       Collections.sort(list, Comparator.naturalOrder());
       c.output(KV.of(element.getKey(), list));
   }
 }

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();

    DirectOptions directOptions = options.as(DirectOptions.class);
    directOptions.setRunner(DirectRunner.class);

    // Create the Pipeline object with the options we defined above.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // read from Kafka
        .apply(KafkaIO.<String,String>read()
            .withBootstrapServers("localhost:9092")
            .withTopics(new ArrayList<>((Arrays.asList("events"))))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // apply window
        .apply(Window.<KV<String,String>>into(
                FixedWindows.of(Duration.standardSeconds(5L))))
        // group by key before sorting
        .apply(GroupByKey.<String, String>create()) // return PCollection<KV<String, Iterable<String>>
        // sort events
        .apply(ParDo.of(new SortEventsFunc()))
        //combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format
        .apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String>
        // write ordered events to Kafka
        .apply(KafkaIO.<String, String>write()
                .withBootstrapServers("localhost:9092")
                .withTopic("events-sorted")
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
            );
    pipeline.run();
}

所以我使用GroupByKey.&lt;String, String&gt;create() 转换对消息进行分组,在排序事件之后,我需要以某种方式将它们从KV&lt;String, Iterable&lt;String&gt;&gt; 转换为KafkaIO 接受的KV&lt;String, String&gt; or KV&lt;Void, String&gt; 值。 所以我想要做的就是忽略通过分组转换键创建的并且简单地 将每个值作为单独的消息传递给 KafkaIO 编写器

我探索了Combine#perKey 变换,但它接受 SerializableFunction,它只能将所有值组合成一个字符串。(带有一些分隔符),因此我只传递一个值作为一个连接字符串而不是每个值(由 KafkaIO#read() 读取)到 KafkaIO 写入器。

【问题讨论】:

    标签: java google-cloud-platform google-cloud-dataflow apache-beam dataflowtask


    【解决方案1】:

    其实很简单! 这里的诀窍是,您可以在 @ProcessElement 方法内任意多次调用 c.output

    因此,在您的情况下,只需定义一个 DoFn&lt;KV&lt;String, Iterable&lt;String&gt;&gt;, KV&lt;String, String&gt;&gt;,遍历 c.element().getValue() 集合,然后为每个集合调用 c.output

    【讨论】:

      猜你喜欢
      • 2021-11-10
      • 1970-01-01
      • 2018-08-28
      • 1970-01-01
      • 1970-01-01
      • 2019-06-04
      • 1970-01-01
      • 1970-01-01
      • 2018-01-05
      相关资源
      最近更新 更多