友快網

導航選單

使用flink zookeeper單節點環境,實現一臺4g 1u的虛擬機器的虛擬機器部署

一。 背景介紹

本文將介紹如何將 MySQL 中的資料,透過 Binlog + Canal 的形式匯入到 Kafka 中,繼而被 Flink 消費的案例。

為了能夠快速的驗證整套流程的功能性,所有的元件都以單機的形式部署。如果手上的物理資源不足,可以將本文中的所有元件一臺 4G 1U 的虛擬機器環境中。

如果需要在生產環境中部署,建議將每一個元件替換成高可用的叢集部署方案。

其中,我們單獨建立了一套 Zookeeper 單節點環境,Flink、Kafka、Canal 等元件共用這個 Zookeeper 環境。

針對於所有需要 JRE 的元件,如 Flink,Kafka,Canal,Zookeeper,考慮到升級 JRE 可能會影響到其他的應用,我們選擇每個元件獨立使用自己的 JRE 環境。

本文分為兩個部分,其中,前七小節主要介紹基礎環境的搭建,最後一個小節介紹了資料是如何在各個元件中流通的。

資料的流動經過以下元件:

MySQL 資料來源生成 Binlog。

Canal 讀取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中。

Flink 使用 flink-sql-connector-kafka API,消費 Kafka Topic 中的資料。

Flink 在透過 flink-connector-jdbc,將資料寫入到 TiDB 中。

TiDB + Flink 的結構,支援開發與執行多種不同種類的應用程式。

目前主要的特性主要包括:

批流一體化。

精密的狀態管理。

事件時間支援。

精確的一次狀態一致性保障。

Flink 可以執行在包括 YARN、Mesos、Kubernetes 在內的多種資源管理框架上,還支援裸機叢集上獨立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同時也支援使用 TiUP 在裸機叢集上獨立部署。

TiDB + Flink 結構常見的幾類應用如下:

事件驅動型應用:反欺詐。異常檢測。基於規則的報警。業務流程監控。

資料分析應用:網路質量監控。產品更新及試驗評估分析。事實資料即席分析。大規模圖分析。

資料管道應用:電商實時查詢索引構建。電商持續 ETL。

二。 環境介紹

2。1 作業系統環境

[root@r20 topology]# cat /etc/redhat-releaseCentOS Stream release 8

2。2 軟體環境

Item

Version

Download link

TiDB

v4。0。9

https://download。pingcap。org/tidb-community-server-v4。0。9-linux-amd64。tar。gz 1

Kafka

v2。7。0

https://mirrors。bfsu。edu。cn/apache/kafka/2。7。0/kafka_2。13-2。7。0。tgz

Flink

v1。12。1

https://mirrors。tuna。tsinghua。edu。cn/apache/flink/flink-1。12。1/flink-1。12。1-bin-scala_2。11。tgz

Jre

v1。8。0_281

https://javadl。oracle。com/webapps/download/AutoDL?BundleId=244058_89d678f2be164786b292527658ca1605

Zookeeper

v3。6。2

https://mirrors。tuna。tsinghua。edu。cn/apache/zookeeper/zookeeper-3。6。2/apache-zookeeper-3。6。2-bin。tar。gz

flink-sql-connector-kafka

v1。12。1

https://repo1。maven。org/maven2/org/apache/flink/flink-sql-connector-kafka_2。12/1。12。0/flink-sql-connector-kafka_2。12-1。12。0。jar

flink-connector-jdbc

v1。12。0

https://repo1。maven。org/maven2/org/apache/flink/flink-connector-jdbc_2。12/1。12。0/flink-connector-jdbc_2。12-1。12。0。jar

MySQL

v8。0。23

https://dev。mysql。com/get/Downloads/MySQL-8。0/mysql-8。0。23-linux-glibc2。12-x86_64。tar。xz

Canal

v1。1。4

https://github。com/alibaba/canal/releases/download/canal-1。1。4/canal。deployer-1。1。4。tar。gz

2。3 機器分配

Hostname

IP

Component

r21

192。168。12。21

TiDB Cluster

r22

192。168。12。22

Kafka

r23

192。168。12。23

Flink

r24

192。168。12。24

Zookeeper

r25

192。168。12。25

MySQL

r26

192。168。12。26

Canal

三。 部署 TiDB Cluster

與傳統的單機資料庫相比,TiDB 具有以下優勢:

純分散式架構,擁有良好的擴充套件性,支援彈性的擴縮容。

支援 SQL,對外暴露 MySQL 的網路協議,併兼容大多數 MySQL 的語法,在大多數場景下可以直接替換 MySQL。

預設支援高可用,在少數副本失效的情況下,資料庫本身能夠自動進行資料修復和故障轉移,對業務透明。

