近日因公司需求,需要将 MySQL 的数据变更实时同步到 Kafka 中,后来又要将数据库中一张表的变化实时同步到另一台 MySQL 中,并且将数据库中的数据也同步到 ES 中,于是紧急采用 canal 与 canal-adapter 来解决。其间踩了不少坑,下面是总结内容。
官方文档:https://github.com/alibaba/canal/wiki
前提:默认已安装好 Elasticsearch(下文简称 ES),本文采用 6.8.8 版本
需要先创建es的index及mapping
{ "settings": { "number_of_shards": 5, "number_of_replicas": 2 }, "mappings": { "_doc": { "properties": { "personnel_name": { "type": "text" }, "personnel_num": { "type": "keyword" } } } } }
{ "settings": { "number_of_shards": 5, "number_of_replicas": 2 }, "mappings": { "_doc": { "properties": { "clock_record_id": { "type": "keyword" }, "personnel_name": { "type": "text" }, "personnel_num": { "type": "keyword" } } } } }
一、canal-server镜像的创建及canal-server的compose文件
1、Dockerfile文件内容
# canal-server image: canal.deployer 1.1.4 on top of a JRE 8 Alpine base.
FROM openjdk:8-jre-alpine

# ADD does not unpack archives that are fetched from a URL, so the tarball is
# downloaded to a scratch path here and extracted explicitly in the RUN step.
ADD [ "https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz", "/tmp/canal.deployer.tar.gz" ]

WORKDIR /opt/canal-server

# NOTE(review): 11111 is added on the assumption that it is canal's default
# client (tcp) port, next to the two ports already listed — confirm against
# canal.properties.
EXPOSE 11110 11111 11112

COPY ["entrypoint.sh", "/"]

# - bash is required by entrypoint.sh (associative arrays).
# - tzdata is only needed long enough to copy the Asia/Shanghai zone file.
# - The deployer tarball is unpacked into the WORKDIR before VOLUME is declared,
#   so its conf/ and logs/ directories seed the volumes.
RUN apk add --no-cache bash tzdata \
 && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
 && apk del tzdata \
 && tar -xzf /tmp/canal.deployer.tar.gz -C /opt/canal-server \
 && rm -f /tmp/canal.deployer.tar.gz \
 && chmod +x /entrypoint.sh

VOLUME ["/opt/canal-server/logs", "/opt/canal-server/conf"]

# Exec form: the script is started directly instead of through `/bin/sh -c`,
# so it receives the signals sent to the container.
ENTRYPOINT ["/entrypoint.sh"]
2、entrypoint.sh文件内容
#!/bin/bash
#
# Container entrypoint for canal-server.
#
# Rewrites canal.properties and the per-instance instance.properties from
# environment variables, starts canal-server, then follows its log so the
# container keeps running in the foreground.
#
# Environment:
#   canal_serverMode  required; "tcp" or "kafka"
#   canal_mq_servers  value written to canal.mq.servers (kafka mode only)
#   instances         whitespace-separated list of instance names
#   <instance>_dict   per-instance settings: whitespace-separated key=value
#                     pairs enclosed in one pair of double quotes, e.g.
#                       "canal_instance_master_address=db:3306 canal_mq_topic=t"
#                     Keys read by this script:
#                       canal_instance_master_address
#                       canal_instance_filter_regex
#                       canal_mq_topic
#                       canal_mq_dynamicTopic

# -u is deliberately not enabled: every input is an optional environment
# variable and is read with an explicit empty default instead.
set -eo pipefail

readonly CANAL_HOME=/opt/canal-server
readonly CONF_DIR="${CANAL_HOME}/conf"
readonly LOG_DIR="${CANAL_HOME}/logs"
readonly CANAL_PROPERTIES="${CONF_DIR}/canal.properties"

# Settings of the instance currently being configured (key -> value).
# Cleared by load_instance_settings for every instance so that values never
# leak from one instance into the next.
declare -A settings=()

#######################################
# Prints a message to stderr and aborts the script.
# Arguments: $* - message
#######################################
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

#######################################
# Replaces every line of a file that matches a sed address with new text.
# NOTE(review): sed may process backslash escapes in the text of the `c`
# command, so values containing backslashes (e.g. filter regexes) can lose one
# level of escaping — confirm against the sed shipped in the image.
# Arguments: $1 - file, $2 - sed address regex, $3 - replacement line
#######################################
replace_line() {
  local file=$1 regex=$2 text=$3
  sed -i "/${regex}/c${text}" "${file}"
}

