【问题标题】:Apache Storm once-only processingApache Storm 一次性处理
【发布时间】:2015-12-25 06:21:23
【问题描述】:

我们目前在集群拓扑模式下使用 Apache Storm 0.9.5 来处理 Amazon Kinesis 记录 (spout) 并将它们存储到 Redshift 数据仓库 (bolt) 中。我们的 Storm 集群部署在 AWS 中,由 1 个 nimbus + UI 节点、1 个 zookeeper 节点和 3 个 supervisor + logviewer 节点组成。我们的拓扑配置支持处理多个 Kinesis 流以及它包含的每个流:

  • 一个 Kinesis 流喷口用于侦听传入记录
  • 一个 Redshift 螺栓可将记录插入数据仓库

拓扑:

final TopologyBuilder topologyBuilder = new TopologyBuilder();

// for every configured kinesis stream
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
    final String spoutId = kinesisStreamSpout.getSpoutId();
    topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());

    // set the corresponding redshift bolt
    final String streamName = kinesisStreamSpout.getStreamName();
    final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
    topologyBuilder.setBolt(redshiftBolt.getId(),
        redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}

return topologyBuilder.createTopology();

系统的一个问题是它无法保证只处理一次输入消息,从而导致将具有相同业务密钥的多条记录插入目标数据库。为了了解问题的严重程度,我们进行了一项受控测试,发现大约有三分之一的输入记录被多次提交处理。

根据this thread(目前尚未得到答复),我们也考虑过使用 Trident 来保证一次性处理,但也得出结论,在系统中内置幂等性更为重要(以及至少一次语义)而不是像other article 所建议的那样增加复杂性、降低性能和生成状态。

我们现在正在就在现有拓扑中以支持集群的方式实现幂等性的最佳方式寻求建议。到目前为止,我们倾向于引入一个 RedisBolt,它可以通过元组消息 id 键值。是否存在使用 Apache Storm 实现此目的的现有模式?

【问题讨论】:

  • 您确定,您在 Kinesis 中没有重复项吗? 1/3 的值在我看来非常高......
  • 重复提交分析基于 Amazon Kinesis spout 组件从 ShardId:SequenceNumber 值构造的唯一 messageId。换句话说,同一业务负载的多个 Kinesis 记录将被系统分配一个不同的元组 messageId。
  • 好吧,那么你的拓扑有问题...你有很多失败的元组吗?
  • 我们的测试运行中没有任何失败的元组。但是我们得到的是重试相同的消息(使用 Kinesis 记录序列号锚定):` casksszInflightRecordTracker [INFO] Retrying使用分区键 1 序列号 49554939912789135525468194646830500145840472776150876162 记录。重试尝试 1 `
  • 你能贴一张风暴 UI 的图片吗 -> 显示可视化(它应该显示拓扑图)。

标签: java amazon-redshift apache-storm trident


【解决方案1】:

如果您不想使用 Trident,您可能需要阅读以下有关“事务拓扑”的文章。这是 Trident 背后的概念,您仍然可以“手动”应用它。对于您的用例来说,这似乎是一个很好的模式:https://storm.apache.org/documentation/Transactional-topologies.html

此外,我想补充一点,Storm(与任何其他系统一样,如 Apache Flink [免责声明:我是 Flink 的提交者] 和 Apache Spark Streaming)只能保证在 系统。如果数据被转发到外部系统,只有当且仅当外部系统可以支持幂等操作时,才能实现精确一次。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-28
    • 2015-07-07
    相关资源
    最近更新 更多