支援 ACID 事務,對於一些有強一致需求的場景友好,例如:銀行轉賬。

具有豐富的工具鏈生態,覆蓋資料遷移、同步、備份等多種場景。

在核心設計上,TiDB 分散式資料庫將整體架構拆分成了多個模組,各模組之間互相通訊,組成完整的 TiDB 系統。對應的架構圖如下:

在本文中,我們只做最簡單的功能測試,所以部署了一套單節點但副本的 TiDB,涉及到了以下的三個模組:

TiDB Server:SQL 層,對外暴露 MySQL 協議的連線 endpoint,負責接受客戶端的連線,執行 SQL 解析和最佳化,最終生成分散式執行計劃。

PD (Placement Driver) Server:整個 TiDB 叢集的元資訊管理模組,負責儲存每個 TiKV 節點實時的資料分佈情況和叢集的整體拓撲結構,提供 TiDB Dashboard 管控介面,併為分散式事務分配事務 ID。

TiKV Server:負責儲存資料,從外部看 TiKV 是一個分散式的提供事務的 Key-Value 儲存引擎。

3。1 TiUP 部署模板檔案

# # Global variables are applied to all deployments and used as the default value of# # the deployments if a specific deployment value is missing。global: user: “tidb” ssh_port: 22 deploy_dir: “/opt/tidb-c1/” data_dir: “/opt/tidb-c1/data/”# # Monitored variables are applied to all the machines。#monitored:# node_exporter_port: 19100# blackbox_exporter_port: 39115# deploy_dir: “/opt/tidb-c3/monitored”# data_dir: “/opt/tidb-c3/data/monitored”# log_dir: “/opt/tidb-c3/log/monitored”# # Server configs are used to specify the runtime configuration of TiDB components。# # All configuration items can be found in TiDB docs:# # - TiDB: https://pingcap。com/docs/stable/reference/configuration/tidb-server/configuration-file/# # - TiKV: https://pingcap。com/docs/stable/reference/configuration/tikv-server/configuration-file/# # - PD: https://pingcap。com/docs/stable/reference/configuration/pd-server/configuration-file/# # All configuration items use points to represent the hierarchy, e。g:# # readpool。storage。use-unified-pool# ## # You can overwrite this configuration via the instance-level `config` field。server_configs: tidb: log。slow-threshold: 300 binlog。enable: false binlog。ignore-error: false tikv-client。copr-cache。enable: true tikv: server。grpc-concurrency: 4 raftstore。apply-pool-size: 2 raftstore。store-pool-size: 2 rocksdb。max-sub-compactions: 1 storage。block-cache。capacity: “16GB” readpool。unified。max-thread-count: 12 readpool。storage。use-unified-pool: false readpool。coprocessor。use-unified-pool: true raftdb。rate-bytes-per-sec: 0 pd: schedule。leader-schedule-limit: 4 schedule。region-schedule-limit: 2048 schedule。replica-schedule-limit: 64pd_servers: - host: 192。168。12。21 ssh_port: 22 name: “pd-2” client_port: 12379 peer_port: 12380 deploy_dir: “/opt/tidb-c1/pd-12379” data_dir: “/opt/tidb-c1/data/pd-12379” log_dir: “/opt/tidb-c1/log/pd-12379” numa_node: “0” # # The following configs are used to overwrite the `server_configs。pd` values。 config: schedule。max-merge-region-size: 20 schedule。max-merge-region-keys: 200000tidb_servers: - host: 192。168。12。21 ssh_port: 22 port: 14000 status_port: 12080 deploy_dir: “/opt/tidb-c1/tidb-14000” log_dir: “/opt/tidb-c1/log/tidb-14000” numa_node: “0” # # The following configs are used to overwrite the `server_configs。tidb` values。 config: log。slow-query-file: tidb-slow-overwrited。log tikv-client。copr-cache。enable: truetikv_servers: - host: 192。168。12。21 ssh_port: 22 port: 12160 status_port: 12180 deploy_dir: “/opt/tidb-c1/tikv-12160” data_dir: “/opt/tidb-c1/data/tikv-12160” log_dir: “/opt/tidb-c1/log/tikv-12160” numa_node: “0” # # The following configs are used to overwrite the `server_configs。tikv` values。 config: server。grpc-concurrency: 4 #server。labels: { zone: “zone1”, dc: “dc1”, host: “host1” }#monitoring_servers:# - host: 192。168。12。21# ssh_port: 22# port: 19090# deploy_dir: “/opt/tidb-c1/prometheus-19090”# data_dir: “/opt/tidb-c1/data/prometheus-19090”# log_dir: “/opt/tidb-c1/log/prometheus-19090”#grafana_servers:# - host: 192。168。12。21# port: 13000# deploy_dir: “/opt/tidb-c1/grafana-13000”#alertmanager_servers:# - host: 192。168。12。21# ssh_port: 22# web_port: 19093# cluster_port: 19094# deploy_dir: “/opt/tidb-c1/alertmanager-19093”# data_dir: “/opt/tidb-c1/data/alertmanager-19093”# log_dir: “/opt/tidb-c1/log/alertmanager-19093”

