【问题标题】:Transparent Streaming & Batch processing透明流和批处理
【发布时间】:2019-04-26 23:28:39
【问题描述】:

我对流和批处理的世界还很陌生,并且试图理解概念和语言。诚然,我的问题的答案很可能众所周知,很容易找到,甚至在 SO 上回答了一百次,但我找不到它。

背景:

我正在从事一个大型科学项目(核聚变研究),我们在实验运行期间产生了大量的测量数据。这些数据主要是带有纳秒时间戳标记的样本流,其中样本可以是任何东西,从单个的 ADC 值,通过这样的数组,通过深度结构化的数据(从 1 位布尔值到 64 位双精度的多达数百个条目浮动)到原始高清视频帧甚至字符串文本消息。如果我理解正确的常用术语,我会在很大程度上将我们的数据视为“表格数据”。

我们主要使用自制软件解决方案,从数据采集到简单的在线(流式)分析(如缩放、二次采样等)到我们自己的数据存储、管理和访问设施。

鉴于操作的规模和维护所有这些实施的工作量,我们正在研究使用标准框架和工具来完成更多任务的可能性。

我的问题:

尤其是在这个阶段,我们面临着对实时/在线/实时数据以及“历史”数据的“事后”离线/批量分析越来越复杂(自动和手动)数据分析的需求.在此努力中,我试图了解现有的分析框架(如 Spark、Flink、Storm 等)(可能由 Kafka、Pulsar 等消息队列支持)是否以及如何支持一种场景,其中

  • 数据流入/流入平台/框架,附加标识符,如 URL 或 ID 等
  • 平台与集成或外部存储交互,以保存与标识符关联的流数据(多年)
  • 分析流程现在可以透明地查询/分析由标识符和任意(打开或关闭)时间窗口寻址的数据,并且框架提供数据批次/样本用于分析,无论是来自后端存储还是来自数据采集的实时分析

简单地将在线数据流式传输到存储中并从那里查询似乎没有选择,因为我们需要原始数据和分析数据来进行实时监控和实验的实时反馈控制。 此外,让用户以不同的方式查询实时输入信号或历史批次也不理想,因为我们的物理学家大多不是数据科学家,我们希望让这些“技术”远离他们,理想情况下使用完全相同的算法应该用于分析新的实时数据和以前实验的旧存储数据。

网站注释:

  • 我们谈论的是每秒 10 千兆比特范围内的 peek 数据负载,这些负载会以秒到几分钟的长度突然增加 - 候选人可以处理吗?
  • 我们正在使用纳秒分辨率的时间戳,甚至考虑 pico - 如果我理解正确,这会对可能的候选者列表造成一些限制?

如果有人能够理解我的问题并为我阐明这个话题,我会非常高兴:-)

非常感谢和亲切的问候, 别波

