【发布时间】:2017-09-04 03:34:09
【问题描述】:
我正在使用 Flink 1.1.2 并在 Maven 中添加了 ElesticSearch 依赖项,如下所示
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch2_2.10</artifactId>
<version>1.2.0</version>
</dependency>
我的程序包含以下代码,它从 Kafka 读取数据并插入到 Elastic 搜索中
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new JoinSchema(), properties));
System.out.println("reading form kafka ");
message.print();
Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1"); // flush inserts after every event
config.put("cluster.name", "elasticsearch_amar"); // default cluster name
List<InetSocketAddress> transports = new ArrayList<>();
// set default connection details
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
message.addSink(new ElasticsearchSink<>(config,transports,new ElasticInserter()));
env.execute();
} //main
public static class ElasticInserter implements ElasticsearchSinkFunction<JoinedStreamEvent>{
@Override
public void process(JoinedStreamEvent record, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
Map<String, Integer> json = new HashMap<>();
json.put("Time", record.getPatient_id());
json.put("heart Rate ", record.getHeartRate());
json.put("resp rete", record.getRespirationRate());
IndexRequest rqst = Requests.indexRequest()
.index("nyc-places") // index name
.type("popular-locations") // mapping name
.source(json);
requestIndexer.add(rqst);
} //process
} //ElasticInserter
} //ReadFromKafka
我已经使用homebrew 安装了ElesticSearch,然后使用elesticsearch 命令启动它,如下所示
【问题讨论】:
标签: maven elasticsearch apache-flink elasticsearch-plugin flink-streaming