3。2 TiDB Cluster 環境

本文重點非部署 TiDB Cluster,作為快速實驗環境,只在一臺機器上部署單副本的 TiDB Cluster 叢集。不需要部署監控環境。

[root@r20 topology]# tiup cluster display tidb-c1-v409Starting component `cluster`: /root/。tiup/components/cluster/v1。3。2/tiup-cluster display tidb-c1-v409Cluster type: tidbCluster name: tidb-c1-v409Cluster version: v4。0。9SSH type: builtinDashboard URL: http://192。168。12。21:12379/dashboardID Role Host Ports OS/Arch Status Data Dir Deploy Dir—— —— —— ——- ————- ———— ———— ——————192。168。12。21:12379 pd 192。168。12。21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379192。168。12。21:14000 tidb 192。168。12。21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000192。168。12。21:12160 tikv 192。168。12。21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160Total nodes: 4

建立用於測試的表

mysqlshow create table t1;+————-+————————————————————————————————————————————————————————————————-+| Table | Create Table |+————-+————————————————————————————————————————————————————————————————-+| t1 | CREATE TABLE `t1` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |+————-+————————————————————————————————————————————————————————————————-+1 row in set (0。00 sec)

四。 部署 Zookeeper 環境

在本實驗中單獨配置 Zookeeper 環境,為 Kafka 和 Flink 環境提供服務。

作為實驗演示方案,只部署單機環境。

4。1 解壓 Zookeeper 包

[root@r24 soft]# tar vxzf apache-zookeeper-3。6。2-bin。tar。gz[root@r24 soft]# mv apache-zookeeper-3。6。2-bin /opt/zookeeper

4。2 部署用於 Zookeeper 的 jre

[root@r24 soft]# tar vxzf jre1。8。0_281。tar。gz[root@r24 soft]# mv jre1。8。0_281 /opt/zookeeper/jre

修改 /opt/zookeeper/bin/zkEnv。sh 檔案,增加 JAVA_HOME 環境變數

## add bellowing env var in the head of zkEnv。shJAVA_HOME=/opt/zookeeper/jre

4。3 建立 Zookeeper 的配置檔案

[root@r24 conf]# cat zoo。cfg | grep -v “#”tickTime=2000initLimit=10syncLimit=5dataDir=/opt/zookeeper/dataclientPort=2181

4。4 啟動 Zookeeper

[root@r24 bin]# /opt/zookeeper/bin/zkServer。sh start

4。5 檢查 Zookeeper 的狀態

## check zk status[root@r24 bin]# 。/zkServer。sh statusZooKeeper JMX enabled by defaultUsing config: /opt/zookeeper/bin/。。/conf/zoo。cfgClient port found: 2181。 Client address: localhost。 Client SSL: false。Mode: standalone## check OS port status[root@r24 bin]# netstat -ntlpActive Internet connections (only servers)Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program nametcp 0 0 0。0。0。0:22 0。0。0。0:* LISTEN 942/sshdtcp6 0 0 :::2181 :::* LISTEN 15062/javatcp6 0 0 :::8080 :::* LISTEN 15062/javatcp6 0 0 :::22 :::* LISTEN 942/sshdtcp6 0 0 :::44505 :::* LISTEN 15062/java## use zkCli tool to check zk connection[root@r24 bin]# 。/zkCli。sh -server 192。168。12。24:2181

4。6 關於 Zookeeper 的建議

我個人有一個關於 Zookeeper 的不成熟的小建議:

Zookeeper 叢集版本一定要開啟網路監控。特別是要關注 system metrics 裡面的 network bandwidth。

五。 部署 Kafka

Kafka 是一個分散式流處理平臺,主要應用於兩大類的應用中:

構造實時流資料管道,它可以在系統或應用之間可靠地獲取資料。 (相當於message queue)

構建實時流式應用程式,對這些流資料進行轉換或者影響。 (就是流處理,透過kafka stream topic和topic之間內部進行變化)

Kafka 有四個核心的 API:

The Producer API 允許一個應用程式釋出一串流式的資料到一個或者多個Kafka topic。

The Consumer API 允許一個應用程式訂閱一個或多個 topic ,並且對釋出給他們的流式資料進行處理。

The Streams API 允許一個應用程式作為一個流處理器,消費一個或者多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。

