scala版本2.11

java版本1.8

spark版本2.2.1

es版本6.2.2

hadoop版本2.9.0

elasticsearch节点列表:

192.168.0.120
192.168.0.121
192.168.0.122

内容导航:

1)首先,讲解使用elasticsearch client api讲解如何创建(删除、修改、查询)index,type,mapping;对数据进行增删改查。

2)然后,讲解如何使用在spark下写入elasticsearch。

3)最后,讲解如何读取kafka上的数据,然后读取kafka上数据流写入es。

使用elasticsearch client api

Client

Client是一个类,可以通过该类实现对ES集群各种操作:index/get/delete/search操作,以及对ES集群的管理任务。

Client的构造需要基于TransportClient。

TransportClient

TransportClient可以远程连接到ES集群,通过一个传输模块,但是它不真正的连接到集群,只是获取集群的一个或多个初始传输地址,在每次请求动作时,才真正连接到ES集群。

Settgings

Settings类主要是在启动Client之前,配置一些属性参数,主要配置集群名称cluster name,还有其他参数:

client.transport.sniff:是否为传输client添加嗅探功能;

client.transport.ignore_cluster_name 设为true,或略连接点的集群名称验证;

client.transport.ping_timeout 设置ping节点的时间超时时长,默认5s;

client.transport.nodes_sample_interval 设置sample/ping nodes listed间隔时间,默认5s。

初始化client的示例如下:

1)ClientTools.java(单利方式提供TransportClient对象,关于如何创建client参考《https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html》)

package com.dx.es;

import java.net.InetAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class ClientTools {
    private static ClientTools instance=null;
    private TransportClient client=null;
    
    private ClientTools(){
        this.client=null;
        init();
    }
        
    public static synchronized ClientTools getInstance(){
        if(instance==null){
            instance=new ClientTools();
        }
        return instance;
    }
    
    public TransportClient get(){
        return client;
    }
    
    public void close(){
        if(null != client){
            try {
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    private void init() {
        if(null != this.client){
            return;
        }
        
        try {
            Settings settings = Settings.builder()
                    .put("cluster.name",Config.getInstance().get("cluster.name"))
                    .put("client.transport.sniff", Boolean.valueOf(Config.getInstance().get("client.transport.sniff")))
                    .build();
            
            @SuppressWarnings("unchecked")
            PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
            
            this.client = preBuiltTransportClient;
            this.client.addTransportAddress(new TransportAddress(InetAddress.getByName(Config.getInstance().get("host1")), 9300));
            this.client.addTransportAddress(new TransportAddress(InetAddress.getByName(Config.getInstance().get("host2")), 9300));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2)(es配置信息管理)

package com.dx.es;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class Config {
    private static Config instance=null;
    private Map<String, String> confItems=null;
    
    private Config(){
        this.confItems=new HashMap<String, String>();
    
        init();
    }
    
    public static synchronized Config getInstance(){
        if(instance==null){
            instance=new Config();
        }
        return instance;
    }

    public String get(String key){
        if(!this.confItems.containsKey(key))
            return null;
        
        return this.confItems.get(key);
    }
    
    private void init() {        
        Properties prop = new Properties();     
        try{
            // 读取属性文件conf.properties
            InputStream in = new BufferedInputStream (new FileInputStream("E:\\spark_hadoop_cdh\\workspace\\ES_Client_API\\src\\main\\resources\\conf.properties"));
            // 加载属性列表
            prop.load(in);     
            Iterator<String> it=prop.stringPropertyNames().iterator();
            while(it.hasNext()){
                String key=it.next();
                System.out.println(key+":"+prop.getProperty(key));
                this.confItems.put(key, prop.getProperty(key));
            }
            in.close();          
        }
        catch(Exception e){
            System.out.println(e);
        }
    }
}
View Code

相关文章:

  • 2021-09-22
  • 2021-06-04
  • 2021-07-09
  • 2022-02-20
  • 2021-06-05
  • 2021-12-02
  • 2021-12-01
  • 2021-08-18
猜你喜欢
  • 2021-12-07
  • 2021-12-25
  • 2021-09-11
  • 2021-09-03
  • 2021-12-13
  • 2021-09-11
  • 2022-01-28
相关资源
相似解决方案