【问题标题】:Read HBase Table using Spark on Zeppelin在 Zeppelin 上使用 Spark 读取 HBase 表
【发布时间】:2016-12-19 22:11:46
【问题描述】:

我有一个读取 HBase 表的代码,将其格式化,然后将其转换为 DataFrame:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;

val tableName = "my_table"

val conf = HBaseConfiguration.create()
// Add local HBase conf
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/etc/hbase/conf.dist/hbase-site.xml"))
conf.set(TableInputFormat.INPUT_TABLE, tableName)

val admin = new HBaseAdmin(conf)

admin.isTableAvailable(tableName)

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])


case class MyClass(srcid: Long, srcLat: Double, srcLong: Double, dstid: Long, dstLat: Double, dstLong: Double, time: Int, duration: Integer )

val parsed = hBaseRDD.map{ case(b, a) => val iter = a.list().iterator();
            ( Bytes.toString(a.getRow()).toLong,
            Bytes.toString( iter.next().getValue()).toDouble,
            Bytes.toString(iter.next().getValue()).toDouble,
            Bytes.toString(iter.next().getValue()).toLong,
            Bytes.toString(iter.next().getValue()).toDouble,
            Bytes.toString(iter.next().getValue()).toDouble,
            Bytes.toString(iter.next().getValue()).toInt,
            Bytes.toString(iter.next().getValue()) 
)}.map{ s => 
                        val time = s._8.replaceAll( "T", "")
                        val time2 = time.replaceAll( "\\+03:00", "")
                        val format = new java.text.SimpleDateFormat("yyyy-MM-ddHH:mm:ss.SSS")
                        val date = format.parse(time2)
                        MyClass( s._1,
                        s._5,
                        s._6,
                        s._4,
                        s._2,
                        s._3,
                        date.getHours(),
                        //s(6),
                        s._7) }.toDF()
parsed.registerTempTable("my_table")

这段代码在 spark-shell 中运行良好。但是我想在 Zeppelin 笔记本中使用它。我期待这能很好地适用于 paragrah。但是,当我运行代码时,它会在导入语句中输出以下错误:

<console>:28: error: object hbase is not a member of package org.apache.hadoop
       import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}

我是否需要添加依赖项才能在 Zeppelin 中将 HBase 与 Spark 结合使用。如果是这样,我该怎么做?

【问题讨论】:

    标签: apache-spark dependencies hbase apache-zeppelin


    【解决方案1】:

    按照文档中的描述添加对 HBase 的依赖:http://zeppelin.apache.org/docs/0.6.0/manual/dependencymanagement.html

    你需要org.apache.hbase:hbase:1.2.3

    另外,您可能对 Zeppelin HBase 解释器感兴趣,可以直接从 Zeppelin 运行 HBase 查询。但是它超出了这个问题的主题

    【讨论】:

      猜你喜欢
      • 2015-01-23
      • 1970-01-01
      • 1970-01-01
      • 2016-01-03
      • 1970-01-01
      • 2017-04-28
      • 1970-01-01
      • 1970-01-01
      • 2018-09-01
      相关资源
      最近更新 更多