最近正好做了有关Kettle中插件开发的工作,对Kettle插件的源码进行了一定的研究,并开发了自定义的插件,在此有些感悟,记录下来。

一 Kettle插件概述
Kettle的开发体系是基于插件的,平台本身提供了接口,开发者按照相关规范就可以开发出相应的插件添加到Kettle中使用,感觉这个体系设计思路很不错,非常有利于Kettle后续的扩展。
初次接触Kettle插件开发可以参考GitHub上有关插件模板DummyPlugin的源码,通过对源码的分析,发现Kettle插件开发的流程还是比较简单的,以DummyPlugin为例,主要包括以下几个类:

    DummyPlugin.java
    DummyPluginData.java
    DummyPluginDialog.java
    DummyPluginMeta.java

Kettle插件的开发遵循了MVC设计模式,其中DummyPlugin.java类实现了Control功能,当转换运行时,负责按照预设的逻辑处理输入数据;DummyPluginDialog.java类实现了View功能,即对话框的实现;而DummyPluginData.java和DummyPluginMeta.java用来存储用户在对话框的配置参数,实现了Model功能。

二 Kettle中的Solr插件开发
由于Kettle中没有预先集成Solr插件,因为项目开发的需要,对Solr插件进行了编写测试,主要功能是读取输入的数据流发送到Solr集群中,开发的插件也比较简单,分享一下。
其实主要就实现了3个类,SolrPluginMeta、SolrPluginDialog、SolrPlugin,分别实现Model、View、Control功能:

/**
 * SolrPluginMeta 类主要用来存储用户的配置数据,页面上的配置包括zk地址、collection名以及分区策略等
 */
public class SolrPluginMeta extends BaseStepMeta implements     StepMetaInterface{
    
    private String zkHost;
    
    private String collectionName;
    
    /**
     * 分区策略 0:不分区 1:字段分区 2:大小分区
     */
    private String mode;
    
    /**
     * 选择的分区字段
     */
    private String filedselected;
    
    /**
     * 每个shard的最大容量
     */
    private long countsize;
    
    public SolrPluginMeta(){
        super();
    }
    
    public void setZkHost(String zkHost) {
        this.zkHost = zkHost;
    }

    public String getZkHost() {
        return zkHost;
    }
    
    public void setCollectionName(String collectionName) {
        this.collectionName = collectionName;
    }

    public String getCollectionName() {
        return collectionName;
    }
    
    public void setMode(String mode) {
        this.mode = mode;
    }

    public String getMode() {
        return mode;
    }

    public void setFiledselected(String filedselected) {
        this.filedselected = filedselected;
    }

    public String getFiledselected() {
        return filedselected;
    }

    public void setCountsize(long countsize) {
        this.countsize = countsize;
    }

    public long getCountsize() {
        return countsize;
    }
    
    /**
     * 这个函数的作用是在复制SolrPluginMeta 时获取原对象参数的
     */
    public String getXML(){
        
        StringBuilder retval = new StringBuilder();
        retval.append("<values>").append(Const.CR);
        retval.append("    ").append(XMLHandler.addTagValue("zkHost", zkHost));
        retval.append("    ").append(XMLHandler.addTagValue("collectionName", collectionName));
        retval.append("    ").append(XMLHandler.addTagValue("filedselected", filedselected));
        retval.append("    ").append(XMLHandler.addTagValue("countsize", countsize));
        retval.append("    ").append(XMLHandler.addTagValue("mode", mode));
        retval.append("</values>").append(Const.CR);
        return retval.toString();
    }
    
    public Object clone(){
        return super.clone();
    }
    
    /**
     * 复制对象时传递参数
     */
    public void loadXML(Node stepnode, List<DatabaseMeta>
    databases, Map<String,Counter> counters){
        
        Node valnode  = XMLHandler.getSubNode(stepnode, "values", "zkHost");
        if(null!=valnode){
            zkHost = valnode.getTextContent();
        }
        valnode  = XMLHandler.getSubNode(stepnode, "values", "collectionName");
        if(null!=valnode){
            collectionName = valnode.getTextContent();
        }
        valnode  = XMLHandler.getSubNode(stepnode, "values", "filedselected");
        if(null!=valnode){
            filedselected = valnode.getTextContent();
        }
        valnode  = XMLHandler.getSubNode(stepnode, "values", "countsize");
        if(null!=valnode){
            countsize = Long.parseLong(valnode.getTextContent());
        }
        valnode  = XMLHandler.getSubNode(stepnode, "values", "mode");
        if(null!=valnode){
            mode = valnode.getTextContent();
        }
    }
    
