Scala version: 2.11
Java version: 1.8
Spark version: 2.2.1
ES version: 6.2.2
Hadoop version: 2.9.0
Elasticsearch node list:
192.168.0.120 192.168.0.121 192.168.0.122
Contents:
1) First, how to use the Elasticsearch client API to create (and delete, modify, query) an index, type, and mapping, and to insert, delete, update, and query documents.
2) Then, how to write to Elasticsearch from Spark.
3) Finally, how to read a data stream from Kafka and write it into ES.
Using the Elasticsearch Client API
Client
Client is the interface through which you perform all operations against an ES cluster: index/get/delete/search requests, as well as cluster-administration tasks.
A Client is obtained by building a TransportClient.
TransportClient
TransportClient connects remotely to an ES cluster through the transport module. It does not actually join the cluster; it only holds one or more initial transport addresses and establishes the connection to the ES cluster when each request is executed.
Settings
The Settings class configures properties before the Client is started, most importantly the cluster name (cluster.name). Other parameters include the following (a short sketch follows the list):
client.transport.sniff: whether to enable sniffing on the transport client;
client.transport.ignore_cluster_name: when set to true, skips cluster-name validation for the nodes being connected to;
client.transport.ping_timeout: timeout for pinging a node, default 5s;
client.transport.nodes_sampler_interval: how often to sample/ping the listed and connected nodes, default 5s.
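For orientation, here is a minimal sketch of a Settings object that touches the parameters above; the cluster name es-cluster and the concrete values are placeholders, not taken from this post:

// Minimal sketch, assuming a cluster named "es-cluster"; adjust the values to your setup.
Settings settings = Settings.builder()
        .put("cluster.name", "es-cluster")                    // must match the target cluster
        .put("client.transport.sniff", true)                  // enable node sniffing
        .put("client.transport.ignore_cluster_name", false)   // keep cluster-name validation
        .put("client.transport.ping_timeout", "5s")           // node ping timeout (default)
        .put("client.transport.nodes_sampler_interval", "5s") // node sampling interval (default)
        .build();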
An example of initializing the client is shown below:
1) ClientTools.java (provides the TransportClient object as a singleton; for details on how to create a client, see https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html)
package com.dx.es;

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class ClientTools {
    private static ClientTools instance = null;
    private TransportClient client = null;

    private ClientTools() {
        this.client = null;
        init();
    }

    // Lazily create the singleton instance.
    public static synchronized ClientTools getInstance() {
        if (instance == null) {
            instance = new ClientTools();
        }
        return instance;
    }

    public TransportClient get() {
        return client;
    }

    public void close() {
        if (null != client) {
            try {
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void init() {
        if (null != this.client) {
            return;
        }
        try {
            // Build the settings: cluster name and sniffing are read from conf.properties.
            Settings settings = Settings.builder()
                    .put("cluster.name", Config.getInstance().get("cluster.name"))
                    .put("client.transport.sniff",
                            Boolean.valueOf(Config.getInstance().get("client.transport.sniff")))
                    .build();
            @SuppressWarnings("unchecked")
            PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
            this.client = preBuiltTransportClient;
            // Register the initial transport addresses; 9300 is the default transport port.
            this.client.addTransportAddress(
                    new TransportAddress(InetAddress.getByName(Config.getInstance().get("host1")), 9300));
            this.client.addTransportAddress(
                    new TransportAddress(InetAddress.getByName(Config.getInstance().get("host2")), 9300));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
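As a quick sanity check, the singleton can be exercised as follows. This demo class, the index/type/id names, and the sample document are illustrative assumptions, not part of the original code:

package com.dx.es;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ClientToolsDemo {
    public static void main(String[] args) throws Exception {
        // Fetch the shared client from the singleton.
        TransportClient client = ClientTools.getInstance().get();
        // Index one document; "my_index"/"my_type"/"1" are placeholder names.
        IndexResponse response = client.prepareIndex("my_index", "my_type", "1")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("user", "kimchy")
                        .field("message", "trying out Elasticsearch")
                        .endObject())
                .get();
        System.out.println(response.status()); // e.g. CREATED on first insert
        ClientTools.getInstance().close();
    }
}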
2) Config.java (manages the ES configuration)
package com.dx.es;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class Config {
    private static Config instance = null;
    private Map<String, String> confItems = null;

    private Config() {
        this.confItems = new HashMap<String, String>();
        init();
    }

    // Lazily create the singleton instance.
    public static synchronized Config getInstance() {
        if (instance == null) {
            instance = new Config();
        }
        return instance;
    }

    public String get(String key) {
        if (!this.confItems.containsKey(key))
            return null;
        return this.confItems.get(key);
    }

    private void init() {
        Properties prop = new Properties();
        try {
            // Read the property file conf.properties
            InputStream in = new BufferedInputStream(new FileInputStream(
                    "E:\\spark_hadoop_cdh\\workspace\\ES_Client_API\\src\\main\\resources\\conf.properties"));
            // Load the property list
            prop.load(in);
            Iterator<String> it = prop.stringPropertyNames().iterator();
            while (it.hasNext()) {
                String key = it.next();
                System.out.println(key + ":" + prop.getProperty(key));
                this.confItems.put(key, prop.getProperty(key));
            }
            in.close();
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
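For completeness, a conf.properties matching the keys consumed by ClientTools and Config might look like the following; the cluster name is a placeholder assumption, and the host values reuse the node list from the top of this post:

# conf.properties -- keys read by Config and consumed by ClientTools.
# cluster.name must match your actual cluster; the value below is a placeholder.
cluster.name=my-es-cluster
client.transport.sniff=true
host1=192.168.0.120
host2=192.168.0.121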