【问题标题】:How to modify message from Twitter API saved in one kafka topic and send it to another kafka topic如何从 Twitter API 修改保存在一个 kafka 主题中的消息并将其发送到另一个 kafka 主题
【发布时间】:2021-03-18 06:48:30
【问题描述】:

我创建了 Kafka 生产者,它使用 hbc-core 从 Twitter API 生成消息到一个主题,我想修改这些消息,因为我只需要几个字段,如推文创建时间、id 字符串、一些关于用户和文本的基本信息从那条推文。我尝试使用 Kafka Streams 和 POJO 模型,但在提取文本时遇到问题,因为全文可能位于不同的命名字段中,具体取决于推文是否被转发、是否有 140 多个标志等。 我的 POJO 模型:

  "type": "object",
  "properties": {
    "created_at": { "type": "string" },
    "id_str": { "type": "string" },
    "user": {
      "type": "object",
      "properties": {
        "location": { "type": "string" },
        "followers_count": { "type": "integer" },
        "friends_count": { "type": "integer" },
        "created_at": { "type": "string" }
      }
    },
    "text": { "type": "string" }
  }
}

这是使用 Kafka Streams 的正确方法,还是有更好的解决方案来提取这些字段并放入另一个主题?

【问题讨论】:

    标签: java json twitter apache-kafka apache-kafka-streams


    【解决方案1】:

    不需要中间客户端、系统、kafka 流花哨的工具或奇迹框架,专注于旧的简单方法:重用该生产者以及生成和发送完整的 POJOS。

    Producers 是线程安全的,因此使用相同的生产者实例通过不同的线程生产两个或多个主题是完全可以的。

    这只是一个简化,因为我不知道你的实现细节。我假设消息(POJO)是一个简单的String。使用一些想象力,相信不同的字母是字段。从fullPojo,您想向另一个主题发送仅包含两个字段(表示为yv)的消息。

      String fullPojo = "xxxxxyxxxxv";
      //some logic to extract the desired fields
      String shortPojo = getDesiredFields(fullPojo);
      /* shortPojo="yv" */
    

    在您的 Kafka 集群上创建一个新主题,在本例中,它将被称为 shortPojoTopic

    只需使用相同的producer 将完整数据发送到您的原始主题,方法是进行第二次调用,以便使用仅包含过滤值的消息填充短主题:

    producer.send(new ProducerRecord<String, String>(fullPojoTopic,  fullPojo));
    producer.send(new ProducerRecord<String, String>(shortPojoTopic, shortPojo));
    

    第二次调用也可以从另一个辅助线程完成。如果您希望在这里完成多线程,您可以定义第二个线程来执行“过滤”工作。只需将原始 producer 引用传递给第二个线程,并将两个线程与包含 fullPojos 的 FIFO 结构 (deques, queues,...) 链接。

    • 原线程将fullPojo发送到fullPojoTopic主题,并将fullPojo推入队列。
    • 此辅助“过滤器”线程将从队列/双端队列中删除顶部消息,提取创建 shortPojo 所需的字段,并将其发送到 shortPojoTopic使用相同的 producer,无需担心生产者同步问题)。

    如果主题之一处于错误状态并且无法接受更多消息,或者主题之一位于刚刚失败的不同 Kafka 集群上(在这种情况下,您也需要两个不同的生产者),或者即使过滤过程在过滤某些格式错误的消息时发现了一些困难。 例如,即使shortPojoTopic out,也不会影响第一个线程的性能,因为它将继续发送他的 fullPojos 而不会出现问题/延迟。

    时刻注意内存使用:队列/双端队列的大小应该以某种方式限制/控制,以避免第二个线程被卡住很长时间,或者如果它不能跟随节奏时出现 OOM的第一个线程。如果发生这种情况,它将无法足够快地读取/删除消息,从而产生可能导致上述 OOM 问题的延迟

    此外,即使没有主题/代理存在问题,这种分离也将提高总体性能,因为原始线程不必等待每次迭代时在其线程中发生的过滤过程。

    第一个线程只是发送 POJO;第二个线程只是过滤并发送短 POJO。 简单的职责,全部并行。

    假设您可以控制生产者及其发送的内容,我建议将逻辑直接放在那里,以避免其他中间系统(流,...)。只需提取核心代码中的字段并使用相同的生产者将恢复的 Pojo 生成到另一个主题。只使用一个线程或尽可能多地使用。

    我打赌我自己的房子和右手,这比你能想到的任何流实用程序都要快得多。

    如果您无权访问该代码,您可以创建一个中间消费者-生产者服务,在下一节继续。


    • 如果原始 POJO 生成和生产的代码不可访问

    如果您只能访问完整的 POJO 主题,而不能访问上一步(生成消息并将它们发送到主题的代码),则第二个选项可能是创建一个中间 kafka 消费者-生产者,它使用来自fullPojoTopic 的消息,提取字段并将过滤的shortPojo 生成到shortPojoTopic

    请注意,逻辑与第一种方法中的逻辑相同,但此解决方案意味着更多的资源浪费新的生产者和消费者线程(相信我,它们创建了一个大量的辅助线程),一个新的消费者组来管理,fullPOJO 消息在线上的双重传输,等等。

    我的意见是,仅当您无法直接访问以第一种方式生成和生成完整 POJOS 的代码,或者您希望更好地控制过滤完整 POJOS 的微服务时,才应使用此选项数据并将所需字段发送到另一个主题。

    【讨论】:

    • 首先感谢您的回复。正如我提到的,我在提取该推文的文本时遇到了问题。 Twitter API JSON 响应没有固定的形式/模型。它取决于许多因素,例如字符串的长度,例如全文可以嵌套在名为“extended_tweet”的对象中。这就是我在使用 POJO 模型时遇到问题的原因,因为我不知道如何在整个 JSON 中找到确切的字段。我已经创建了涵盖(我认为)所有文本场景的模型,但我不想使用 if/else 的墙来找到所需的。
    猜你喜欢
    • 1970-01-01
    • 2020-05-09
    • 1970-01-01
    • 1970-01-01
    • 2021-06-17
    • 2019-02-25
    • 2019-10-09
    • 1970-01-01
    • 2019-07-25
    相关资源
    最近更新 更多