【问题标题】:Reading Bigtable data from the middle of a Dataflow pipeline从 Dataflow 管道的中间读取 Bigtable 数据
【发布时间】:2020-05-28 00:38:34
【问题描述】:

我有一个管道,它从 pub sub 接收一些数据,进行一些处理,并且需要根据该处理的结果处理 Bigtable 上的所有数据。

例如,我有一条 pub sub 消息,例如:{clientId: 10},所以我需要从 Bigtable 中读取 clientId 10 的所有数据(我知道如何根据 clientId 创建扫描)。问题是我们目前对 Bigtable(BigtableIO 和 CloudBigtableIO)的两个读取都是基于管道以 bigtable 开头的事实,所以我不能(或找不到方法)在中间使用它们管道。我怎样才能实现这种情况?

简单的类伪代码:

Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam dataflow google-cloud-bigtable


    【解决方案1】:

    更新:

    我最近在玩 Bigtable 和 Dataflow,遇到了您在此处描述的相同问题。我不相信有办法在管道中间执行Read.from(CloudBigtableIO.read(config),因此您必须创建自己的 DoFn。您可以扩展AbstractCloudBigtableTableDoFn 并通过getConnection() 访问易于重用和可配置的Bigtable 连接。这是我整理的一个示例 Dataflow/Beam 作业,展示了如何执行此操作:

    public class ReadInMiddleOfPipeline {
      public static void main(String[] args) {
        BigtableOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    
        Pipeline p = Pipeline.create(options);
        CloudBigtableTableConfiguration bigtableTableConfig =
            new CloudBigtableTableConfiguration.Builder()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .build();
    
        p.apply(GenerateSequence.from(0).to(10).withRate(1, new Duration(1000)))
            .apply(ParDo.of(new ReadFromTableFn(bigtableTableConfig)));
    
        p.run().waitUntilFinish();
      }
    
      static class ReadFromTableFn extends AbstractCloudBigtableTableDoFn<Long, Void> {
        public ReadFromTableFn(CloudBigtableConfiguration config) {
          super(config);
        }
    
        @ProcessElement
        public void processElement(@Element Long input, OutputReceiver<Void> out, PipelineOptions po) {
          BigtableOptions options = po.as(BigtableOptions.class);
          try {
            Table table = getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
            Scan scan = new Scan().setRowPrefixFilter(Bytes.toBytes("#phone"));
            ResultScanner rows = table.getScanner(scan);
    
            for (Result row : rows) {
              System.out.printf(
                  "Reading data for %s%n", Bytes.toString(row.rawCells()[0].getRowArray()));
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    
      public interface BigtableOptions extends DataflowPipelineOptions {
        @Description("The Bigtable project ID, this can be different than your Dataflow project")
        @Default.String("bigtable-project")
        String getBigtableProjectId();
    
        void setBigtableProjectId(String bigtableProjectId);
    
        @Description("The Bigtable instance ID")
        @Default.String("bigtable-instance")
        String getBigtableInstanceId();
    
        void setBigtableInstanceId(String bigtableInstanceId);
    
        @Description("The Bigtable table ID in the instance.")
        @Default.String("bigtable-table")
        String getBigtableTableId();
    
        void setBigtableTableId(String bigtableTableId);
      }
    }
    

    【讨论】:

    • 这样我就不能根据PubSub消息的内容配置扫描了吧?
    • 你不能用我上面给出的代码来做到这一点,但如果你以此为起点,你应该能够将一些配置输出到管道并将其作为 Bigtable 的输入阅读。
    【解决方案2】:

    为了补充@Billy 的答案,您还可以尝试在 ParDo 转换中使用 BigtableDataClient 类。 数据输入将是PubsubMessage中包含的参数来配置Scan对象,然后在ParDo中设置Scan参数,连接BigTable,获取过滤结果。

    这个 sn-p 可能有用:

        @ProcessElement
        public void processElement(@Element String element, OutputReceiver<String> out){
    
            String projectId = "<PROJECT_ID>";
            String instanceId = "<INSTANCE_ID>";
            String tableName = "<TABLENAME>";
    
    
            String[] scanParameters = element.split(",");
    
            try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)){
    
                Table table = connection.getTable(TableName.valueOf(tableName));
    
                Scan scan = new Scan();
                scan.withStartRow(Bytes.toBytes(scanParameters[0]));
                scan.withStopRow(Bytes.toBytes(scanParameters[1]));
    
                ResultScanner scanner = table.getScanner(scan);
    
                for (Result row : scanner) {
                    System.out.println(row);
                }
    
                catch (Exception e){
                    e.printStackTrace();
                }
    
                out.output("");
            }
    

    我没有直接使用 PubsubMessage 进行测试,但是您可以进行另一个转换来调整消息或直接获取 PubsubMessage 并设置 Scan 对象。

    【讨论】:

    • 最后我们将使用像这样的解决方案,主要区别在于我们将扩展 AbstractCloudBigtableTableDoFn 来管理我们案例的连接,并且扫描将由另一个步骤生成管道,所以类似于 ``` fun processElement(@Element metadata: Metadata, output: OutputReceiver) { createScansForClient(metadata.clientId).forEach { output.output(it) } } ``` 并在下一步我们通过扫描并行读取 BigTable。
    • 嘿Felipe,我昨天在玩Bigtable+Dataflow,遇到了你描述的同样的问题。我相信您现在已经弄清楚了,但是我添加了一个代码示例,展示了如何使用 AbstractCloudBigtableTableDoFn 并连接到 Bigtable。希望您能发现它有用,如果出现其他问题,请发布更多信息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-10-28
    • 1970-01-01
    • 2018-12-12
    • 2020-11-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多