    @Override
    public void setDefault() {
        this.zkHost = "localhost:2181,localhost:2182,localhost:2183";
        this.collectionName = "collection1234";
        this.mode = "0";
    }
    
    public StepDialogInterface getDialog(Shell shell, StepMetaInterface meta, 
                     TransMeta transMeta, String name){
        return new SolrPluginDialog(shell, meta, transMeta, name);
    }

    @Override
    public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, 
                     int cnr, TransMeta transMeta, Trans disp) {
        return new SolrPlugin(stepMeta, stepDataInterface, cnr, transMeta, disp);
    }

    @Override
    public StepDataInterface getStepData() {
        return new SolrPluginData();
    }
}

SolrPluginMeta 类存储了用户的配置信息,在开发中Solr的分区分为三种模式:不分区(所有记录发送到一个shard中)、字段分区(每个字段单独一个分区)、大小分区(指定数量的记录划分在一个分区中)。
当从Kettle中拖拽一个插件到面板上时,其实就生成了一个SolrPluginMeta 对象,这个对象将存储用户在对话框中输入的配置信息,而当转换运行时,Kettle会重新生成一个SolrPluginMeta 对象并获取原对象的配置参数(这一点我还不太明白为啥Kettle采用这种方式),因此getXML()和loadXML函数就是在复制配置参数时使用的。
SolrPluginDialog就是编写对话框供用户输入参数,并且将参数保存到SolrPluginMeta中,具体代码就不帖出来了。
SolrPlugin类也比较简单,是转换操作的核心逻辑,其实主要的方法就是processRow,Kettle中数据按照流的形式传递,因此processRow方法会分批次对输入流进行处理。

    public class SolrPlugin extends BaseStep implements StepInterface{
    
    private SolrPluginData data;
    private SolrPluginMeta meta;
    
    /**
     * Zk集群地址
     */
    private String zkHost;
    
    /**
     * collection名
     */
    private String collectionName;
    
    /**
     * 输入的数据总量
     */
    private long send_count = 0l;
    /**
     * 字段名列表
     */
    private String[] fieldNames;
    
    /**
     * shard与发送文档的映射
     */
    private Map<String, List<SolrInputDocument>> send_list;
    
    /**
     * 当前shard与hostIp的映射
     */
    private Map<String, String> shard_hostIp;
    
    /**
     * shard与对应的solr地址的映射
     */
    private Map<String, HttpSolrClient> solrserver_url;

    public SolrPlugin(StepMeta s, StepDataInterface stepDataInterface, 
                      int c, TransMeta t, Trans dis){
        super(s,stepDataInterface,c,t,dis);
    }
    
    public boolean init(StepMetaInterface smi, StepDataInterface sdi){
        meta = (SolrPluginMeta)smi;
        data = (SolrPluginData)sdi;
        return super.init(smi, sdi);
    }
    
    public void dispose(StepMetaInterface smi, StepDataInterface sdi){
        meta = (SolrPluginMeta)smi;
        data = (SolrPluginData)sdi;
        super.dispose(smi, sdi);
    }
    
    /**
     * 获取route字段
     * @param mode 分区策略
     * @param doc  输入文档
     * @param site 文档位置
     * @return
     */
    public String getRoute(String mode, SolrInputDocument doc, long site){
        
        //不分区模式
        if(mode.equals("0")){
            return "shard1";
         //字段分区
        }else if(mode.equals("1")){
            
            String filed = meta.getFiledselected();
            String shardname = doc.getFieldValue(filed)==null ? "shard1" : 
                                      doc.getFieldValue(filed).toString();
            return PinyinUtil.getInstance().getStringPinyin(shardname);
         //大小分区
        }else{
            long shard_num = meta.getCountsize();
            int index = (int)(site/shard_num)+1;
            return "shard"+index;
        }    
    }
    
    /**
     * 发送本地缓存的list至solr中
     * @param doclist
     */
    public void sendList(Map<String, List<SolrInputDocument>> doclist) throws KettleException{
        
        if(null==doclist){
            return;
        }
        
        for(String shard : doclist.keySet()){
            if(StringUtils.isEmpty(shard)){
                continue;
            }
            
            //获取该shard对应的hostIp
            String hostIp = shard_hostIp.get(shard);
            if(StringUtils.isEmpty(hostIp)){
                logBasic("准备创建shard:"+shard);
                SolrService.createShard(zkHost, collectionName, shard, this);
                hostIp = SolrService.getCollectionShardInfo(zkHost, collectionName, shard, this);
                shard_hostIp.put(shard, hostIp);
            }
            
            //获取shard对应的url
            HttpSolrClient client = solrserver_url.get(shard);
            if(client==null){
                String url = "http://"+hostIp+"/solr/"+collectionName;
                client = new HttpSolrClient(url);
                solrserver_url.put(shard, client);
            }
            
            long time = System.currentTimeMillis();
            //待发送的文档集合
            List<SolrInputDocument> list = doclist.get(shard);
            
            if(list.size()<=0){
                continue;
            }
            
            try {
                client.add(list);
                client.commit();
            } catch (Exception e) {
                logError(String.format("发送到shard:%s出错,地址:%s,原因:%s", 
                                        shard, client.getBaseURL(), e.getMessage()));
                throw new KettleException(e.getMessage());
            } 
            logBasic(String.format("成功发送到%s %s条记录, 耗时%s毫秒", shard, list.size(), 
                                    System.currentTimeMillis()-time));
            list.clear();
        }
    }
    
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
        meta = (SolrPluginMeta)smi;
        data = (SolrPluginData)sdi;
        
        //获取上一个步骤的输入流
        Object[] r=getRow();
        if(r==null){
            logBasic("无输入数据");
            sendList(send_list);
            setOutputDone();
            return false;
        }
        
        //first为true则表明是第一行数据,可以在此完成相关的初始化工作
        if(first){
            first = false;
            
            zkHost = meta.getZkHost();
            collectionName = meta.getCollectionName();
            
            SolrService.createCollection(zkHost, collectionName, this);
            
            //获取输入字段名集合
            RowMetaInterface fields = getInputRowMeta().clone();
            fieldNames = fields.getFieldNames();
            for(String o : fieldNames){
                logBasic(o);
            }
            
            send_list = new HashMap<String, List<SolrInputDocument>>();
            solrserver_url = new HashMap<String, HttpSolrClient>();
            shard_hostIp = new HashMap<String, String>();
            
            String hostIp = SolrService.getCollectionShardInfo(zkHost, collectionName, "shard1", this);
            shard_hostIp.put("shard1", hostIp);
        }
        
        if(r.length<fieldNames.length){
            logError("输入数据有误, 本次数据忽略");
            return true;
        }
        
        //存储输入数据至document
        SolrInputDocument input = new SolrInputDocument();
        for(int i=0; i<fieldNames.length; i++){
            input.addField(fieldNames[i], r[i]);
        }
        String shardname = getRoute(meta.getMode(), input, ++send_count);
        input.addField("_route_", shardname);
        
        List<SolrInputDocument> documentlist = send_list.get(shardname);
        if(documentlist==null){
            documentlist = new ArrayList<SolrInputDocument>();
            send_list.put(shardname, documentlist);
        }
        documentlist.add(input);
        
        if(send_count%20000==0){
            sendList(send_list);
        }
        return true;
    }
}