The Connector API 允許構建並執行可重用的生產者或者消費者,將Kafka topics連線到已存在的應用程式或者資料系統。比如,連線到一個關係型資料庫,捕捉表(table)的所有變更內容。

在本實驗中只做功能性驗證,只搭建一個單機版的 Kafka 環境。

5。1 下載並解壓 Kafka

[root@r22 soft]# tar vxzf kafka_2。13-2。7。0。tgz[root@r22 soft]# mv kafka_2。13-2。7。0 /opt/kafka

5。2 部署用於 Kafka 的 jre

[root@r22 soft]# tar vxzf jre1。8。0_281。tar。gz[root@r22 soft]# mv jre1。8。0_281 /opt/kafka/jre

修改 Kafka 的 jre 環境變數

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class。sh## add bellowing line in the head of kafka-run-class。shJAVA_HOME=/opt/kafka/jre

5。3 修改 Kafka 配置檔案

修改 Kafka 配置檔案 /opt/kafka/config/server。properties

## change bellowing variable in /opt/kafka/config/server。propertiesbroker。id=0listeners=PLAINTEXT://192。168。12。22:9092log。dirs=/opt/kafka/logszookeeper。connect=i192。168。12。24:2181

5。4 啟動 Kafka

[root@r22 bin]# /opt/kafka/bin/kafka-server-start。sh /opt/kafka/config/server。properties

5。5 檢視 Kafka 的版本資訊

Kafka 並沒有提供 ——version 的 optional 來檢視 Kafka 的版本資訊。

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka-rw-r——r—— 1 root root 4929521 Dec 16 09:02 kafka_2。13-2。7。0。jar-rw-r——r—— 1 root root 821 Dec 16 09:03 kafka_2。13-2。7。0。jar。asc-rw-r——r—— 1 root root 41793 Dec 16 09:02 kafka_2。13-2。7。0-javadoc。jar-rw-r——r—— 1 root root 821 Dec 16 09:03 kafka_2。13-2。7。0-javadoc。jar。asc-rw-r——r—— 1 root root 892036 Dec 16 09:02 kafka_2。13-2。7。0-sources。jar-rw-r——r—— 1 root root 821 Dec 16 09:03 kafka_2。13-2。7。0-sources。jar。asc。。。 。。。

其中 2。13 是 scale 的版本資訊,2。7。0 是 Kafka 的版本資訊。

六。 部署 Flink

Apache Flink 是一個框架和分散式處理引擎,用於在無邊界和有邊界資料流上進行有狀態的計算。Flink 能在所有常見叢集環境中執行,並能以記憶體速度和任意規模進行計算。

支援高吞吐、低延遲、高效能的分散式處理框架 Apache Flink 是一個框架和分散式處理引擎,用於對無界和有界資料流進行有狀態計算。Flink被設計在所有常見的叢集環境中執行,以記憶體執行速度和任意規模來執行計算。

本實驗只做功能性測試,僅部署單機 Flink 環境。

6。1 下載並分發 Flink

[root@r23 soft]# tar vxzf flink-1。12。1-bin-scala_2。11。tgz[root@r23 soft]# mv flink-1。12。1 /opt/flink

6。2 部署 Flink 的 jre

[root@r23 soft]# tar vxzf jre1。8。0_281。tar。gz[root@r23 soft]# mv jre1。8。0_281 /opt/flink/jre

6。3 新增 Flink 需要的 lib

Flink 消費 Kafka 資料,需要 flink-sql-connector-kafka 包。

Flink 連結 MySQL/TiDB,需要 flink-connector-jdbc 包。

[root@r23 soft]# mv flink-sql-connector-kafka_2。12-1。12。0。jar /opt/flink/lib/[root@r23 soft]# mv flink-connector-jdbc_2。12-1。12。0。jar /opt/flink/lib/

6。4 修改 Flink 配置檔案

## add or modify bellowing lines in /opt/flink/conf/flink-conf。yamljobmanager。rpc。address: 192。168。12。23env。java。home: /opt/flink/jre

6。5 啟動 Flink

[root@r23 ~]# /opt/flink/bin/start-cluster。shStarting cluster。Starting standalonesession daemon on host r23。Starting taskexecutor daemon on host r23。

6。6 檢視 Flink GUI

七。 部署 MySQL

7。1 解壓 MySQL package

[root@r25 soft]# tar vxf mysql-8。0。23-linux-glibc2。12-x86_64。tar。xz[root@r25 soft]# mv mysql-8。0。23-linux-glibc2。12-x86_64 /opt/mysql/

7。2 建立 MySQL Service 檔案