【问题讨论】:

    标签: apache-spark apache-flink apache-storm apache-pulsar


    【解决方案1】:

    我认为没有人可以说“是的,框架 X 绝对可以处理您的工作量”,因为这在很大程度上取决于您对消息处理的需求,例如关于消息传递的可靠性,以及如何对数据流进行分区。

    您可能对BenchmarkingDistributedStreamProcessingEngines 感兴趣。这篇论文使用的是几年前的 Storm/Flink/Spark 版本(看起来它们是在 2016 年发布的),但也许作者愿意让你使用他们的基准来评估这三个框架的新版本?

    流式分析的一个非常常见的设置是数据源 -> Kafka/Pulsar -> 分析框架 -> 长期数据存储。这将处理与数据摄取分离,让您可以像处理新数据一样重新处理历史数据。

    我认为你的第一步应该是看看你是否可以通过 Kafka/Pulsar 获得你需要的数据量。要么手动生成一个测试集,要么从你的生产环境中获取一些你认为可能具有代表性的数据,然后看看你是否能以你需要的吞吐量/延迟将它通过 Kafka/Pulsar。

    记得考虑对数据进行分区。如果您的某些数据流可以独立处理(即排序无关紧要),则不应将它们放在相同的分区中。例如,可能没有理由混合传感器测量和视频馈送流。如果您可以将数据分成独立的流,那么您就不太可能在 Kafka/Pulsar 和分析框架中遇到瓶颈。单独的数据流还可以让您更好地并行化分析框架中的处理,因为您可以运行例如不同机器上的视频馈送和传感器处理。

    一旦您知道是否可以通过 Kafka/Pulsar 获得足够的吞吐量,您应该为这 3 个框架中的每一个编写一个小示例。首先,我会从 Kafka/Pulsar 接收和删除数据,这应该让您及早知道 Kafka/Pulsar -> 分析路径中是否存在瓶颈。之后,您可以扩展示例以使用示例数据做一些有趣的事情,例如进行一些处理,就像您在生产中可能想做的那样。

    您还需要考虑您的数据流需要哪些类型的处理保证。通常,您会为保证至少一次或完全一次处理而支付性能损失。对于某些类型的数据(例如视频源),偶尔丢失消息可能是可以的。一旦确定了所需的保证,您就可以适当地配置分析框架(例如,在 Storm 中禁用 acking),并尝试对您的测试数据进行基准测试。

    只是为了更明确地回答您的一些问题:

    实时数据分析/监控用例听起来非常适合 Storm/Flink 系统。将其直接连接到 Kafka/Pulsar,然后执行您需要的任何分析,这听起来很适合您。

    历史数据的重新处理将取决于您需要执行哪种查询。如果您只需要一个时间间隔 + id,您可以使用 Kafka 加上一个过滤器或适当的分区来做到这一点。 Kafka 允许您在特定的时间戳开始处理,如果您的数据按 id 分区或者您将其作为分析的第一步进行过滤,您可以从提供的时间戳开始并在您在时间窗口之外遇到消息时停止处理。这仅适用于您感兴趣的时间戳是消息添加到 Kafka 的时间。我也不相信 Kafka 支持它生成的时间戳低于毫秒的分辨率。

    如果您需要执行更高级的查询(例如,您需要查看传感器生成的时间戳),您可以考虑使用 CassandraElasticsearchSolr 作为您的永久数据存储。您还需要研究如何将这些系统中的数据返回到您的分析系统中。例如,我相信 Spark 附带了一个用于从 Elasticsearch 读取的连接器,而 Elasticsearch 为 Storm 提供了一个连接器。您应该检查您的数据存储/分析系统组合是否存在这样的连接器,或者是否愿意编写自己的连接器。

    编辑:详细回答您的评论。

    我不知道 Kafka 或 Pulsar 支持用户指定的时间戳,但果然,他们bothdo。我没有看到 Pulsar 支持亚毫秒时间戳?

    你所描述的想法,卡夫卡绝对可以支持。

    您需要的是能够在特定时间戳启动 Kafka/Pulsar 客户端并向前读取。 Pulsar 似乎还不支持这一点,但 Kafka 支持。

    您需要保证当您将数据写入分区时,它们按时间戳顺序到达。这意味着您不被允许,例如使用时间戳 10 编写第一条消息 1,然后使用时间戳 5 编写消息 2。

    如果您可以确保为 Kafka 编写消息,那么您描述的示例将起作用。然后你可以说“从时间戳开始'昨晚午夜'”,卡夫卡将从那里开始。当实时数据进入时,它将接收它并将其添加到其日志的末尾。当消费者/分析框架读取了从午夜到当前时间的所有数据时,它将开始等待新的(实时)数据到达,并在它进来时对其进行处理。然后您可以在分析框架中编写自定义代码以确保它在到达带有时间戳“明天晚上”的第一条消息时停止处理。

    关于对亚毫秒时间戳的支持,我认为 Kafka 或 Pulsar 不会开箱即用地支持它,但您可以相当轻松地解决它。只需将亚毫秒时间戳作为自定义字段放入消息中即可。当您想从例如开始时时间戳 9ms 10ns,您要求 Kafka 从 9ms 开始,并使用分析框架中的过滤器删除 9ms 到 9ms 10ns 之间的所有消息。

    【讨论】:

    • 非常感谢您的有益而全面的回答!一件事我还不清楚,也许你也可以在这里帮助我? - 如果我们,例如,连接 pulsar(因为它似乎支持 sub-micro)和 Flink(我认为不支持 sub-micro,但仅作为示例),该组合是否可以处理分析过程(通过提供所需的抽象)问“好的,给我所有的数据一个主题 #4711 从昨天午夜开始直到明天晚上”,框架首先交付存储的数据和他们进来的liev 数据?非常感谢,贝波
    • 更新答案。
    • 小澄清:分析框架的选择与您评论中的问题无关,它仅取决于您的消息系统是否支持从时间戳开始,以及您是否按顺序摄取消息时间戳。
    • 再次感谢斯蒂格!我知道 pulsar 可以支持 sub-micro,因为 event_time 的含义似乎是应用程序定义的,我们可以将 64 位 nano-timestamps 放在那里,但另一方面,pulsars event_time 似乎只是你描述的解决方法卡夫卡。谢谢,你对我的理解帮助很大!
    【解决方案2】:

    请允许我添加以下关于 Apache Pulsar 如何帮助满足您的一些需求的建议。值得深思。

    “数据正在流入/流入平台/框架,附加一个标识符,如 URL 或 ID 等”

    您可能想查看Pulsar Functions,它允许您编写简单的函数(在 Java 或 Python 中),这些函数可以在发布到主题的每条消息上执行。它们非常适合这种类型的数据增强用例。

    平台与集成或外部存储交互,以保存与标识符关联的流数据(多年)

    Pulsar 最近添加了tiered-storage,允许您将事件流保留在 S3、Azure Blob Store 或 Google Cloud 存储中。这将使您可以将数据保存在廉价且可靠的数据存储中多年

    分析流程现在可以透明地查询/分析由标识符和任意(打开或关闭)时间窗口寻址的数据,并且该框架从后端存储或从数据采集中实时提供用于分析的数据批次/样本

    Apache Pulsar 还为 Presto 查询引擎添加了integration,它允许您查询给定时间段内的数据(包括来自分层存储的数据)并将其放入主题中进行处理。

    【讨论】:

    • 感谢您的信息!我是否正确地假设您参与了 Pulsar 开发?我想为我们的 szenario (see thsi question) 添加一个关于脉冲星使用的问题:脉冲星(例如 Presto 引擎)能否处理纳秒级分辨率事件时间?再次感谢!
    • Pulsar 用于 SQL 查询的 Presto 查询引擎支持以下数据类型。 prestodb.github.io/docs/current/language/types.html,我认为固定精度的十进制数将是存储纳秒分辨率时间的好选择,因为它支持高达 38 位的精度。如果这不起作用,插件可以提供其他类型,所以理论上你可以编写自己的数据类型。
    猜你喜欢
    • 2010-10-14
    • 1970-01-01
    • 1970-01-01
    • 2010-10-15
    • 1970-01-01
    • 2017-02-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-11
    相关资源
    最近更新 更多