【问题标题】:Spring Cloud Data Flow topics creation for Kafka为 Kafka 创建 Spring Cloud Data Flow 主题
【发布时间】:2019-12-14 19:09:30
【问题描述】:

我有自己的带有 Python 的 Spring Cloud Data Flow 处理器,我使用这个示例作为指导:https://dataflow.spring.io/docs/recipes/polyglot/processor/。 然后我想扩展并创建三个这样的处理器,所以我使用spring.cloud.deployer.myApp.count=3 创建了 3 个内部带有 Python 的 Pod。 我稍微修改了示例中的代码:当我创建一个 Kafka 消费者时,我还传递了一个组 id,所以消息应该是负载均衡的。

consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])

问题是 SCDF 创建了一个只有 1 个分区的 Kafka 主题,因此消息只能到达一个 pod。 所以我想知道:

  • 我是否应该以某种方式配置 SCDF 以创建具有 3 个分区的 Kafka 主题?
  • 或者我不应该依赖 SCDF 并在 Python 中自己创建主题?我想这将是多余的,因为 SCDF 也创建了这个主题。
  • SCDF 中的哪个组件实际上负责创建 Kafka 主题?我如何影响它的分区数量?
  • 如果我停止此流并使用 4 个处理器步骤再次启动,是否应该使用第 4 个分区扩展主题?因为目前没有创建新分区。

【问题讨论】:

    标签: python spring kubernetes apache-kafka spring-cloud-dataflow


    【解决方案1】:

    请花点时间查看 Spring Cloud Data Flow 的 responsibilities。如果不清楚,SCDF 既不会与支持消息传递中间件(如 Kafka)进行交互,也不会在运行时使用它。换句话说,SCDF 不会创建与之关联的主题或分区——它只是自动配置 Spring Cloud Stream (SCSt) 属性。

    但是,如果您在自定义处理器中使用 SCSt,框架会自动将所需通道绑定到中间件中的底层主题。该框架还具有更改分区行为的功能。您也可以使用过度分区主题部署处理器。有several other configuration options 来构建所需的流式数据处理行为。

    您正在查看的 Python 示例不具备 SCSt 提供的所有功能。该秘籍是一个示例演练,展示了人们如何在 Python 中构建本机处理器风格的应用程序,其中生产者和消费者配置是在 Python 代码本身中手动创建的。 SCDF 和 SCSt 都不会影响这个秘籍中的应用程序行为。

    我是否应该以某种方式配置 SCDF 以创建具有 3 个分区的 Kafka 主题?

    如前所述,SCDF 不与 Kafka 交互。

    或者我不应该依赖 SCDF 并在 Python 中自己创建主题?我想这将是多余的,因为 SCDF 也创建了这个主题。

    如果您的自定义处理器不是 Spring Cloud Stream 应用程序,是的,您有责任在代码中明确定义主题 + 分区。

    SCDF 中的哪个组件实际上负责创建 Kafka 主题?我如何影响它的分区数量?

    春天云流。见上面的解释。

    如果我停止此流并使用 4 个处理器步骤再次启动,是否应该使用第 4 个分区扩展主题?因为目前没有创建新的分区。

    您不一定需要重新启动流数据管道。如果您的主题预先过度分区,是的,运行时的任何其他消费者都应该能够自动参与竞争消费者关系。密切关注spring-io/dataflow.spring.io#156 — 我们正在添加一个配方来演示使用 SCSt + SCDF + Kafka 进行手动和自动缩放的可能性。

    【讨论】:

      【解决方案2】:

      能够通过将以下代码引入 Python 容器启动脚本来解决此问题,改进了 https://dataflow.spring.io/docs/recipes/polyglot/processor/ 中提供的代码。使用 SCDF 服务器传递的参数来获取代理 URL、主题名称、实例数:

      admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])
      
      partition_count = get_cmd_arg("spring.cloud.stream.instanceCount")
      
      # create Kafka topic if does not exist
      new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
      try:
          admin_client.create_topics(new_topics=[new_topic])
      except TopicAlreadyExistsError:
          logging.info(f"Topic {get_input_channel()} was already created")
      
      # add Kafka partitions to existing topic
      new_partitions = NewPartitions(total_count=partition_count)
      try:
          admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
      except InvalidPartitionsError as exp:
          logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")
      

      【讨论】:

        猜你喜欢
        • 2022-12-10
        • 2018-04-06
        • 2020-08-12
        • 1970-01-01
        • 1970-01-01
        • 2023-03-24
        • 1970-01-01
        • 1970-01-01
        • 2018-04-27
        相关资源
        最近更新 更多