[root@r25 ~]# touch /opt/mysql/support-files/mysqld。service[root@r25 support-files]# cat mysqld。service[Unit]Description=MySQL 8。0 database serverAfter=syslog。targetAfter=network。target[Service]Type=simpleUser=mysqlGroup=mysql#ExecStartPre=/usr/libexec/mysql-check-socket#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n# Note: we set ——basedir to prevent probes that might trigger SELinux alarms,# per bug #547485ExecStart=/opt/mysql/bin/mysqld_safe#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade#ExecStopPost=/opt/mysql/bin/mysql-wait-stop# Give a reasonable amount of time for the server to start up/shut downTimeoutSec=300# Place temp files in a secure directory, not /tmpPrivateTmp=trueRestart=on-failureRestartPreventExitStatus=1# Sets open_files_limitLimitNOFILE = 10000# Set enviroment variable MYSQLD_PARENT_PID。 This is required for SQL restart command。Environment=MYSQLD_PARENT_PID=1[Install]WantedBy=multi-user。target## copy mysqld。service to /usr/lib/systemd/system/[root@r25 support-files]# cp mysqld。service /usr/lib/systemd/system/

7。3 建立 my。cnf 檔案

[root@r34 opt]# cat /etc/my。cnf[mysqld]port=3306basedir=/opt/mysqldatadir=/opt/mysql/datasocket=/opt/mysql/data/mysql。socketmax_connections = 100default-storage-engine = InnoDBcharacter-set-server=utf8log-error = /opt/mysql/log/error。logslow_query_log = 1long-query-time = 30slow_query_log_file = /opt/mysql/log/show。logmin_examined_row_limit = 1000log-slow-slave-statementslog-queries-not-using-indexes#skip-grant-tables

7。4 初始化並啟動 MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld ——initialize ——user=mysql ——console[root@r25 ~]# chown -R mysql:mysql /opt/mysql[root@r25 ~]# systemctl start mysqld## check mysql temp passord from /opt/mysql/log/error。log2021-02-24T02:45:47。316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3-

7。5 建立一個新的 MySQL 使用者用以連線 Canal

## change mysql temp password firstlymysqlalter user ‘root’@‘localhost’ identified by ‘mysql’;Query OK, 0 rows affected (0。00 sec)## create a management user ‘root’@‘%’mysqlcreate user ‘root’@‘%’ identified by ‘mysql’;Query OK, 0 rows affected (0。01 sec)mysqlgrant all privileges on *。* to ‘root’@‘%’;Query OK, 0 rows affected (0。00 sec)## create a canal replication user ‘canal’@‘%’mysqlcreate user ‘canal’@‘%’ identified by ‘canal’;Query OK, 0 rows affected (0。01 sec)mysqlgrant select, replication slave, replication client on *。* to ‘canal’@‘%’;Query OK, 0 rows affected (0。00 sec)mysqlflush privileges;Query OK, 0 rows affected (0。00 sec)

7。6 在 MySQL 中建立用於測試的表