processRow返回true则表明数据处理没有结束,则Kettle会继续调用processRow处理输入数据;返回false则表明处理完成,记住在返回false之前要调用基类的setOutputDone()方法。

三 插件部署到Kettle中
源码写好后,打成jar包,接下来还要编写plugin.xml配置文档:

<?xml version="1.0" encoding="UTF-8"?>
<plugin id="SolrPlugin" iconfile="solr.png" description="SolrPlugin" 
tooltip="This is a solr plugin step" category="TestDemo"
classname="com.alibaba.kettle.solr.SolrPluginMeta" >
    
    <libraries>
      <library name="SolrPlugin.jar"/>
    </libraries>

    <localized_category>
      <category locale="en_US">TestDemo</category>
      <category locale="zh_CN">插件测试</category>
    </localized_category>
    
    <localized_description>
      <description locale="en_US">SolrPlugin</description>
    </localized_description>

    <localized_tooltip>
      <tooltip locale="en_US">发送记录到Solr中</tooltip>
    </localized_tooltip>
</plugin>

其中的id是插件注册的标识,iconfile指定了插件的图标,classname指定了插件的入口类,就是SolrPluginMeta类;<localized_category>指定了插件在Kettle左侧列表中的位置。将打好的jar包、plugin.xml配置文件、图标等放置在单独的文件夹中,并将该文件夹Kettle目录下的plugins\steps中(如果没有steps目录则新建),重启Kettle就可看到自定义的插件:

Kettle插件开发流程

四 总结
Kettle插件的开发并不复杂,掌握了基本的开发流程就可以自己开发需要的插件了,写得比较乱,欢迎各位多多交流指正。
 

相关文章: