【问题标题】:Storm-kafka: set startOffsetTime to kafka.api.OffsetRequest.LatestTime in apache Flux Yaml topologyStorm-kafka:在 apache Flux Yaml 拓扑中将 startOffsetTime 设置为 kafka.api.OffsetRequest.LatestTime
【发布时间】:2018-05-27 21:20:34
【问题描述】:

我正在使用 apache 通量研究拓扑。目前,strom 从头开始​​获取消息,但我希望它只从 kafka 获取最新消息。

我正在 YAML 文件中编写拓扑。

这就是我的 spoutConfig 的样子:

  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      - name: "startOffsetTime"
        ref: "XXXXXXXXX"

现在,我被困住了。如何将 startOffsetTime 设置为正确的函数以仅从 kafka 获取最新消息?

我试过 ref:"LatestTime",但无论我放什么,它都会给我错误:

java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    我相信 Flux 可以处理调用静态工厂方法。

    - id: "startingOffsetTime"
      className: "kafka.api.OffsetRequest"
      factory: "LatestTime"
    

    然后在你的 SpoutConfig 定义中使用它

    properties:
      - name: "startOffsetTime"
        ref: "startingOffsetTime"
    

    我没有对此进行测试,但我认为它应该可以工作。调用静态工厂方法的能力不久前被合并https://issues.apache.org/jira/browse/STORM-2796,但文档中似乎缺少它。我提出了更新文档https://issues.apache.org/jira/browse/STORM-3086 的问题。

    如果您想查看此功能的示例,请查看https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38

    【讨论】:

    • 我尝试了建议的解决方案,但它给出了一个错误:线程“main”中的异常无法为标签构造 java 对象:yaml.org,2002:org.apache.storm.flux.model .拓扑定义;异常=无法为 JavaBean=org.apache.storm.flux.model.TopologyDef@67e2d983 创建属性=组件;无法为 JavaBean=org.apache.storm.flux.model.BeanDef@4de5031f 创建属性=工厂;无法在类中找到属性“工厂”:org.apache.storm.flux.model.BeanDef 在“字符串”,第 2 行,第 1 列:名称:“kafka-topology”
    • 这是一个要点,显示了一个配置文件,它将正确设置 startOffsetTime gist.github.com/srdo/33fb8e2618fb650114504c90bc20d526。你想要的行是gist.github.com/srdo/…gist.github.com/srdo/…。我已经针对 Storm/Flux 1.2.1 对其进行了测试。可以试试吗?
    • 谢谢。我会试一试,然后告诉你。
    • 我正在做同样的事情,请在此处查看我的要点gist.github.com/obiii/178f699514d4994b7ec1005cd1062bbb 我的风暴版本是 Storm 1.2.1 你可以看到我的要点,我正在做同样的事情:/
    • 如果我针对旧版本的 Flux 尝试该示例,我会得到一个非常相似的错误。你能确认你使用的是 Flux 1.2.1 吗?
    猜你喜欢
    • 1970-01-01
    • 2023-04-02
    • 2019-06-25
    • 1970-01-01
    • 1970-01-01
    • 2016-08-11
    • 2018-10-17
    • 2021-09-28
    • 1970-01-01
    相关资源
    最近更新 更多