First, some background: our company is moving to stream processing and finally settled on Flink (given its huge success at Alibaba). Our job reads data from Kafka, processes it, and stores the results in ES. We are still at the research stage, just getting the basics running end to end; I am writing this post as a record, and hopefully as a bit of help to people who are just getting started with Flink.
Technology used: Flink on YARN (1.7.0), Kafka (1.1.1), ES (1.7), Maven (3.0.4). The code is written in Scala (2.11) in the IntelliJ IDEA editor. Flink can also run locally, so readers without a Hadoop cluster can test everything on their own machine.
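Before the code will compile, the project (built with Maven, per the versions above) needs the Flink streaming API plus the Kafka and Elasticsearch connector dependencies. A minimal sketch of the pom.xml entries is below; the _2.11 suffixes and the ES 1.x connector artifact name are my assumptions based on the versions listed, so verify them against your own setup:

<!-- Flink streaming API for Scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<!-- Kafka 0.10 connector, matching the FlinkKafkaConsumer010 used below -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<!-- Elasticsearch connector for ES 1.x (assumed from the ES 1.7 version above) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch_2.11</artifactId>
    <version>1.7.0</version>
</dependency>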
package info

import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSink, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.transport.{InetSocketTransportAddress, TransportAddress}
object Test03 {
  def main(args: Array[String]): Unit = {
    // Get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every second; with checkpointing enabled the Kafka source
    // commits its offsets as part of each checkpoint
    env.enableCheckpointing(1000)

    // Elasticsearch sink configuration
    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "elasticsearch1")
    // Flush after every single action so each event is written immediately;
    // without this the sink buffers requests into bulks
    config.put("bulk.flush.max.actions", "1")

    val transportAddresses = new java.util.ArrayList[TransportAddress]
    transportAddresses.add(new InetSocketTransportAddress("ESIp", 9300))

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafkaIp:9092")
    properties.setProperty("zookeeper.connect", "zookeeperIp:2181")
    properties.put("auto.offset.reset", "earliest")
    properties.put("enable.auto.commit", "true")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("group.id", "testgroup12")

    val kafkaConsumer = new FlinkKafkaConsumer010[String](
      "test2", // topic
      new SimpleStringSchema,
      properties)

    val messageStream = env.addSource(kafkaConsumer)
    messageStream.print()
    messageStream.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
      // Wrap each incoming message in a one-field JSON document
      def createIndexRequest(element: String): IndexRequest = {
        val json = new java.util.HashMap[String, String]
        json.put("data", element)
        Requests.indexRequest()
          .index("fktest")
          .`type`("streaming")
          .source(json)
      }

      // Called once per stream element; hands the request to the bulk indexer
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        indexer.add(createIndexRequest(element))
      }
    }))

    env.execute("StreamESTest")
  }
}
Start the job in IDEA, then on the Kafka machine start a producer, type in some data, and see whether results come out.
Command to start the producer: $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafkaIp:9092 --topic test2
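If nothing reaches ES, it helps to first confirm that the messages are actually landing in the topic, independently of Flink. The console consumer that ships with Kafka can verify this (kafkaIp is a placeholder, as above):

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafkaIp:9092 --topic test2 --from-beginning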
The data queried back from ES:
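A quick way to check from the command line, assuming ES is serving its HTTP API on the default port 9200 (ESIp is a placeholder, as in the code):

curl 'http://ESIp:9200/fktest/streaming/_search?pretty'

Each hit should be a document of the form {"data": "<line you typed into the producer>"}, since the sink wraps every Kafka message in a single-field JSON map.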
Once you see this result come back, the Kafka → Flink → ES pipeline is basically working end to end.
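To move beyond local testing, the same program can be packaged and submitted to the cluster (the flink on yarn setup mentioned at the top). A rough sketch of a Flink 1.7 submission follows; the jar name and the container/memory numbers are placeholders to adapt to your cluster, and -c points at the package/object from the code above:

$FLINK_HOME/bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c info.Test03 your-job.jar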