【问题标题】:How data is split into part files in sqoopsqoop中如何将数据拆分为部分文件
【发布时间】:2017-07-14 10:17:18
【问题描述】:

如果数据有偏差,我怀疑如何将数据分区为部分文件。如果可能,请帮助我澄清这一点。

假设这是我的department 表,以department_id 作为主键。

mysql> select * from departments;
2 Fitness
3 Footwear
4 Apparel
5 Golf
6 Outdoors
7 Fan Shop

如果我通过在导入命令中提及-m 1 来使用sqoop import,我知道我只会生成一个包含所有记录的零件文件。

现在我在没有指定任何映射器的情况下运行命令。所以默认情况下它应该需要 4 个映射器,它在 HDFS 中创建了 4 个部分文件。下面是每个部分文件的记录是如何分布的。

[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00000
2,Fitness
3,Footwear
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00001
4,Apparel
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00002
5,Golf
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00003
6,Outdoors
7,Fan Shop

根据 BoundingValsQuery,默认使用 Min(department_id)=2、Max(department_id)=8 和 4 个映射器。

计算后,每个mapper应该得到(8-2)/4=1.5条记录。

这里我不知道如何分发数据。我不明白如何在 part-m-00000 中有 2 条记录,而在 part-m-00001、part-m-00002 中只有一条,在 part-m-00003 中又是两条。

【问题讨论】:

  • 如果你想有人帮助你,你应该更好地解释你想要的。你至少应该解释一下你想要达到的目标。
  • 我想知道数据是如何在部分文件之间拆分的,即哪条记录将转到哪个部分文件。
  • Sqoop 为每个映射器创建 sql 查询。您可以从所有 Workers 节点检查 SQL。您可以在您的 sql server 上运行所有 sql 查询并共享结果吗?
  • @SandeepSingh 首先我使用的是 mysql。基本上我知道 sqoop 为每个映射器创建 sql 查询。但我想知道如何准备查询以及如何确定每个映射器的异常值。
  • @ElmerDantas- 实际上我想看看在你的情况下查询是如何形成的,因为数据中的最大值是 7,但你已经分配了 8

标签: hadoop sqoop hadoop-partitioning


【解决方案1】:

如果你有机会去图书馆看看。它涉及一系列步骤。

Sqoop 作业读取记录。通过 DBRecordReader

 org.apache.sqoop.mapreduce.db.DBRecordReader

这里有两种方法可以工作。

方法 1.

protected ResultSet executeQuery(String query) throws SQLException {
Integer fetchSize = dbConf.getFetchSize();
/*get fetchSize according to split which is calculated via getSplits() method of 
org.apache.sqoop.mapreduce.db.DBInputFormat.And no. of splits are calculated
via no. of (count from table/no. of mappers). */
 }

拆分计算:-

org.apache.sqoop.mapreduce.db.DBInputFormat
 public List<InputSplit> getSplits(JobContext job) throws IOException {
 .......//here splits are calculated accroding to count of source table
 .......query.append("SELECT COUNT(*) FROM " + tableName);
}   

方法二。

 protected String getSelectQuery() {
    if (dbConf.getInputQuery() == null) {
      query.append("SELECT ");

      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length -1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); 
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }

      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      //PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }

    try {// main logic to decide division of records between mappers.
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore, will not throw.
    }

    return query.toString();
  }

查看cmets下的代码部分主逻辑到....... 这里的记录是按照LIMIT和OFFSET来划分的。对于每个 RDBMS,此逻辑的实现方式都不同。只需寻找org.apache.sqoop.mapreduce.db.OracleDBRecordReader 它与 getSelectQuery() 方法的实现几乎没有什么不同。

希望这能让您快速了解如何将记录划分为不同的映射器。

【讨论】:

    【解决方案2】:

    Sqoop 在主键列或按列拆分中找到最小值和最大值,然后尝试为给定数量的映射器划分范围。

    示例,如果您有一个主键列 id 最小值为 0 最大值为 1000 的表,并且 Sqoop 被指示使用 4 个任务,则 Sqoop 将运行四个进程每个执行 SELECT * FROM sometable WHERE id >= lo AND id

    这里 min val =2 max=7 因此 sqoop 将运行四个进程,范围如下 (2-4) , (4-5) , (5-6),(6-7) 这意味着

    1. 第二和第三一起
    2. 第四条记录
    3. 第五条记录
    4. 此范围内的第 6 和第 7

    【讨论】:

      猜你喜欢
      • 2016-08-30
      • 1970-01-01
      • 2011-03-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多