【问题标题】:linking spark-streaming to HBase将 spark-streaming 链接到 HBase
【发布时间】:2016-08-04 23:48:51
【问题描述】:

我是 Spark 和 HBase 的新手,但我需要将两者链接在一起,我尝试了 spark-hbase-connector 库,但使用 spark-submit 它不起作用,即使没有显示错误。我在这里和其他地方搜索了类似的问题或教程,但找不到,所以任何人都可以解释如何从 Spark 流中写入 HBase 或推荐教程或书籍吗? 提前谢谢你

【问题讨论】:

  • 这个问题在很多层面上都是题外话。请阅读如何提出关于 SO 的问题以及在此处回答的范围内的问题。

标签: apache-spark hbase spark-streaming


【解决方案1】:

最终奏效的是:

val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(row))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value)
hTable.put(thePut)

【讨论】:

    【解决方案2】:

    这里是一些示例代码,使用 Splice Machine(开源)通过 Spark Streaming 和 Kafka 将数据存储到 HBase...

    https://github.com/splicemachine/splice-community-sample-code/tree/master/tutorial-kafka-spark-streaming

    我们也经历过这个过程,并且知道这可能有点令人生畏。

    【讨论】:

    • 请在答案中包含相关部分,因为该链接可能会消失。
    【解决方案3】:

    这里是相关代码...

            LOG.info("************ SparkStreamingKafka.processKafka start");
    
       // Create the spark application and set the name to MQTT
        SparkConf sparkConf = new SparkConf().setAppName("KAFKA");
    
        // Create the spark streaming context with a 'numSeconds' second batch size
        jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds));
        jssc.checkpoint(checkpointDirectory);
    
        LOG.info("zookeeper:" + zookeeper);
        LOG.info("group:" + group);
        LOG.info("numThreads:" + numThreads);
        LOG.info("numSeconds:" + numSeconds);
    
    
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic: topics) {
            LOG.info("topic:" + topic);
          topicMap.put(topic, numThreads);
        }
    
        LOG.info("************ SparkStreamingKafka.processKafka about to read the MQTTUtils.createStream");
        //2. KafkaUtils to collect Kafka messages
        JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);
    
        //Convert each tuple into a single string.  We want the second tuple
        JavaDStream<String> lines = messages.map(new TupleFunction());
    
        LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD");
        //process the messages on the queue and save them to the database
        lines.foreachRDD(new SaveRDDWithVTI());
    
    
        LOG.info("************ SparkStreamingKafka.processKafka prior to context.strt");
        // Start the context
        jssc.start();
        jssc.awaitTermination();
    

    【讨论】:

      猜你喜欢
      • 2017-12-20
      • 2016-02-28
      • 1970-01-01
      • 2016-05-04
      • 1970-01-01
      • 2016-10-25
      • 1970-01-01
      • 2014-11-25
      • 1970-01-01
      相关资源
      最近更新 更多