【问题标题】:Should Storm Spouts only emit output using the thread calling Spout.nextTuple?Storm Spouts 是否应该只使用调用 Spout.nextTuple 的线程发出输出?
【发布时间】:2014-02-15 06:57:11
【问题描述】:

ISpout.nextTuple() javadoc 指定 nextTuple()ack(...)fail(...) 在同一个线程上调用。

但是,调用emit(...) 的实际收集器是较早提供的,作为open(..., collector) 上的参数。

问题是看到一些新数据的后台线程是否必须始终将数据排入队列以供 nextTuple() 出列和发出。如果后台线程立即发出数据会发生什么?支持吗?如果允许,在nextTuple() 中实现“短时间睡眠”的推荐方法是什么?

【问题讨论】:

    标签: multithreading apache-storm


    【解决方案1】:

    nextTuple()/ack()/fail() 方法在同一个线程上调用的隐含含义是,任务(后台 Java 线程)在机器“A”上运行,它发出元组是相同的任务,在“A”上运行ack()/fail() 的调用取决于处理拓扑中元组的成功/失败(由运行在“B”或“C”的 Bolt 处理)。

    只要 messageId 不为 null 并且 Bolt 任务在 execute() 方法中调用 ack(tuple),Storm 框架就会跟踪拓扑内的 tuple 遍历并调用 tuple 的 ack()/fail()拥有任务。

    在回答您的问题之前,这里简要介绍一下后台任务线程的工作原理。后台任务线程具有用于发出的元组的内存结构/缓冲区以及用于状态/待处理的元组等的一些其他内存结构。当 Spout/Bolt 开始发出数据时,缓冲区被填满,这个缓冲区被释放为当元组被处理时,即在调用 ack()/fail() 之后。本质上,当缓冲区空闲时后台线程调用nextTuple(),一旦缓冲区满,后台线程停止调用nextTuple()。简单来说,open()/nextTuple()/close() 中的 emit() 方法填充后台线程缓冲区,ack()/fail() 释放缓冲区。

    通过上面的解释,后台线程不知道新的/传入的数据。由 nextTuple() 中的逻辑从源(Twitter/JMS 提供者/ESB/AMQP 兼容服务器/RDBMS)读取数据并发出数据。因此,根据后台线程的缓冲区大小,Storm 调用 nextTuple() 如上所述。

    对于其他问题,如果需要,应该可以短时间睡觉。请注意,nextTuple() 不需要发出值,它可以返回任何内容。

    【讨论】:

    • 很好的答案。请您添加对一些设计文档的引用,或者指出支持此解释的重要代码段?我问只是因为底层基础设施没有理由无法跟踪通过收集器发出的数据,或者有一个未绑定的缓冲区或阻塞emit(...) 调用。
    • executor.clj调用了'nextTuple()'方法,请看executor.clj第462行的源代码。在同一来源的第 456 行提到了评论
    【解决方案2】:

    据我了解,除非 Storm 通过调用 nextTuple() 方法请求,否则不应发出数据。因此,您的后台线程必须将新数据排入队列,以便在请求时发出。仅当调用该方法时没有要发出的元组时,您的 nextTuple() 方法才应该短暂休眠。

    【讨论】:

    • 谢谢。这也是我的直觉,但我没有找到任何明确说明这一点的东西。你有参考引用吗?与此同时,我正在假设这个限制继续工作。
    • 好问题。根据我的阅读,我相当确定只能通过 nextTuple() 发出数据。然而,在寻找一个明确的声明时,我注意到 ISpout.open() 的 collector 参数描述中的以下文本:"Tuples 可以随时发出,包括 open 和关闭方法”。所以我认为你应该在数据到达后尝试发射。如果是这样,您能在这里报告这种测试的结果吗?提前致谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-01
    • 1970-01-01
    • 2022-06-10
    • 1970-01-01
    • 1970-01-01
    • 2013-08-14
    相关资源
    最近更新 更多