mysqlshow create table test。t2;+————-+——————————————————————————————————————————+| Table | Create Table |+————-+——————————————————————————————————————————+| t2 | CREATE TABLE `t2` ( `id` int DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8 |+————-+——————————————————————————————————————————+1 row in set (0。00 sec)

八。 部署 Canal

Canal 主要用途是基於 MySQL 資料庫增量日誌解析,提供增量資料訂閱和消費。

早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務 trigger 獲取增量變更。

從 2010 年開始,業務逐步嘗試資料庫日誌解析獲取增量變更進行同步,由此衍生出了大量的資料庫增量訂閱和消費業務。

基於日誌增量訂閱和消費的業務包括:

資料庫映象。

資料庫實時備份。

索引構建和實時維護(拆分異構索引、倒排索引等)。

業務 cache 重新整理。

帶業務邏輯的增量資料處理。

當前的 canal 支援源端 MySQL 版本包括 5。1。x , 5。5。x , 5。6。x , 5。7。x , 8。0。x。

8。1 解壓 Canal 包

[root@r26 soft]# mkdir /opt/canaltar vxzf canal。deployer-1。1。4。tar。gz -C /opt/canal

8。2 部署 Canal 的 jre

[root@r26 soft]# tar vxzf jre1。8。0_281。tar。gz[root@r26 soft]# mv jre1。8。0_281 /opt/canal/jre## configue jre, add bellowing line in the head of /opt/canal/bin/startup。sh JAVA=/opt/canal/jre/bin/java

8。3 修改 Canal 的配置檔案

修改 /opt/canal/conf/canal。properties 配置檔案

## modify bellowing configurationcanal。zkServers =192。168。12。24:2181canal。serverMode = kafkacanal。destinations = example ## 需要在 /opt/canal/conf 目錄下建立一個 example 資料夾,用於存放 destination 的配置canal。mq。servers = 192。168。12。22:9092

修改 /opt/canal/conf/example/instance。properties 配置檔案

## modify bellowing configurationcanal。instance。master。address=192。168。12。25:3306canal。instance。dbUsername=canalcanal。instance。dbPassword=canalcanal。instance。filter。regex=。*\\。。* ## 過濾資料庫的表canal。mq。topic=canal-kafka

九。 配置資料流向

9。1 MySQL Binlog -Canal -Kafka 通路

9。1。1 檢視 MySQL Binlog 資訊

檢視 MySQL Binlog 資訊,確保 Binlog 是正常的。

mysqlshow master status;+————————-+——————+————————+——————————+——————————-+| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |+————————-+——————+————————+——————————+——————————-+| binlog。000001 | 2888 | | | |+————————-+——————+————————+——————————+——————————-+1 row in set (0。00 sec)

9。1。2 在 Kafka 中建立一個 Topic

在 Kafka 中建立一個 Topic canal-kafka,這個Topic 的名字要與 Canal 配置檔案 /opt/canal/conf/example/instance。properties 中的 canal。mq。topic=canal-kafka 對應:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics。sh ——create \——zookeeper 192。168。12。24:2181 \——config max。message。bytes=12800000 \——config flush。messages=1 \——replication-factor 1 \——partitions 1 \——topic canal-kafkaCreated topic canal-kafka。[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka。server。ReplicaFetcherManager)[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka。log。Log)[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression。type -producer, message。downconversion。enable -true, min。insync。replicas -1, segment。jitter。ms -0, cleanup。policy -[delete], flush。ms -9223372036854775807, segment。bytes -1073741824, retention。ms -604800000, flush。messages -1, message。format。version -2。7-IV2, file。delete。delay。ms -60000, max。compaction。lag。ms -9223372036854775807, max。message。bytes -12800000, min。compaction。lag。ms -0, message。timestamp。type -CreateTime, preallocate -false, min。cleanable。dirty。ratio -0。5, index。interval。bytes -4096, unclean。leader。election。enable -false, retention。bytes ——1, delete。retention。ms -86400000, segment。ms -604800000, message。timestamp。difference。max。ms -9223372036854775807, segment。index。bytes -10485760}。 (kafka。log。LogManager)[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka。cluster。Partition)[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka。cluster。Partition)

檢視 Kafka 中所有的 Topic:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics。sh ——list ——zookeeper 192。168。12。24:2181__consumer_offsetscanal-kafkaticdc-test

檢視 Kafka 中 Topic ticdc-test 的資訊:

[root@r22 ~]# /opt/kafka/bin/kafka-topics。sh ——describe ——zookeeper 192。168。12。24:2181 ——topic canal-kafkaTopic: ticdc-test PartitionCount: 1 ReplicationFactor: 1 Configs: max。message。bytes=12800000,flush。messages=1 Topic: ticdc-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

9。1。3 啟動 Canal

在啟動 Canal 之前,需要在 Canal 節點上檢視一下埠的情況:

## check MySQL 3306 port## canal。instance。master。address=192。168。12。25:3306[root@r26 bin]# telnet 192。168。12。25 3306## check Kafka 9092 port## canal。mq。servers = 192。168。12。22:9092[root@r26 bin]# telnet 192。168。12。22 9092## check zookeeper 2181 port## canal。zkServers = 192。168。12。24:2181[root@r26 bin]# telnet 192。168。12。24 2181

啟動 Canal:

[root@r26 bin]# /opt/canal/bin/startup。shcd to /opt/canal/bin for workaround relative pathLOG CONFIGURATION : /opt/canal/bin/。。/conf/logback。xmlcanal conf : /opt/canal/bin/。。/conf/canal。propertiesCLASSPATH :/opt/canal/bin/。。/conf:/opt/canal/bin/。。/lib/zookeeper-3。4。5。jar:/opt/canal/bin/。。/lib/zkclient-0。10。jar:/opt/canal/bin/。。/lib/spring-tx-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-orm-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-jdbc-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-expression-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-core-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-context-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-beans-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/spring-aop-3。2。18。RELEASE。jar:/opt/canal/bin/。。/lib/snappy-java-1。1。7。1。jar:/opt/canal/bin/。。/lib/snakeyaml-1。19。jar:/opt/canal/bin/。。/lib/slf4j-api-1。7。12。jar:/opt/canal/bin/。。/lib/simpleclient_pushgateway-0。4。0。jar:/opt/canal/bin/。。/lib/simpleclient_httpserver-0。4。0。jar:/opt/canal/bin/。。/lib/simpleclient_hotspot-0。4。0。jar:/opt/canal/bin/。。/lib/simpleclient_common-0。4。0。jar:/opt/canal/bin/。。/lib/simpleclient-0。4。0。jar:/opt/canal/bin/。。/lib/scala-reflect-2。11。12。jar:/opt/canal/bin/。。/lib/scala-logging_2。11-3。8。0。jar:/opt/canal/bin/。。/lib/scala-library-2。11。12。jar:/opt/canal/bin/。。/lib/rocketmq-srvutil-4。5。2。jar:/opt/canal/bin/。。/lib/rocketmq-remoting-4。5。2。jar:/opt/canal/bin/。。/lib/rocketmq-logging-4。5。2。jar:/opt/canal/bin/。。/lib/rocketmq-common-4。5。2。jar:/opt/canal/bin/。。/lib/rocketmq-client-4。5。2。jar:/opt/canal/bin/。。/lib/rocketmq-acl-4。5。2。jar:/opt/canal/bin/。。/lib/protobuf-java-3。6。1。jar:/opt/canal/bin/。。/lib/oro-2。0。8。jar:/opt/canal/bin/。。/lib/netty-tcnative-boringssl-static-1。1。33。Fork26。jar:/opt/canal/bin/。。/lib/netty-all-4。1。6。Final。jar:/opt/canal/bin/。。/lib/netty-3。2。2。Final。jar:/opt/canal/bin/。。/lib/mysql-connector-java-5。1。47。jar:/opt/canal/bin/。。/lib/metrics-core-2。2。0。jar:/opt/canal/bin/。。/lib/lz4-java-1。4。1。jar:/opt/canal/bin/。。/lib/logback-core-1。1。3。jar:/opt/canal/bin/。。/lib/logback-classic-1。1。3。jar:/opt/canal/bin/。。/lib/kafka-clients-1。1。1。jar:/opt/canal/bin/。。/lib/kafka_2。11-1。1。1。jar:/opt/canal/bin/。。/lib/jsr305-3。0。2。jar:/opt/canal/bin/。。/lib/jopt-simple-5。0。4。jar:/opt/canal/bin/。。/lib/jctools-core-2。1。2。jar:/opt/canal/bin/。。/lib/jcl-over-slf4j-1。7。12。jar:/opt/canal/bin/。。/lib/javax。annotation-api-1。3。2。jar:/opt/canal/bin/。。/lib/jackson-databind-2。9。6。jar:/opt/canal/bin/。。/lib/jackson-core-2。9。6。jar:/opt/canal/bin/。。/lib/jackson-annotations-2。9。0。jar:/opt/canal/bin/。。/lib/ibatis-sqlmap-2。3。4。726。jar:/opt/canal/bin/。。/lib/httpcore-4。4。3。jar:/opt/canal/bin/。。/lib/httpclient-4。5。1。jar:/opt/canal/bin/。。/lib/h2-1。4。196。jar:/opt/canal/bin/。。/lib/guava-18。0。jar:/opt/canal/bin/。。/lib/fastsql-2。0。0_preview_973。jar:/opt/canal/bin/。。/lib/fastjson-1。2。58。jar:/opt/canal/bin/。。/lib/druid-1。1。9。jar:/opt/canal/bin/。。/lib/disruptor-3。4。2。jar:/opt/canal/bin/。。/lib/commons-logging-1。1。3。jar:/opt/canal/bin/。。/lib/commons-lang3-3。4。jar:/opt/canal/bin/。。/lib/commons-lang-2。6。jar:/opt/canal/bin/。。/lib/commons-io-2。4。jar:/opt/canal/bin/。。/lib/commons-compress-1。9。jar:/opt/canal/bin/。。/lib/commons-codec-1。9。jar:/opt/canal/bin/。。/lib/commons-cli-1。2。jar:/opt/canal/bin/。。/lib/commons-beanutils-1。8。2。jar:/opt/canal/bin/。。/lib/canal。store-1。1。4。jar:/opt/canal/bin/。。/lib/canal。sink-1。1。4。jar:/opt/canal/bin/。。/lib/canal。server-1。1。4。jar:/opt/canal/bin/。。/lib/canal。protocol-1。1。4。jar:/opt/canal/bin/。。/lib/canal。prometheus-1。1。4。jar:/opt/canal/bin/。。/lib/canal。parse。driver-1。1。4。jar:/opt/canal/bin/。。/lib/canal。parse。dbsync-1。1。4。jar:/opt/canal/bin/。。/lib/canal。parse-1。1。4。jar:/opt/canal/bin/。。/lib/canal。meta-1。1。4。jar:/opt/canal/bin/。。/lib/canal。instance。spring-1。1。4。jar:/opt/canal/bin/。。/lib/canal。instance。manager-1。1。4。jar:/opt/canal/bin/。。/lib/canal。instance。core-1。1。4。jar:/opt/canal/bin/。。/lib/canal。filter-1。1。4。jar:/opt/canal/bin/。。/lib/canal。deployer-1。1。4。jar:/opt/canal/bin/。。/lib/canal。common-1。1。4。jar:/opt/canal/bin/。。/lib/aviator-2。2。1。jar:/opt/canal/bin/。。/lib/aopalliance-1。0。jar:cd to /opt/canal/bin for continue

9。1。4 檢視 Canal 日誌

檢視 /opt/canal/logs/example/example。log

2021-02-24 01:41:40。293 [destination = example , address = /192。168。12。25:3306 , EventParser] WARN c。a。o。c。p。inbound。mysql。rds。RdsBinlogEventParserProxy - ——-begin to find start position, it will be long time for reset or first position2021-02-24 01:41:40。293 [destination = example , address = /192。168。12。25:3306 , EventParser] WARN c。a。o。c。p。inbound。mysql。rds。RdsBinlogEventParserProxy - prepare to find start position just show master status2021-02-24 01:41:40。542 [destination = example , address = /192。168。12。25:3306 , EventParser] WARN c。a。o。c。p。inbound。mysql。rds。RdsBinlogEventParserProxy - ——-find start position successfully, EntryPosition[included=false,journalName=binlog。000001,position=4,serverId=1,gtid=null,timestamp=1614134832000] cost : 244ms , the next step is binlog dump

9。1。5 檢視 Kafka 中 consumer 資訊

在 MySQL 中插入一條測試資訊:

mysqlinsert into t2 values(1);Query OK, 1 row affected (0。00 sec)

檢視 consumer 的資訊,已經有了剛才插入的測試資料:

/opt/kafka/bin/kafka-console-consumer。sh ——bootstrap-server 192。168。12。22:9092 ——topic canal-kafka ——from-beginning{“data”:null,“database”:“test”,“es”:1614151725000,“id”:2,“isDdl”:false,“mysqlType”:null,“old”:null,“pkNames”:null,“sql”:“create database test”,“sqlType”:null,“table”:“”,“ts”:1614151725890,“type”:“QUERY”}{“data”:null,“database”:“test”,“es”:1614151746000,“id”:3,“isDdl”:true,“mysqlType”:null,“old”:null,“pkNames”:null,“sql”:“create table t2(id int)”,“sqlType”:null,“table”:“t2”,“ts”:1614151746141,“type”:“CREATE”}{“data”:[{“id”:“1”}],“database”:“test”,“es”:1614151941000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“int”},“old”:null,“pkNames”:null,“sql”:“”,“sqlType”:{“id”:4},“table”:“t2”,“ts”:1614151941235,“type”:“INSERT”}

