【问题标题】:loading 1GB data into hbase taking 1 hour将 1GB 数据加载到 hbase 需要 1 小时
【发布时间】:2014-05-02 06:06:21
【问题描述】:

我想将 1GB(1000 万条记录)CSV 文件加载到 Hbase。我为它写了 Map-Reduce Program。我的代码运行良好,但需要 1 小时才能完成。最后一个减速器需要半个多小时的时间。谁能帮帮我?

我的代码如下:

驱动程序.Java

包 com.cloudera.examples.hbase.bulkimport; 导入 org.apache.hadoop.conf.Configuration; 导入 org.apache.hadoop.fs.Path; 导入 org.apache.hadoop.hbase.HBaseConfiguration; 导入 org.apache.hadoop.hbase.KeyValue; 导入 org.apache.hadoop.hbase.client.HTable; 导入 org.apache.hadoop.hbase.io.ImmutableBytesWritable; 导入 org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; 导入 org.apache.hadoop.mapreduce.Job; 导入 org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 导入 org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 导入 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * HBase 批量导入示例
* 数据准备 MapReduce 作业驱动 *
    *
  1. args[0]: HDFS 输入路径 *
  2. args[1]:HDFS 输出路径 *
  3. args[2]:HBase 表名 *
*/ 公共类驱动程序{ 公共静态 void main(String[] args) 抛出异常 { 配置 conf = new Configuration(); /* * NBA 总决赛 2010 比赛 1 提示时间(从纪元开始的秒数) * 2010 年 6 月 3 日星期四 18:00:00 PDT */ // conf.setInt("epoch.seconds.tipoff", 1275613200); conf.set("hbase.table.name", args[2]); // 加载 hbase-site.xml HBaseConfiguration.addHbaseResources(conf); Job job = new Job(conf, "HBase Bulk Import Example"); job.setJarByClass(HBaseKVMapper.class); job.setMapperClass(HBaseKVMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(TextInputFormat.class); HTable hTable = new HTable(conf, args[2]); // 自动配置 partitioner 和 reducer HFileOutputFormat.configureIncrementalLoad(job, hTable); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); 作业.waitForCompletion(true); // 将生成的 HFiles 加载到表中 // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); // loader.doBulkLoad(new Path(args[1]), hTable); } }

HColumnEnum.java

包 com.cloudera.examples.hbase.bulkimport; /** * 'srv' 列族的 HBase 表列 */ 公共枚举 HColumnEnum { SRV_COL_employeeid ("employeeid".getBytes()), SRV_COL_eventdesc ("eventdesc".getBytes()), SRV_COL_eventdate ("eventdate".getBytes()), SRV_COL_objectname ("objectname".getBytes()), SRV_COL_objectfolder ("objectfolder".getBytes()), SRV_COL_ipaddress ("ipaddress".getBytes()); 私有最终字节 [] 列名; HColumnEnum (byte[] 列) { this.columnName = 列; } 公共字节[] getColumnName() { 返回 this.columnName; } }

HBaseKVMapper.java

package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

/**
 * HBase bulk import example
 * <p>
 * Parses Facebook and Twitter messages from CSV files and outputs
 * <ImmutableBytesWritable, KeyValue>.
 * <p>
 * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
 * into the correct HBase table region.
 * <p>
 * The KeyValue value holds the HBase mutation information (column family,
 * column, and value)
 */
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 6;

  CSVParser csvParser = new CSVParser();
  int tipOffSeconds = 0;
  String tableName = "";

  // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
  //    .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();

  //  tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }

  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    /*if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }*/

    String[] fields = null;

    try {
      fields = value.toString().split(",");
      //csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
  /*  DateTime dt = null;

    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }

    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }*/

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
        .getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[5].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
      context.write(hKey, kv);
    }


    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
  /*  if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }*/
  }
}

请帮助我提高性能,或者如果您有任何带有示例代码的替代解决方案,请告诉我。

我的 mapred-site.xml

 <?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
  <name>mapred.job.tracker</name>
    <value>namenode:54311</value>
    </property>

<property>
  <name>mapred.reduce.parallel.copies</name>
    <value>20</value>
    </property>

<property>
  <name>tasktracker.http.threads</name>
    <value>50</value>
    </property>

<property>
  <name>mapred.job.shuffle.input.buffer.percent</name>
    <value>0.70</value>
    </property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.map.tasks</name>
    <value>4</value>
    </property>

<property>
  <name>reduce.map.tasks</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.job.shuffle.merge.percent</name>
    <value>0.65</value>
    </property>

<property>
  <name>mapred.task.timeout</name>
    <value>1200000</value>
    </property>

