|
| 1 | +--- |
| 2 | +title: "MySQL 同步到 Doris" |
| 3 | +weight: 1 |
| 4 | +type: docs |
| 5 | +aliases: |
| 6 | +- /get-started/quickstart/mysql-to-doris |
| 7 | +--- |
| 8 | +<!-- |
| 9 | +Licensed to the Apache Software Foundation (ASF) under one |
| 10 | +or more contributor license agreements. See the NOTICE file |
| 11 | +distributed with this work for additional information |
| 12 | +regarding copyright ownership. The ASF licenses this file |
| 13 | +to you under the Apache License, Version 2.0 (the |
| 14 | +"License"); you may not use this file except in compliance |
| 15 | +with the License. You may obtain a copy of the License at |
| 16 | +
|
| 17 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 18 | +
|
| 19 | +Unless required by applicable law or agreed to in writing, |
| 20 | +software distributed under the License is distributed on an |
| 21 | +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 22 | +KIND, either express or implied. See the License for the |
| 23 | +specific language governing permissions and limitations |
| 24 | +under the License. |
| 25 | +--> |
| 26 | + |
| 27 | +# Streaming ELT 同步 MySQL 到 Doris |
| 28 | + |
| 29 | +这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Doris 的 Streaming ELT 作业,包含整库同步、表结构变更同步和分库分表同步的功能。 |
| 30 | +本教程的演示都将在 Flink CDC CLI 中进行,无需一行 Java/Scala 代码,也无需安装 IDE。 |
| 31 | + |
| 32 | +## 准备阶段 |
| 33 | +准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。 |
| 34 | + |
| 35 | +### 准备 Flink Standalone 集群 |
| 36 | +1. 下载 [Flink 2.2.0](https://archive.apache.org/dist/flink/flink-2.2.0/flink-2.2.0-bin-scala_2.12.tgz),解压后得到 flink-2.2.0 目录。 |
| 37 | + 使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-2.2.0 所在目录。 |
| 38 | + |
| 39 | + ```shell |
| 40 | + cd flink-2.2.0 |
| 41 | + ``` |
| 42 | + |
| 43 | +2. 通过在 conf/config.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。 |
| 44 | + |
| 45 | + ```yaml |
| 46 | + execution: |
| 47 | + checkpointing: |
| 48 | + interval: 3s |
| 49 | + ``` |
| 50 | +
|
| 51 | +3. 使用下面的命令启动 Flink 集群。 |
| 52 | +
|
| 53 | + ```shell |
| 54 | + ./bin/start-cluster.sh |
| 55 | + ``` |
| 56 | + |
| 57 | +启动成功的话,可以在 [http://localhost:8081/](http://localhost:8081/)访问到 Flink Web UI,如下所示: |
| 58 | + |
| 59 | +{{< img src="/fig/mysql-doris-tutorial/flink-ui.png" alt="Flink UI" >}} |
| 60 | + |
| 61 | +多次执行 `start-cluster.sh` 可以拉起多个 TaskManager。 |
| 62 | + |
| 63 | +### 准备 Docker 环境 |
| 64 | +接下来的教程将以 `docker-compose` 的方式准备所需要的组件。 |
| 65 | + |
| 66 | +1. 宿主机配置 |
| 67 | + 由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令: |
| 68 | + |
| 69 | + ```shell |
| 70 | + sysctl -w vm.max_map_count=2000000 |
| 71 | + ``` |
| 72 | +MacOS 由于内部实现容器的方式不同,在部署时宿主机直接修改max_map_count值可能无法成功,需要先创建以下容器: |
| 73 | + ```shell |
| 74 | + docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh |
| 75 | + ``` |
| 76 | + |
| 77 | +容器创建成功执行以下命令: |
| 78 | + ```shell |
| 79 | + sysctl -w vm.max_map_count=2000000 |
| 80 | + ``` |
| 81 | + |
| 82 | +然后 `exit` 退出,创建 Doris Docker 集群。 |
| 83 | + |
| 84 | +2. docker 镜像启动 |
| 85 | + 使用下面的内容创建一个 `docker-compose.yml` 文件: |
| 86 | + |
| 87 | + ```yaml |
| 88 | + version: '2.1' |
| 89 | + services: |
| 90 | + doris: |
| 91 | + image: yagagagaga/doris-standalone |
| 92 | + ports: |
| 93 | + - "8030:8030" |
| 94 | + - "8040:8040" |
| 95 | + - "9030:9030" |
| 96 | + mysql: |
| 97 | + image: debezium/example-mysql:1.1 |
| 98 | + ports: |
| 99 | + - "3306:3306" |
| 100 | + environment: |
| 101 | + - MYSQL_ROOT_PASSWORD=123456 |
| 102 | + - MYSQL_USER=mysqluser |
| 103 | + - MYSQL_PASSWORD=mysqlpw |
| 104 | + ``` |
| 105 | +
|
| 106 | +该 Docker Compose 中包含的容器有: |
| 107 | +- MySQL: 包含商品信息的数据库 `app_db` |
| 108 | +- Doris: 存储从 MySQL 中根据规则映射过来的结果表 |
| 109 | + |
| 110 | +在 `docker-compose.yml` 所在目录下执行下面的命令来启动本教程需要的组件: |
| 111 | + |
| 112 | + ```shell |
| 113 | + docker-compose up -d |
| 114 | + ``` |
| 115 | + |
| 116 | +该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问[http://localhost:8030/](http://localhost:8030/) 来查看 Doris 是否运行正常。 |
| 117 | + |
| 118 | +#### 在 MySQL 数据库中准备数据 |
| 119 | +1. 进入 MySQL 容器 |
| 120 | + |
| 121 | + ```shell |
| 122 | + docker-compose exec mysql mysql -uroot -p123456 |
| 123 | + ``` |
| 124 | + |
| 125 | +2. 创建数据库 `app_db` 和表 `orders`,`products`,`shipments`,并插入数据 |
| 126 | + |
| 127 | + ```sql |
| 128 | + -- 创建数据库 |
| 129 | + CREATE DATABASE app_db; |
| 130 | + |
| 131 | + USE app_db; |
| 132 | + |
| 133 | + -- 创建 orders 表 |
| 134 | + CREATE TABLE `orders` ( |
| 135 | + `id` INT NOT NULL, |
| 136 | + `price` DECIMAL(10,2) NOT NULL, |
| 137 | + PRIMARY KEY (`id`) |
| 138 | + ); |
| 139 | + |
| 140 | + -- 插入数据 |
| 141 | + INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); |
| 142 | + INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00); |
| 143 | + |
| 144 | + -- 创建 shipments 表 |
| 145 | + CREATE TABLE `shipments` ( |
| 146 | + `id` INT NOT NULL, |
| 147 | + `city` VARCHAR(255) NOT NULL, |
| 148 | + PRIMARY KEY (`id`) |
| 149 | + ); |
| 150 | + |
| 151 | + -- 插入数据 |
| 152 | + INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); |
| 153 | + INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian'); |
| 154 | + |
| 155 | + -- 创建 products 表 |
| 156 | + CREATE TABLE `products` ( |
| 157 | + `id` INT NOT NULL, |
| 158 | + `product` VARCHAR(255) NOT NULL, |
| 159 | + PRIMARY KEY (`id`) |
| 160 | + ); |
| 161 | + |
| 162 | + -- 插入数据 |
| 163 | + INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); |
| 164 | + INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); |
| 165 | + INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut'); |
| 166 | + ``` |
| 167 | + |
| 168 | +#### Create database in Doris |
| 169 | +`Doris` 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。 |
| 170 | +1. 进入 Doris Web UI。 |
| 171 | + [http://localhost:8030/](http://localhost:8030/) |
| 172 | + 默认的用户名为 `root`,默认密码为空。 |
| 173 | + |
| 174 | + {{< img src="/fig/mysql-doris-tutorial/doris-ui.png" alt="Doris UI" >}} |
| 175 | + |
| 176 | +2. 通过 Web UI 创建 `app_db` 数据库 |
| 177 | + |
| 178 | + ```sql |
| 179 | + create database app_db; |
| 180 | + ``` |
| 181 | + |
| 182 | + {{< img src="/fig/mysql-doris-tutorial/doris-create-table.png" alt="Doris create table" >}} |
| 183 | + |
| 184 | +## 通过 Flink CDC CLI 提交任务 |
| 185 | +1. 下载下面列出的二进制压缩包,并解压得到目录 `flink-cdc-{{< param Version >}}`; |
| 186 | + [flink-cdc-{{< param Version >}}-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flink/flink-cdc-{{< param Version >}}/flink-cdc-{{< param Version >}}-bin.tar.gz) |
| 187 | + flink-cdc-{{< param Version >}} 下会包含 `bin`、`lib`、`log`、`conf` 四个目录。 |
| 188 | + |
| 189 | +2. 下载下面列出的 connector 包,并且移动到 `lib` 目录下; |
| 190 | + **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译.** |
| 191 | + **请注意,您需要将 jar 移动到 Flink CDC Home 的 lib 目录,而非 Flink Home 的 lib 目录下。** |
| 192 | + - [MySQL pipeline connector {{< param Version >}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/{{< param Version >}}/flink-cdc-pipeline-connector-mysql-{{< param Version >}}.jar) |
| 193 | + - [Apache Doris pipeline connector {{< param Version >}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/{{< param Version >}}/flink-cdc-pipeline-connector-doris-{{< param Version >}}.jar) |
| 194 | + |
| 195 | + 您还需要将下面的 Driver 包放在 Flink `lib` 目录下,或通过 `--jar` 参数将其传入 Flink CDC CLI,因为 CDC Connectors 不再包含这些 Drivers: |
| 196 | + - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) |
| 197 | + |
| 198 | +3.编写任务配置 yaml 文件。 |
| 199 | +下面给出了一个整库同步的示例文件 `mysql-to-doris.yaml`: |
| 200 | + |
| 201 | + ```yaml |
| 202 | + ################################################################################ |
| 203 | + # Description: Sync MySQL all tables to Doris |
| 204 | + ################################################################################ |
| 205 | + source: |
| 206 | + type: mysql |
| 207 | + hostname: localhost |
| 208 | + port: 3306 |
| 209 | + username: root |
| 210 | + password: 123456 |
| 211 | + tables: app_db.\.* |
| 212 | + server-id: 5400-5404 |
| 213 | + server-time-zone: UTC |
| 214 | + |
| 215 | + sink: |
| 216 | + type: doris |
| 217 | + fenodes: 127.0.0.1:8030 |
| 218 | + username: root |
| 219 | + password: "" |
| 220 | + table.create.properties.light_schema_change: true |
| 221 | + table.create.properties.replication_num: 1 |
| 222 | + |
| 223 | + pipeline: |
| 224 | + name: Sync MySQL Database to Doris |
| 225 | + parallelism: 2 |
| 226 | + |
| 227 | + ``` |
| 228 | + |
| 229 | +其中: |
| 230 | +source 中的 `tables: app_db.\.*` 通过正则匹配同步 `app_db` 下的所有表。 |
| 231 | +sink 添加 `table.create.properties.replication_num` 参数是由于 Docker 镜像中只有一个 Doris BE 节点。 |
| 232 | + |
| 233 | +4. 最后,通过命令行提交任务到 Flink Standalone cluster |
| 234 | + ```shell |
| 235 | + bash bin/flink-cdc.sh mysql-to-doris.yaml |
| 236 | + ``` |
| 237 | +提交成功后,返回信息如: |
| 238 | + ```shell |
| 239 | + Pipeline has been submitted to cluster. |
| 240 | + Job ID: ae30f4580f1918bebf16752d4963dc54 |
| 241 | + Job Description: Sync MySQL Database to Doris |
| 242 | + ``` |
| 243 | +在 Flink Web UI,可以看到一个名为 `Sync MySQL Database to Doris` 的任务正在运行。 |
| 244 | + |
| 245 | +{{< img src="/fig/mysql-doris-tutorial/mysql-to-doris.png" alt="MySQL-to-Doris" >}} |
| 246 | + |
| 247 | +打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功写入。 |
| 248 | + |
| 249 | +{{< img src="/fig/mysql-doris-tutorial/doris-display-data.png" alt="Doris display data" >}} |
| 250 | + |
| 251 | +### 同步变更 |
| 252 | +进入 MySQL 容器 |
| 253 | + |
| 254 | +```shell |
| 255 | + docker-compose exec mysql mysql -uroot -p123456 |
| 256 | +``` |
| 257 | + |
| 258 | +接下来,修改 MySQL 数据库中表的数据,Doris 中显示的订单数据也将实时更新: |
| 259 | +1. 在 MySQL 的 `orders` 表中插入一条数据 |
| 260 | + |
| 261 | + ```sql |
| 262 | + INSERT INTO app_db.orders (id, price) VALUES (3, 100.00); |
| 263 | + ``` |
| 264 | + |
| 265 | +2. 在 MySQL 的 `orders` 表中增加一个字段 |
| 266 | + |
| 267 | + ```sql |
| 268 | + ALTER TABLE app_db.orders ADD amount varchar(100) NULL; |
| 269 | + ``` |
| 270 | + |
| 271 | +3. 在 MySQL 的 `orders` 表中更新一条数据 |
| 272 | + |
| 273 | + ```sql |
| 274 | + UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1; |
| 275 | + ``` |
| 276 | +4. 在 MySQL 的 `orders` 表中删除一条数据 |
| 277 | + |
| 278 | + ```sql |
| 279 | + DELETE FROM app_db.orders WHERE id=2; |
| 280 | + ``` |
| 281 | + |
| 282 | +每执行一步就刷新一次 Doris Web UI,可以看到 Doris 中显示的 orders 数据将实时更新,如下所示: |
| 283 | + |
| 284 | +{{< img src="/fig/mysql-doris-tutorial/doris-display-result.png" alt="Doris display result" >}} |
| 285 | + |
| 286 | +同样的,去修改 `shipments`, `products` 表,也能在 Doris 中实时看到同步变更的结果。 |
| 287 | + |
| 288 | +### Route the changes |
| 289 | +Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。 |
| 290 | +下面提供一个配置文件说明: |
| 291 | + ```yaml |
| 292 | + ################################################################################ |
| 293 | + # Description: Sync MySQL all tables to Doris |
| 294 | + ################################################################################ |
| 295 | + source: |
| 296 | + type: mysql |
| 297 | + hostname: localhost |
| 298 | + port: 3306 |
| 299 | + username: root |
| 300 | + password: 123456 |
| 301 | + tables: app_db.\.* |
| 302 | + server-id: 5400-5404 |
| 303 | + server-time-zone: UTC |
| 304 | +
|
| 305 | + sink: |
| 306 | + type: doris |
| 307 | + fenodes: 127.0.0.1:8030 |
| 308 | + benodes: 127.0.0.1:8040 |
| 309 | + username: root |
| 310 | + password: "" |
| 311 | + table.create.properties.light_schema_change: true |
| 312 | + table.create.properties.replication_num: 1 |
| 313 | +
|
| 314 | + route: |
| 315 | + - source-table: app_db.orders |
| 316 | + sink-table: ods_db.ods_orders |
| 317 | + - source-table: app_db.shipments |
| 318 | + sink-table: ods_db.ods_shipments |
| 319 | + - source-table: app_db.products |
| 320 | + sink-table: ods_db.ods_products |
| 321 | +
|
| 322 | + pipeline: |
| 323 | + name: Sync MySQL Database to Doris |
| 324 | + parallelism: 2 |
| 325 | + ``` |
| 326 | + |
| 327 | +通过上面的 `route` 配置,会将 `app_db.orders` 表的结构和数据同步到 `ods_db.ods_orders` 中。从而实现数据库迁移的功能。 |
| 328 | +特别地,`source-table` 支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置: |
| 329 | + |
| 330 | + ```yaml |
| 331 | + route: |
| 332 | + - source-table: app_db.order\.* |
| 333 | + sink-table: ods_db.ods_orders |
| 334 | + ``` |
| 335 | + |
| 336 | +这样,就可以将诸如 `app_db.order01`、`app_db.order02`、`app_db.order03` 的表汇总到 ods_db.ods_orders 中。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。 |
| 337 | + |
| 338 | +## 环境清理 |
| 339 | +本教程结束后,在 `docker-compose.yml` 文件所在的目录下执行如下命令停止所有容器: |
| 340 | + ```shell |
| 341 | + docker-compose down |
| 342 | + ``` |
| 343 | +在 Flink 所在目录 `flink-2.2.0` 下执行如下命令停止 Flink 集群: |
| 344 | + |
| 345 | + ```shell |
| 346 | + ./bin/stop-cluster.sh |
| 347 | + ``` |
| 348 | + |
| 349 | +{{< top >}} |
0 commit comments