【发布时间】:2016-12-31 21:40:00
【问题描述】:
我正在尝试使用 Spark 流从 Kafka 接收消息,将它们转换为 Put 并插入 HBase。
我创建了一个inputDstream 来接收来自Kafka 的消息,然后创建一个JobConf,最后使用saveAsHadoopDataset(JobConf) 将记录保存到HBase。
每次将记录插入 HBase 时,都会建立从 Hbase 到 zookeeper 的会话,但不会关闭。如果连接数增加到超过 zookeeper 的最大客户端连接数,则引发流式传输崩溃。
我的代码如下所示:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
object ReceiveKafkaAsDstream {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val topics = "test"
val brokers = "10.0.2.15:6667"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
val tableName = "KafkaTable"
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val jobConfig: JobConf = new JobConf(conf, this.getClass)
jobConfig.set("mapreduce.output.fileoutputformat", "/user/root/out")
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val records = messages
.map(_._2)
.map(SampleKafkaRecord.parseToSampleRecord)
records.print()
records.foreachRDD{ stream => stream.map(SampleKafkaRecord.SampleToHbasePut).saveAsHadoopDataset(jobConfig) }
ssc.start()
ssc.awaitTermination()
}
case class SampleKafkaRecord(id: String, name: String)
object SampleKafkaRecord extends Serializable {
def parseToSampleRecord(line: String): SampleKafkaRecord = {
val values = line.split(";")
SampleKafkaRecord(values(0), values(1))
}
def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
val rowKey = CSVData.id
val putOnce = new Put(rowKey.getBytes)
putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
return (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
}
}
}
我在 zookeeper conf 文件 zoo.cfg 中将 SSC (SparkStreamingContext) 的持续时间设置为 1s,并将 maxClientCnxns 设置为 10,因此从一个客户端到 zookeeper 最多允许 10 个连接。
10 秒后(从 HBase 到 Zookeeper 设置了 10 个会话),我收到如下所示的错误:
16/08/24 14:59:30 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/hbaseid
16/08/24 14:59:31 INFO ClientCnxn: Opening socket connection to server localhost.localdomain/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
16/08/24 14:59:31 INFO ClientCnxn: Socket connection established to localhost.localdomain/127.0.0.1:2181, initiating session
16/08/24 14:59:31 WARN ClientCnxn: Session 0x0 for server localhost.localdomain/127.0.0.1:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
据我了解,存在此错误是因为连接数超过了 zookeeper 的最大连接数。如果我将 maxClientCnxn 设置为 20,则流处理可以持续 20 秒。我知道我可以将 maxClientCnxn 设置为无限制,但我真的不认为这是解决这个问题的好方法。
另一件事是,如果我使用 TextFileStream 将文本文件作为 DStream 获取并使用 saveAsHadoopDataset(jobConf) 将它们保存到 hbase,它运行得很好。如果我只是使用val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 从 kafka 读取数据并简单地打印信息,那么也没有问题。当我收到 kafka 消息然后将它们保存到应用程序中的 HBase 时,问题就来了。
我的环境是 HDP 2.4 沙盒。版本 spark:1.6,hbase:1.1.2,kafka:2.10.0,zookeeper:3.4.6。
感谢任何帮助。
【问题讨论】:
标签: apache-spark hbase apache-kafka spark-streaming hortonworks-data-platform