<property>
    <name>mapred.child.java.opts</name>
        <value>-Xms1024M -Xmx2048M</value>
        </property>



<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>-1</value>
    </property>

<property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
</property>

<property>
    <name>mapred.map.output.compression.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<property>
    <name>io.sort.mb</name>
    <value>800</value>
</property>


<property>
  <name>mapred.child.ulimit</name>
    <value>unlimited</value>
    </property>

<property>
<name>io.sort.factor</name>
<value>100</value>
<description>More streams merged at once while sorting files.</description>
</property>  


 <property>
 <name>mapreduce.admin.map.child.java.opts</name>
 <value>-Djava.net.preferIPv4Stack=true</value>
 </property>
 <property>
 <name>mapreduce.admin.reduce.child.java.opts</name>
 <value>-Djava.net.preferIPv4Stack=true</value>
 </property>


<property>
   <name>mapred.min.split.size</name>
   <value>0</value>
</property>

<property>
   <name>mapred.job.map.memory.mb</name>
     <value>-1</value>
     </property>

<property>
   <name>mapred.jobtracker.maxtasks.per.job</name>
        <value>-1</value>
             </property>


</configuration>

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://namenode:54310/hbase</value>
    <description>The directory shared by RegionServers.
    </description>
</property>

<property>
    <name>hbase.master</name>
    <value>slave:60000</value>
    <description>The host and port that the HBase master runs at.
    A value of 'local' runs the master and a regionserver
    in a single process.
    </description>
</property>

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are
    false: standalone and pseudo-distributed setups with managed Zookeeper
    true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
    </description>
</property>

<property>
    <name>hbase.zookeeper.quorum</name>
    <value>slave</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.
    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
    By default this is set to localhost for local and pseudo-distributed modes
    of operation. For a fully-distributed setup, this should be set to a full
    list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
    this is the list of servers which we will start/stop ZooKeeper on.
    </description>
</property>

<property>
       <name>hbase.zookeeper.property.clientPort</name>
       <value>2181</value>
</property>

<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/work/zoo_data</value>
    <description>Property from ZooKeeper's config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
</property>

</configuration>

请帮帮我,这样我可以提高我的表现。

【问题讨论】:

  • 您是否尝试过分析和监控 I/O?
  • 一分钟以上对我来说听起来很慢。听起来好像出了点问题,分析您的应用程序似乎是最好的方法。我将从使用最多 CPU 的进程开始。
  • 可能与事务大小有关,如果您还没有这样做,那么我会尝试将插入批量处理为每个事务 5k。
  • 我已将所有最佳配置放在我的 mapred-site.xml 上,但仍然没有提高性能。我正在使用 5 节点集群。我的所有节点都有 quard 核心处理器和 8GB RAM。请建议我如何提高性能
  • 你的mapred-site.xmlhbase-site.xml 是什么?您是否有任何硬件指标,例如网络、IO、CPU 利用率?

标签: java hadoop mapreduce hbase hadoop2


【解决方案1】:

首先,为什么我们需要 Mapreduce 程序来为这么小的文件(1GB)加载数据到 Hbase。

根据我的经验,我已经使用 Jackson 流处理了 5GB Json(我不想将所有 json 放入内存中)并使用批处理技术在 8 分钟内持久保存在 Hbase 中。

我使用 hbase 批量放入 100000 条记录的 List 对象。

下面是我实现这一点的代码 sn-p。解析其他格式时也可以做同样的事情)

可能你需要在 2 个地方调用这个方法

1) 批处理 100000 条记录。

2) 处理提醒您的批次记录少于100000

  public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
        try {
            final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
            table.put(puts);
            LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            LOG.info("Processed ---> " + puts.size());
            if (puts != null) {
                puts.clear();
            }
        }
    }

【讨论】:

    【解决方案2】:

    我只创建了映射器类并采用 hbase 输出格式类。现在需要 10 分钟。我的网络速度很慢,这就是为什么它需要很长时间。

    【讨论】:

    • 这可以通过正常的java程序来实现,时间间隔小于mapreduce。请看我的回答。
    【解决方案3】:

    可以通过在创建 Hbase 表时指定要使用的 Region 拆分数来进一步微调。由于批量加载的减速器实例的数量也取决于区域的数量。这可以通过以下命令来完成

    hbase org.apache.hadoop.hbase.util.RegionSplitter -c <number of regions> -f <column families> <New Hbase Table Name> <splitAlgorithm>
    

    对于分割算法,可以指定

    • UniformSplit - 将键视为任意字节

    • HexStringSplit - 将键视为十六进制 ASCII

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-12-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多