【问题标题】:How to read and write to HBase in flink streaming job如何在 flink 流作业中读取和写入 HBase
【发布时间】:2016-10-26 12:50:58
【问题描述】:

如果我们必须在流式应用程序中读取和写入 HBASE,我们该怎么做。我们通过 open 方法打开一个连接进行写入,我们如何打开一个连接进行读取。

object test {

    if (args.length != 11) {
      //print args
      System.exit(1)
    }

    val Array() = args
    println("Parameters Passed " + ...);

    val env = StreamExecutionEnvironment.getExecutionEnvironment


    val properties = new Properties()
    properties.setProperty("bootstrap.servers", metadataBrokerList)
    properties.setProperty("zookeeper.connect", zkQuorum)
    properties.setProperty("group.id", group)


    val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))

    messageStream.map { x => getheader(x) }





    def getheader(a: String) {

        //Get header and parse and split the headers
                if (metadata not available hit HBASE) { //Device Level send(Just JSON)

            //How to read from HBASE here .

                      } 
                      //If the resultset is not available in Map fetch from phoenix
                      else {
                          //fetch from cache
                      }
     }




    }
   messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
   env.execute()

}

现在在方法getheader 中,如果我想从if(metadata not available hit HBASE) 中的HBASE 读取,我该怎么做。我不想在这里打开一个连接,这个想法是为一个线程维护一个连接并重用它,就像 flink 使用 open() 方法对 HBASE sink 所做的那样,或者 spark 对 foreachpartition 所做的那样。我尝试了this,但我无法将 StreamExecutionEnvironment 传递给方法。我怎么能做到这一点,有人可以提供一个sn-p吗?

【问题讨论】:

    标签: hadoop apache-flink flink-streaming


    【解决方案1】:

    您想从流式用户函数中读取/写入 Apache HBase。您链接的HBaseReadExample 正在做一些不同的事情:它将 HBase 表读入 DataSet(Flink 的批处理抽象)。在用户函数中使用此代码意味着从 Flink 程序中启动 Flink 程序。

    对于您的用例,您需要直接在您的用户函数中创建一个 HBase 客户端并与之交互。最好的方法是使用RichFlatMapFunction 并在open() 方法中创建到HBase 的连接。

    Flink 的下一个版本(1.2.0)将在用户函数中支持asynchronous I/O operations,这将显着提高应用程序的吞吐量。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-11-10
    • 2017-12-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多