【问题标题】:pubSubToBq getAttribute from messagepubSubToBq 从消息中获取属性
【发布时间】:2021-06-11 07:51:29
【问题描述】:

我想从 pubsub 消息中提取属性并将其用作 BQ 目标表名称的一部分。 这是我处理每个 pubsub 消息:

  static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      assert message != null;
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    }
  }

这是我将所有消息转换为 TableRow 以插入 BQ:

  static class PubsubMessageToTableRow
      extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    private final Options options;

    PubsubMessageToTableRow(Options options) {
      this.options = options;
    }

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {

      PCollectionTuple jsonToTableRowOut =
          input
              // Map the incoming messages into FailsafeElements so we can recover from failures
              // across multiple transforms.

              .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
              .apply(
                      "JsonToTableRow",
                      FailsafeJsonToTableRow.<PubsubMessage>newBuilder()
                              .setSuccessTag(TRANSFORM_OUT)
                              .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                              .build());

      // Re-wrap the PCollections so we can return a single PCollectionTuple
      return PCollectionTuple.of(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))
              .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));

最后一个过程,我将所有的 TableRow 上传到 BQ:

    PCollectionTuple convertedTableRows =
        messages
            /*
             * Step #2: Transform the PubsubMessages into TableRows
             */
            .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));

    WriteResult writeResult =
        convertedTableRows.get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(**SHOULD BE MESSAGE ATTRIBUTE HERE**));

【问题讨论】:

    标签: java google-bigquery google-cloud-dataflow google-cloud-pubsub dataflow


    【解决方案1】:

    您可以在 BigQueryIO to 字段中使用 DynamicDestinations

    【讨论】:

      猜你喜欢
      • 2015-06-18
      • 2016-04-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-04-07
      • 1970-01-01
      • 2012-07-23
      • 2019-09-24
      相关资源
      最近更新 更多