General approach:
Canal pulls binlog data from MySQL, holds it on the node where Canal runs, and exposes a TCP service. All we need is a client that connects to that service, pulls the data, and controls the format in which it is written to Kafka; that is enough to get protobuf-encoded data into Kafka.
1. Configure Canal (/bigdata/canal/conf/canal.properties) and start it; this brings up the TCP service.
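For reference, the relevant entries look roughly like the following. This is a minimal sketch: the IP, port, and credentials are assumptions for this environment, and the per-instance MySQL settings live in conf/example/instance.properties rather than canal.properties itself.

# canal.properties: run Canal as a plain TCP server
canal.ip = 192.168.57.12
canal.port = 11111
canal.serverMode = tcp
canal.destinations = example

# conf/example/instance.properties: which MySQL binlog to read
canal.instance.master.address = 192.168.57.12:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal123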
2. Write the client code that pulls the data.
PbOfCanalToKafka
package cn._51doit.flink.canal;

import cn._51doit.proto.OrderDetailProto;
import com.alibaba.google.common.base.CaseFormat;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class PbOfCanalToKafka {

    public static void main(String[] args) throws Exception {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.57.12", 11111), "example", "canal", "canal123");

        // 1. Kafka producer configuration
        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // values are raw protobuf bytes, so use the ByteArraySerializer
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        // connect once and subscribe to the doit.orderdetail table only
        canalConnector.connect();
        canalConnector.subscribe("doit.orderdetail");

        while (true) {
            // pull a batch of up to 10 entries (get() acknowledges automatically)
            Message message = canalConnector.get(10);
            if (message.getEntries().size() > 0) {
                List<CanalEntry.Entry> entries = message.getEntries();
                for (CanalEntry.Entry entry : entries) {
                    // the table name doubles as the Kafka topic
                    String tableName = entry.getHeader().getTableName();
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    // check the operation type: only INSERT and UPDATE rows are collected
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            // debug: print the after-image columns
                            System.out.println(afterColumnsList);
                            // map column names from lower_underscore to lowerCamel
                            Map<String, String> kv = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                kv.put(propertyName, column.getValue());
                            }
                            // populate the protobuf builder
                            OrderDetailProto.OrderDetail.Builder bean = OrderDetailProto.OrderDetail.newBuilder();
                            bean.setAmount(Integer.parseInt(kv.get("amount")));
                            bean.setMoney(Double.parseDouble(kv.get("money")));
                            bean.setOrderId(Long.parseLong(kv.get("orderId")));
                            bean.setCreateTime(kv.get("createTime"));
                            bean.setUpdateTime(kv.get("updateTime"));
                            bean.setId(Integer.parseInt(kv.get("id")));
                            bean.setSku(Long.parseLong(kv.get("sku")));
                            bean.setCategoryId(Integer.parseInt(kv.get("categoryId")));
                            // serialize to protobuf bytes and send with the Kafka producer
                            byte[] bytes = bean.build().toByteArray();
                            ProducerRecord<String, byte[]> record = new ProducerRecord<>(tableName, bytes);
                            producer.send(record);
                        }
                    }
                }
            } else {
                // avoid a busy loop when there is no new binlog data
                Thread.sleep(500);
            }
        }
    }
}
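The client assumes a generated OrderDetailProto class. Its message definition is not shown in this section, but inferred from the setters above it would look roughly like this (field numbers, the snake_case field names, and the java_package/java_outer_classname options are assumptions):

// OrderDetail.proto: sketch reconstructed from the setters used in PbOfCanalToKafka
syntax = "proto3";

option java_package = "cn._51doit.proto";
option java_outer_classname = "OrderDetailProto";

message OrderDetail {
    int32  id          = 1;  // setId(int)
    int64  order_id    = 2;  // setOrderId(long)
    int32  category_id = 3;  // setCategoryId(int)
    int64  sku         = 4;  // setSku(long)
    double money       = 5;  // setMoney(double)
    int32  amount      = 6;  // setAmount(int)
    string create_time = 7;  // setCreateTime(String)
    string update_time = 8;  // setUpdateTime(String)
}

Compiling this with protoc's Java plugin yields the OrderDetailProto.OrderDetail.newBuilder() API the client uses; the downstream Kafka consumer can call OrderDetail.parseFrom(bytes) to deserialize each record.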