#######################################
# Parses <instance>_dict into the global `settings` array.
# Only the text between the first pair of double quotes is used; it is split
# on whitespace into key=value pairs. A value keeps everything after the
# first '=', so values may themselves contain '='.
# Globals:   settings (written)
# Arguments: $1 - instance name
#######################################
load_instance_settings() {
  local instance=$1
  local var_name="${instance}_dict"
  local raw='' body kv key value
  local -a pairs=()

  settings=()

  # Indirect expansion instead of eval: nothing from the environment is ever
  # re-parsed as shell code. A name that is not a valid identifier cannot
  # have a matching variable, so it simply yields no settings.
  if [[ "${var_name}" =~ ^[A-Za-z_][A-Za-z0-9_]*$ ]]; then
    raw=${!var_name:-}
  fi
  [[ "${raw}" == *\"* ]] || return 0

  body=${raw#*\"}     # drop everything up to and including the first quote
  body=${body%%\"*}   # drop the next quote and everything after it

  read -r -a pairs <<< "${body//$'\n'/ }"
  for kv in "${pairs[@]}"; do
    key=${kv%%=*}
    if [[ "${kv}" == *=* ]]; then
      value=${kv#*=}
    else
      value=''
    fi
    [[ -n "${key}" ]] || continue
    settings["${key}"]="${value}"
  done
}

#######################################
# Creates and configures the conf directory of one instance.
# The "example" instance is used as the template and is never modified.
# Globals:   settings (written), canal_serverMode, canal_mq_servers (read)
# Arguments: $1 - instance name
#######################################
configure_instance() {
  local instance=$1
  local instance_dir="${CONF_DIR}/${instance}"
  local props="${instance_dir}/instance.properties"

  load_instance_settings "${instance}"

  if [[ "${instance}" == "example" ]]; then
    return 0
  fi

  # Seed the directory from the template only when it does not exist yet, so
  # a container restart does not overwrite files already present there.
  if [[ ! -d "${instance_dir}" ]]; then
    mkdir -p "${instance_dir}"
    cp "${CONF_DIR}/example/"* "${instance_dir}/"
  fi

  if [[ "${canal_serverMode:-}" == "kafka" ]]; then
    replace_line "${CANAL_PROPERTIES}" '^canal.mq.servers' \
      "canal.mq.servers=${canal_mq_servers:-}"
    if [[ -n "${settings[canal_mq_topic]:-}" ]]; then
      replace_line "${props}" '.*canal.mq.topic' \
        "canal.mq.topic=${settings[canal_mq_topic]}"
    else
      # No fixed topic: remove it and route through dynamicTopic instead.
      sed -i "/^canal.mq.topic/d" "${props}"
      replace_line "${props}" '.*canal.mq.dynamicTopic=' \
        "canal.mq.dynamicTopic=${settings[canal_mq_dynamicTopic]:-}"
    fi
  fi

  if [[ -n "${settings[canal_instance_master_address]:-}" ]]; then
    replace_line "${props}" '^canal.instance.master.address=' \
      "canal.instance.master.address=${settings[canal_instance_master_address]}"
  fi

  if [[ -n "${settings[canal_instance_filter_regex]:-}" ]]; then
    replace_line "${props}" '^canal.instance.filter.regex' \
      "canal.instance.filter.regex=${settings[canal_instance_filter_regex]}"
  fi
}

main() {
  local mode=${canal_serverMode:-}
  local instance destinations
  local -a instance_list=()

  # Configure the canal-server run mode; this image supports tcp and kafka.
  case "${mode}" in
    tcp | kafka) ;;
    *) die "Invalid mode ${mode}, This image support tcp and kafka mode now" ;;
  esac
  sed -i "/^canal.serverMode/ s/serverMode.*/serverMode = ${mode}/" \
    "${CANAL_PROPERTIES}"

  if [[ -n "${instances:-}" ]]; then
    read -r -a instance_list <<< "${instances//$'\n'/ }"
    destinations=$(IFS=,; printf '%s' "${instance_list[*]}")
    replace_line "${CANAL_PROPERTIES}" '^canal.destinations' \
      "canal.destinations = ${destinations}"

    for instance in "${instance_list[@]}"; do
      configure_instance "${instance}"
    done
  fi

  /bin/sh "${CANAL_HOME}/bin/startup.sh"
  # Give the server a moment to create its log file before following it.
  sleep 3
  exec tail -F "${LOG_DIR}/canal/canal.log"
}

main "$@"