【问题标题】:Apache beam stops to process PubSub messages after some time一段时间后,Apache Beam 停止处理 PubSub 消息
【发布时间】:2021-01-25 10:44:31
【问题描述】:

我正在尝试编写一个简单的 Apache Beam 管道(它将在 Dataflow 运行器上运行)来执行以下操作:

  • 从订阅中读取包含 GCS 上文件路径的 PubSub 消息。
  • 对于每条消息,读取与消息关联的文件中包含的数据(文件可以是各种格式(csv、jsonl、json、xml、...))。
  • 对每条记录进行一些处理。
  • 在 GCS 上写回结果。

我在消息上使用了 10 秒的固定窗口。由于传入的文件已经被分块(最大大小为 10MB),我决定不使用可拆分的 do 函数来读取文件,以避免增加无用的复杂性(尤其是对于不能轻易拆分成块的文件)。

这是一个简化的代码示例,它给出了与完整示例完全相同的问题:

package skytv.ingester

import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

import kantan.csv.rfc
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Compression, FileIO, FileSystems, TextIO, WriteFilesResult}
import org.apache.beam.sdk.io.gcp.pubsub.{PubsubIO, PubsubMessage}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, FixedWindows, PaneInfo, Window}
import org.apache.beam.sdk.transforms.{Contextful, DoFn, MapElements, PTransform, ParDo, SerializableFunction, SimpleFunction, WithTimestamps}
import org.apache.beam.sdk.values.{KV, PCollection}
import org.joda.time.{Duration, Instant}
import skytv.cloudstorage.CloudStorageClient
import skytv.common.Closeable
import kantan.csv.ops._
import org.apache.beam.sdk.io.FileIO.{Sink, Write}

class FileReader extends DoFn[String, List[String]] {

  private def getFileReader(filePath: String) = {
    val cloudStorageClient = new CloudStorageClient()
    val inputStream = cloudStorageClient.getInputStream(filePath)
    val isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8)
    new BufferedReader(isr)
  }

  private def getRowsIterator(fileReader: BufferedReader) = {
    fileReader
      .asUnsafeCsvReader[Seq[String]](rfc
        .withCellSeparator(',')
        .withoutHeader
        .withQuote('"'))
      .toIterator
  }

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val filePath = c.element()

    Closeable.tryWithResources(
      getFileReader(filePath)
    ) {
      fileReader => {

        getRowsIterator(fileReader)
          .foreach(record => c.output(record.toList))

      }
    }


  }
}

class DataWriter(tempFolder: String) extends PTransform[PCollection[List[String]], WriteFilesResult[String]] {

  private val convertRecord = Contextful.fn[List[String], String]((dr: List[String]) => {
    dr.mkString(",")
  })

  private val getSink = Contextful.fn[String, Sink[String]]((destinationKey: String) => {
    TextIO.sink()
  })

  private val getPartitioningKey = new SerializableFunction[List[String], String] {
    override def apply(input: List[String]): String = {
      input.head
    }
  }

  private val getNaming = Contextful.fn[String, Write.FileNaming]((destinationKey: String) => {
    new Write.FileNaming {
      override def getFilename(
                                window: BoundedWindow,
                                pane: PaneInfo,
                                numShards: Int,
                                shardIndex: Int,
                                compression: Compression
                              ): String = {
        s"$destinationKey-${window.maxTimestamp()}-${pane.getIndex}.csv"
      }
    }
  })

  override def expand(input: PCollection[List[String]]): WriteFilesResult[String] = {
    val fileWritingTransform = FileIO
      .writeDynamic[String, List[String]]()
      .by(getPartitioningKey)
      .withDestinationCoder(input.getPipeline.getCoderRegistry.getCoder(classOf[String]))
      .withTempDirectory(tempFolder)
      .via(convertRecord, getSink)
      .withNaming(getNaming)
      .withNumShards(1)

    input
      .apply("WriteToAvro", fileWritingTransform)
  }

}

object EnhancedIngesterScalaSimplified {

  private val SUBSCRIPTION_NAME = "projects/<project>/subscriptions/<subscription>"
  private val TMP_LOCATION = "gs://<path>"

  private val WINDOW_SIZE = Duration.standardSeconds(10)

  def main(args: Array[String]): Unit = {

    val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create()

    FileSystems.setDefaultPipelineOptions(options)

    val p = Pipeline.create(options)

    val messages = p
      .apply("ReadMessages", PubsubIO.readMessagesWithAttributes.fromSubscription(SUBSCRIPTION_NAME))
//      .apply("AddProcessingTimeTimestamp", WithTimestamps.of(new SerializableFunction[PubsubMessage, Instant] {
//        override def apply(input: PubsubMessage): Instant = Instant.now()
//      }))

    val parsedMessages = messages
      .apply("ApplyWindow", Window.into[PubsubMessage](FixedWindows.of(WINDOW_SIZE)))
      .apply("ParseMessages", MapElements.via(new SimpleFunction[PubsubMessage, String]() {
        override def apply(msg: PubsubMessage): String = new String(msg.getPayload, StandardCharsets.UTF_8)
      }))

    val dataReadResult = parsedMessages
      .apply("ReadData", ParDo.of(new FileReader))

    val writerResult = dataReadResult.apply(
      "WriteData",
      new DataWriter(TMP_LOCATION)
    )

    writerResult.getPerDestinationOutputFilenames.apply(
      "FilesWritten",
      MapElements.via(new SimpleFunction[KV[String, String], String]() {
      override def apply(input: KV[String, String]): String = {
        println(s"Written ${input.getKey}, ${input.getValue}")
        input.getValue
      }
    }))

    p.run.waitUntilFinish()
  }
}

问题在于,在处理了一些消息(大约 1000 条)之后,作业停止处理新消息,并且它们永远保留在 PubSub 订阅中而未被确认。

我看到在这种情况下水印停止前进,数据新鲜度无限线性增加。

这是来自数据流的屏幕截图:

PubSub 队列的情况如下:

是否可能有一些消息停留在填充它们的数据流队列中,从而无法添加新消息?

我认为PubsubIO如何计算时间戳有问题,所以我尝试强制时间戳等于每条消息的处理时间,但没有成功。

如果我让数据流作业处于这种状态,它似乎会不断地重新处理相同的消息,而不会将任何数据写入存储。

你知道如何解决这个问题吗?

谢谢!

【问题讨论】:

  • 找到根本原因了吗?

标签: scala bigdata google-cloud-dataflow apache-beam


【解决方案1】:

管道很可能在处理管道中的一个(或多个)元素时遇到了错误(它应该与 PubsubIO 如何计算时间戳无关),这会阻止水印自失败的工作将在数据流上一次又一次地重试。

您可以从日志中检查是否有任何故障,特别是来自工作器或线束组件的故障。如果出现解析错误等未处理的运行时异常,很可能是流管道卡住的根本原因。

如果没有 UserCodeException,则可能是由数据流后端引起的其他问题,您可以联系 Dataflow 客户支持,以便工程师可以查看您的管道的后端问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-16
    • 1970-01-01
    • 2023-03-11
    • 2023-03-30
    相关资源
    最近更新 更多