今日公司有需求:需要将 MySQL 的更新实时同步到 Kafka 中;后来又要求将数据库中一张表的变化实时同步到另一台 MySQL,并将数据库中的数据变更也同步到 ES 中。于是采用 canal 与 canal-adapter 紧急解决,其中踩了不少坑,以下为总结内容。

官方文档:https://github.com/alibaba/canal/wiki

前提:默认安装了es,本文采用6.8.8版本

需要先创建 ES 的 index 及 mapping(下面两段 JSON 分别为两个 index 的 mapping 定义)

docker化canal-adapter

 

 

{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 2
    },
    "mappings": {
        "_doc": {
            "properties": {
                "personnel_name": {
                    "type": "text"
                },
                "personnel_num": {
                    "type": "keyword"
                }
            }
        }
    }
}

{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 2
    },
    "mappings": {
        "_doc": {
            "properties": {
                "clock_record_id": {
                    "type": "keyword"
                },
                "personnel_name": {
                    "type": "text"
                },
                "personnel_num": {
                    "type": "keyword"
                }
            }
        }
    }
}

一、canal-server镜像的创建及canal-server的compose文件

1、Dockerfile文件内容

# canal-server image: downloads canal.deployer 1.1.4, unpacks it into
# /opt/canal-server and starts it through /entrypoint.sh.
FROM openjdk:8-jre-alpine

WORKDIR /opt/canal-server

# NOTE: ADD only auto-extracts *local* tar archives; a remote URL is downloaded
# as-is. The original ADD left the .tar.gz unextracted in /opt/canal-server,
# so bin/startup.sh never existed. Download to /tmp and unpack explicitly.
ADD [ "https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz", "/tmp/canal.deployer-1.1.4.tar.gz" ]
RUN tar -xzf /tmp/canal.deployer-1.1.4.tar.gz -C /opt/canal-server \
 && rm -f /tmp/canal.deployer-1.1.4.tar.gz

COPY ["entrypoint.sh", "/"]

# bash is required by entrypoint.sh; tzdata is only needed long enough to copy
# the Asia/Shanghai zoneinfo file. --no-cache avoids leaving an apk index layer.
RUN apk add --no-cache bash tzdata \
 && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
 && apk del tzdata \
 && chmod +x /entrypoint.sh

# 11110: admin port, 11111: canal client port (default canal.port), 11112: metrics.
EXPOSE 11110 11111 11112

VOLUME ["/opt/canal-server/logs", "/opt/canal-server/conf"]

# Exec form so the entrypoint runs as PID 1 and receives container signals
# (the original shell form wrapped it in "/bin/sh -c").
ENTRYPOINT ["/entrypoint.sh"]

2、entrypoint.sh文件内容

#!/bin/bash
#
# Entrypoint for the canal-server container.
#
# Rewrites /opt/canal-server/conf from environment variables, starts
# canal-server, then tails its main log to keep the container in the foreground.
#
# Expected environment:
#   canal_serverMode   - run mode; must be "tcp" or "kafka"
#   canal_mq_servers   - kafka bootstrap servers (kafka mode only)
#   instances          - space-separated instance names; each non-"example"
#                        name gets a conf dir cloned from conf/example
#   <instance>_dict    - per-instance settings as a value with embedded double
#                        quotes, e.g. ins1_dict='"k1=v1 k2=v2"'; recognized keys:
#                        canal_mq_topic, canal_mq_dynamicTopic,
#                        canal_instance_master_address, canal_instance_filter_regex

Base_dir=/opt/canal-server/conf
Log_dir=/opt/canal-server/logs

set -e

# Validate the run mode. The original test only checked for a non-empty value,
# so any garbage mode was accepted despite the error message claiming otherwise.
case "${canal_serverMode}" in
  tcp|kafka)
    sed -i "/^canal.serverMode/ s/serverMode.*/serverMode = ${canal_serverMode}/" "${Base_dir}/canal.properties"
    ;;
  *)
    echo "Invalid mode '${canal_serverMode}', this image supports tcp and kafka modes only" >&2
    exit 1
    ;;
esac

if [ -n "${instances}" ]; then
  # canal.destinations is a comma-separated list.
  destinations=${instances// /,}
  sed -i "/^canal.destinations/ccanal.destinations = ${destinations}" "${Base_dir}/canal.properties"

  for instance in ${instances}; do
    # Reset the map on every iteration: a bare 'declare -A' does NOT clear
    # entries left behind by the previous instance, so settings leaked across
    # instances in the original script.
    unset dict
    declare -A dict

    # Indirect expansion instead of eval; strip the embedded double quotes
    # around the "k1=v1 k2=v2" payload exactly as the original awk did.
    dict_var="${instance}_dict"
    ins_dic=$(printf '%s\n' "${!dict_var}" | awk -F'"' '{print $2}')

    for kv in ${ins_dic}; do
      # Split on the FIRST '=' only, so values may themselves contain '='
      # (the original 'awk -F= {print $2}' truncated such values).
      k=${kv%%=*}
      v=${kv#*=}
      dict[$k]=$v
    done

    if [ "${instance}" != "example" ]; then
      mkdir "${Base_dir}/${instance}" && cp "${Base_dir}"/example/* "${Base_dir}/${instance}/"

      if [ "${canal_serverMode}" = 'kafka' ]; then
        sed -i "/^canal.mq.servers/ccanal.mq.servers=${canal_mq_servers}" "${Base_dir}/canal.properties"
        if [ -n "${dict[canal_mq_topic]}" ]; then
          # A fixed topic wins over dynamic topic routing.
          sed -i "/.*canal.mq.topic/ccanal.mq.topic=${dict[canal_mq_topic]}" "${Base_dir}/${instance}/instance.properties"
        else
          sed -i "/^canal.mq.topic/d" "${Base_dir}/${instance}/instance.properties"
          sed -i "/.*canal.mq.dynamicTopic=/ccanal.mq.dynamicTopic=${dict[canal_mq_dynamicTopic]}" "${Base_dir}/${instance}/instance.properties"
        fi
      fi

      if [ -n "${dict[canal_instance_master_address]}" ]; then
        sed -i "/^canal.instance.master.address=/ccanal.instance.master.address=${dict[canal_instance_master_address]}" "${Base_dir}/${instance}/instance.properties"
      fi

      if [ -n "${dict[canal_instance_filter_regex]}" ]; then
        sed -i "/^canal.instance.filter.regex/ccanal.instance.filter.regex=${dict[canal_instance_filter_regex]}" "${Base_dir}/${instance}/instance.properties"
      fi
    fi
  done
fi

/bin/sh /opt/canal-server/bin/startup.sh
sleep 3
# tail keeps the container's main process alive and streams the canal log.
tail -F "${Log_dir}/canal/canal.log"
相关文章: