【问题标题】:How do I integration test a Dataflow pipeline writing to Bigtable?如何集成测试写入 Bigtable 的 Dataflow 管道?
【发布时间】:2018-12-12 05:55:42
【问题描述】:

根据Beam website

在您的设备上执行本地单元测试通常更快、更简单 管道代码而不是调试管道的远程执行。

出于这个原因,我想为写入 Bigtable 的 Beam/Dataflow 应用程序使用测试驱动开发。

但是,按照 Beam 测试文档,我陷入了僵局——PAssert 没有用,因为输出 PCollection 包含 org.apache.hadoop.hbase.client.Put 对象,这些对象不会覆盖 equals 方法。

can't get the contents 的 PCollection 也对它们进行验证,因为

无法直接获取 PCollection 的内容 - 一个 Apache Beam 或 Dataflow 管道更像是一个查询计划 应该完成处理,PCollection 是一个逻辑 计划中的中间节点,而不是包含数据。

那么除了手动运行之外,我还可以如何测试这个管道呢?我正在使用 Maven 和 JUnit(在 Java 中,因为 Dataflow Bigtable Connector 似乎都支持这些)。

【问题讨论】:

    标签: google-cloud-dataflow apache-beam google-cloud-bigtable


    【解决方案1】:

    Bigtable Emulator Maven plugin 可用于为此编写集成测试:

    • 配置 Maven Failsafe plugin 并将测试用例的结尾从 *Test 更改为 *IT 以作为集成测试运行。
    • 在命令行的 gcloud sdk 中安装 Bigtable Emulator:

      gcloud components install bigtable   
      

      请注意,此必需步骤会降低代码的可移植性(例如,它会在您的构建系统上运行吗?在其他开发人员的机器上?)所以我将在部署到构建系统之前使用 Docker 对其进行容器化。

    • 根据README将模拟器插件添加到pom中

    • 使用HBase Client API 并查看example Bigtable Emulator integration test 来设置您的会话和表格。

    • 按照 Beam 文档正常编写测试,除了不使用 PAssert 而是实际调用 CloudBigtableIO.writeToTable 然后使用 HBase 客户端从表中读取数据以进行验证。

    这是一个集成测试示例:

    package adair.example;
    
    import static org.apache.hadoop.hbase.util.Bytes.toBytes;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;
    import java.util.stream.Collectors;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.hamcrest.collection.IsIterableContainingInAnyOrder;
    import org.junit.Assert;
    import org.junit.Test;
    
    import com.google.cloud.bigtable.beam.CloudBigtableIO;
    import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
    import com.google.cloud.bigtable.hbase.BigtableConfiguration;
    
    /**
     *  A simple integration test example for use with the Bigtable Emulator maven plugin.
     */
    public class DataflowWriteExampleIT {
    
      private static final String PROJECT_ID = "fake";
      private static final String INSTANCE_ID = "fakeinstance";
      private static final String TABLE_ID = "example_table";
      private static final String COLUMN_FAMILY = "cf";
      private static final String COLUMN_QUALIFIER = "cq";
    
      private static final CloudBigtableTableConfiguration TABLE_CONFIG =
        new CloudBigtableTableConfiguration.Builder()
          .withProjectId(PROJECT_ID)
          .withInstanceId(INSTANCE_ID)
          .withTableId(TABLE_ID)
          .build();
    
      public static final List<String> VALUES_TO_PUT = Arrays
        .asList("hello", "world", "introducing", "Bigtable", "plus", "Dataflow", "IT");
    
      @Test
      public void testPipelineWrite() throws IOException {
        try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID)) {
          Admin admin = connection.getAdmin();
          createTable(admin);
    
          List<Mutation> puts = createTestPuts();
    
          //Use Dataflow to write the data--this is where you'd call the pipeline you want to test.
          Pipeline p = Pipeline.create();
          p.apply(Create.of(puts)).apply(CloudBigtableIO.writeToTable(TABLE_CONFIG));
          p.run().waitUntilFinish();
    
          //Read the data from the table using the regular hbase api for validation
          ResultScanner scanner = getTableScanner(connection);
          List<String> resultValues = new ArrayList<>();
          for (Result row : scanner) {
            String cellValue = getRowValue(row);
            System.out.println("Found value in table: " + cellValue);
            resultValues.add(cellValue);
          }
    
          Assert.assertThat(resultValues,
            IsIterableContainingInAnyOrder.containsInAnyOrder(VALUES_TO_PUT.toArray()));
        }
      }
    
      private void createTable(Admin admin) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_ID));
        tableDesc.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    
        admin.createTable(tableDesc);
      }
    
      private ResultScanner getTableScanner(Connection connection) throws IOException {
        Scan scan = new Scan();
        Table table = connection.getTable(TableName.valueOf(TABLE_ID));
        return table.getScanner(scan);
      }
    
      private String getRowValue(Result row) {
        return Bytes.toString(row.getValue(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER)));
      }
    
      private List<Mutation> createTestPuts() {
        return VALUES_TO_PUT
              .stream()
              .map(this::stringToPut)
              .collect(Collectors.toList());
      }
    
      private Mutation stringToPut(String cellValue){
        String key = UUID.randomUUID().toString();
        Put put = new Put(toBytes(key));
        put.addColumn(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER), toBytes(cellValue));
        return put;
      }
    
    }
    

    【讨论】:

      【解决方案2】:

      在 Google Cloud 中,您可以使用真实的云资源(例如 Pub/Sub 主题和 BigQuery 表)轻松地对您的 Dataflow 管道进行 e2e 测试。

      通过使用 Junit5 扩展模型 (https://junit.org/junit5/docs/current/user-guide/#extensions),您可以创建自定义类来处理管道所需资源的创建和删除。

      您可以在 https://github.com/gabihodoroaga/dataflow-e2e-demo 找到演示/种子项目,在 https://hodo.dev/posts/post-31-gcp-dataflow-e2e-tests/ 找到博客文章。

      【讨论】:

        猜你喜欢
        • 2020-05-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-11-12
        • 1970-01-01
        • 2019-02-25
        • 2016-06-05
        相关资源
        最近更新 更多