9。2 Kafka -Flink 通路

在 Flink 中建立 t2 表,connector 型別為 kafka。

## create a test table t2 in FlinkFlink SQLcreate table t2(id int)WITH (‘connector’ = ‘kafka’,‘topic’ = ‘canal-kafka’,‘properties。bootstrap。servers’ = ‘192。168。12。22:9092’,‘properties。group。id’ = ‘canal-kafka-consumer-group’,‘format’ = ‘canal-json’,‘scan。startup。mode’ = ‘latest-offset’);Flink SQLselect * from t1;

在 MySQL 中在插入一條測試資料:

mysqlinsert into test。t2 values(2);Query OK, 1 row affected (0。00 sec)

從 Flink 中可以實時同步資料:

Flink SQLselect * from t1; Refresh: 1 s Page: Last of 1 Updated: 02:49:27。366 id 2

9。3 Flink -TiDB 通路

9。3。1 在 下游的 TiDB 中建立用於測試的表

[root@r20 soft]# mysql -uroot -P14000 -hr21mysqlcreate table t3 (id int);Query OK, 0 rows affected (0。31 sec)

9。3。2 在 Flink 中建立測試表

Flink SQLCREATE TABLE t3 (id int) with (‘connector’ = ‘jdbc’,‘url’ = ‘jdbc:mysql://192。168。12。21:14000/test’,‘table-name’ = ‘t3’,‘username’ = ‘root’,‘password’ = ‘mysql’);Flink SQLinsert into t3 values(3);[INFO] Submitting SQL update statement to the cluster。。。[INFO] Table update statement has been successfully submitted to the cluster:Job ID: a0827487030db177ee7e5c8575ef714e

9。3。3 在下游 TiDB 中檢視插入的資料

mysqlselect * from test。t3;+————+| id |+————+| 3 |+————+1 row in set (0。00 sec)

上一篇:【燒友評測】1099元的小明q1迷你投影儀,獲得德國萊茵低藍光認證!
下一篇:乾貨| 一個有趣的程式碼實現引數校驗,你不可不知的一些常見錯誤資訊