【发布时间】:2019-05-30 23:57:40
【问题描述】:
我目前正在使用 V2 api 实现两个自定义阅读器,用于 spark 结构化流作业。作业运行约 30-60 分钟后,它会爆炸:
Caused by: java.lang.RuntimeException: Offsets committed out of order: 608799 followed by 2982
我正在重新利用here 找到的示例,它正在第 206 行轰炸。
我没有使用示例中提供的 twitter 流,而是为 JMS 和 SQS 实现它。
我的问题是:有人遇到过这个问题吗?还是那个实现有问题?
代码sn-p:
override def commit(end: Offset): Unit = {
internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")
val newOffset = TwitterOffset.convert(end).getOrElse(
sys.error(s"TwitterStreamMicroBatchReader.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
)
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}
tweetList.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
}
我无法通过我常用的网点找到答案。但是,我确实看到了this。提出的一点是删除检查点数据——这在生产系统中似乎不是一个可行的解决方案。另一个是源系统不维护偏移信息?我的印象是 spark 会自己处理偏移信息。如果这第二点是问题,我如何确保源系统处理这个范式。
如果我可以提供更多信息,请告诉我。
编辑:查看 MicroBatchReader 界面,提交文档说:
/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
所以问题就变成了,为什么 spark 会向我发送已经提交的提交偏移量?
【问题讨论】:
标签: scala apache-spark amazon-emr spark-structured-streaming amazon-efs