【发布时间】:2015-12-25 06:21:23
【问题描述】:
我们目前在集群拓扑模式下使用 Apache Storm 0.9.5 来处理 Amazon Kinesis 记录 (spout) 并将它们存储到 Redshift 数据仓库 (bolt) 中。我们的 Storm 集群部署在 AWS 中,由 1 个 nimbus + UI 节点、1 个 zookeeper 节点和 3 个 supervisor + logviewer 节点组成。我们的拓扑配置支持处理多个 Kinesis 流以及它包含的每个流:
- 一个 Kinesis 流喷口用于侦听传入记录
- 一个 Redshift 螺栓可将记录插入数据仓库
拓扑:
final TopologyBuilder topologyBuilder = new TopologyBuilder();
// for every configured kinesis stream
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
final String spoutId = kinesisStreamSpout.getSpoutId();
topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());
// set the corresponding redshift bolt
final String streamName = kinesisStreamSpout.getStreamName();
final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
topologyBuilder.setBolt(redshiftBolt.getId(),
redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}
return topologyBuilder.createTopology();
系统的一个问题是它无法保证只处理一次输入消息,从而导致将具有相同业务密钥的多条记录插入目标数据库。为了了解问题的严重程度,我们进行了一项受控测试,发现大约有三分之一的输入记录被多次提交处理。
根据this thread(目前尚未得到答复),我们也考虑过使用 Trident 来保证一次性处理,但也得出结论,在系统中内置幂等性更为重要(以及至少一次语义)而不是像other article 所建议的那样增加复杂性、降低性能和生成状态。
我们现在正在就在现有拓扑中以支持集群的方式实现幂等性的最佳方式寻求建议。到目前为止,我们倾向于引入一个 RedisBolt,它可以通过元组消息 id 键值。是否存在使用 Apache Storm 实现此目的的现有模式?
【问题讨论】:
-
您确定,您在 Kinesis 中没有重复项吗? 1/3 的值在我看来非常高......
-
重复提交分析基于 Amazon Kinesis spout 组件从 ShardId:SequenceNumber 值构造的唯一 messageId。换句话说,同一业务负载的多个 Kinesis 记录将被系统分配一个不同的元组 messageId。
-
好吧,那么你的拓扑有问题...你有很多失败的元组吗?
-
我们的测试运行中没有任何失败的元组。但是我们得到的是重试相同的消息(使用 Kinesis 记录序列号锚定):` casksszInflightRecordTracker [INFO] Retrying使用分区键 1 序列号 49554939912789135525468194646830500145840472776150876162 记录。重试尝试 1 `
-
你能贴一张风暴 UI 的图片吗 -> 显示可视化(它应该显示拓扑图)。
标签: java amazon-redshift apache-storm trident