【问题标题】:Aggregating Topics with apache beam Kafkaio (Dataflow)使用 Apache Beam Kafkaio (Dataflow) 聚合主题
【发布时间】:2019-03-11 22:14:17
【问题描述】:

我在压缩的 kafka 主题中有缓慢移动的数据,在另一个主题中也有快速移动的数据。

1) 快速移动的数据是从 Kafka 实时摄取的无限事件。

2) 慢速移动数据是元数据,用于丰富快速移动的数据。这是一个紧凑的主题,数据不经常更新(天/月)。

3) 每个快速移动的数据负载都应该有一个元数据负载,它具有相同的 customerId,可以与之聚合。

我想根据 customerId 汇总快速/慢速移动数据(在两个主题的数据中都很常见)。我想知道你将如何去做这件事?到目前为止:

PTransform<PBegin, PCollection<KV<byte[], byte[]>>> kafka = KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers(“url:port")
    .withTopics([“fast-moving-data”, “slow-moving-data"])
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .updateConsumerProperties((Map) props)
    .withoutMetadata();

我注意到我可以使用 .withTopics 并指定我想使用的不同主题,但在此之后我无法找到任何示例来帮助聚合。任何帮助,将不胜感激。

【问题讨论】:

  • 您能进一步解释一下您加入流的特征吗?例如一个流中有多少元素对应于另一个流中有多少元素?来自不同流的事件是否具有相似的事件时间?
  • 你知道延迟通常是多少吗?
  • 缓慢移动的数据包含很少变化的客户偏好,而快速移动的数据是客户与网站的交互并且非常频繁。许多快速移动的数据记录将被缓慢移动的数据客户记录丰富。联接将是 customerId 到 customerId。如果客户偏好在一段时间内没有改变,那么慢速移动数据和快速移动数据之间的事件时间可能相隔几天甚至更长。
  • 非常有趣。我认为这可以作为侧面输入来完成。我会尽量找人为你解答。

标签: apache-kafka google-cloud-dataflow avro apache-beam


【解决方案1】:

SO Q&A 中也讨论了以下模式,这对于您的用例来说可能是一个很好的探索模式。可能成为问题的一项是压缩的慢速移动流的大小。希望它有用。

对于这种模式,我们可以使用 GenerateSequence 源转换定期发出一个值,例如每天一次。 通过在每个元素上激活的数据驱动触发器将此值传递到全局窗口。 在 DoFn 中,使用此过程作为触发器从有界源中提取数据 创建 SideInput 以用于下游转换。

请务必注意,由于此模式使用全局窗口 SideInput 触发处理时间,因此与事件时间处理的元素的匹配将是不确定的。例如,如果我们有一个在事件时间窗口化的主管道,这些窗口将看到的 SideInput 视图的版本将取决于在处理时间而不是事件时间中触发的最新触发器。

同样重要的是要注意,通常 SideInput 应该是适合内存的东西。

Java(SDK 2.9.0):

在下面的示例中,sideinput 以非常短的间隔更新,这样可以很容易地看到效果。期望侧输入更新缓慢,例如每隔几个小时或一天一次。

在下面的示例代码中,我们使用了在 DoFn 中创建的 Map,它成为 View.asSingleton,这是此模式的推荐方法。

下面的示例说明了该模式,请注意 View.asSingleton 会在每次计数器更新时重建。

对于您的用例,您可以将 GenerateSequence 转换替换为 PubSubIO 转换。这有意义吗?

public static void main(String[] args) {

 // Create pipeline
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(PipelineOptions.class);

 // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
 // Run in debug mode to see the output
 Pipeline p = Pipeline.create(options);

 // Create slowly updating sideinput

 PCollectionView<Map<String, String>> map = p
     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

     .apply(Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
         .discardingFiredPanes())

     .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
       @ProcessElement public void process(@Element Long input,
           OutputReceiver<Map<String, String>> o) {
         // Do any external reads needed here...
         // We will make use of our dummy external service.
         // Every time this triggers, the complete map will be replaced with that read from 
         // the service.
         o.output(DummyExternalService.readDummyData());
       }

     })).apply(View.asSingleton());

 // ---- Consume slowly updating sideinput

 // GenerateSequence is only used here to generate dummy data for this illustration.
 // You would use your real source for example PubSubIO, KafkaIO etc...
 p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
     .apply(Sum.longsGlobally().withoutDefaults())
     .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

       @ProcessElement public void process(ProcessContext c) {
         Map<String, String> keyMap = c.sideInput(map);
         c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

  LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));

       }
     }).withSideInputs(map));

 p.run();
}

public static class DummyExternalService {

 public static Map<String, String> readDummyData() {

   Map<String, String> map = new HashMap<>();
   Instant now = Instant.now();

   DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");

   map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
   map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

   return map;

 }
}

【讨论】:

    【解决方案2】:

    我建议单独阅读这些主题,为管道创建两个不同的输入。您可以稍后交叉/加入它们。跨越它们的方法是提供缓慢移动的流作为热路径的侧输入(快速移动的 PCollection 的转换)。

    请看这里:https://beam.apache.org/documentation/programming-guide/#side-inputs

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-29
      • 1970-01-01
      • 2020-04-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多