|
| 1 | +# MongoDB |
| 2 | + |
| 3 | +> MongoDB数据接收器 |
| 4 | +
|
| 5 | +## 支持引擎 |
| 6 | + |
| 7 | +> Spark<br/> |
| 8 | +> Flink<br/> |
| 9 | +> SeaTunnel Zeta<br/> |
| 10 | +
|
| 11 | +## 主要特性 |
| 12 | + |
| 13 | +- [x] [exactly-once](../../concept/connector-v2-features.md) |
| 14 | +- [x] [cdc](../../concept/connector-v2-features.md) |
| 15 | + |
| 16 | +**提示** |
| 17 | + |
| 18 | +> 1.如果要使用CDC编写的功能,建议启用追加销售启用配置。 |
| 19 | +
|
| 20 | +## 描述 |
| 21 | + |
| 22 | +MongoDB连接器提供了从MongoDB读取数据和向MongoDB写入数据的能力。 |
| 23 | +本文档描述了如何设置MongoDB连接器以针对MongoDB运行数据写入器。 |
| 24 | + |
| 25 | +## 支持的数据源信息 |
| 26 | + |
| 27 | +为了使用Mongodb连接器,需要以下依赖关系。 |
| 28 | +它们可以通过install-plugin.sh或Maven中央存储库下载。 |
| 29 | + |
| 30 | +| 数据来源 | 支持的版本 | 依赖 | |
| 31 | +|------------|--------------------|---------------------------------------------------------------------------------------| |
| 32 | +| MongoDB | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-mongodb) | |
| 33 | + |
| 34 | +## 数据类型映射 |
| 35 | + |
| 36 | +下表列出了从MongoDB BSON类型到Seatunnel数据类型的字段数据类型映射。 |
| 37 | + |
| 38 | +| Seatunnel数据类型 | MongoDB BSON类型 | |
| 39 | +|---------------------|-------------------| |
| 40 | +| STRING | ObjectId | |
| 41 | +| STRING | String | |
| 42 | +| BOOLEAN | Boolean | |
| 43 | +| BINARY | Binary | |
| 44 | +| INTEGER | Int32 | |
| 45 | +| TINYINT | Int32 | |
| 46 | +| SMALLINT | Int32 | |
| 47 | +| BIGINT | Int64 | |
| 48 | +| DOUBLE | Double | |
| 49 | +| FLOAT | Double | |
| 50 | +| DECIMAL | Decimal128 | |
| 51 | +| Date | Date | |
| 52 | +| Timestamp | Timestamp[Date] | |
| 53 | +| ROW | Object | |
| 54 | +| ARRAY | Array | |
| 55 | + |
| 56 | +**提示** |
| 57 | + |
| 58 | +> 1.当使用SeaTunnel将Date和Timestamp类型写入MongoDB时,两者都会在MongoDB中生成Date数据类型,但精度会有所不同。SeaTunnel Date类型生成的数据具有二级精度,而SeaTunnel Timestamp类型生成的数字具有毫秒级精度。<br/> |
| 59 | +> 2.在SeaTunnel中使用DECIMAL类型时,请注意最大范围不能超过34位数字,这意味着您应该使用DECIMAL(34,18)。<br/> |
| 60 | +
|
| 61 | +## Sink 选项 |
| 62 | + |
| 63 | +| 名称 | 类型 | 是否必传 | 默认值 | 描述 | |
| 64 | +|-----------------------|----------|----------|---------|------------------------------------------------------------------------------------------------------------------------------| |
| 65 | +| uri | String | 是 | - | MongoDB标准连接uri。例如。mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. | |
| 66 | +| database | String | 是 | - | 要读取或写入的MongoDB数据库的名称. | |
| 67 | +| collection | String | 是 | - | 要读取或写入的MongoDB集合的名称. | |
| 68 | +| schema | String | 是 | - | MongoDB的BSON和seatunnel数据结构映射. | |
| 69 | +| buffer-flush.max-rows | String | 否 | 1000 | 指定每个批处理请求的最大缓冲行数. | |
| 70 | +| buffer-flush.interval | String | 否 | 30000 | 指定每个批处理请求的最大缓冲行间隔,单位为毫秒. | |
| 71 | +| retry.max | String | 否 | 3 | 指定向数据库写入记录失败时的最大重试次数. | |
| 72 | +| retry.interval | Duration | 否 | 1000 | 指定向数据库写入记录失败时的重试时间间隔,单位为毫秒. | |
| 73 | +| upsert-enable | Boolean | 否 | 否 | 是否通过追加销售模式编写文档. | |
| 74 | +| primary-key | List | 否 | - | 追加销售/更新的主键。属性的键采用`["id"、"name"、…]`格式. | |
| 75 | +| transaction | Boolean | 否 | 否 | 是否在MongoSink中使用交易 (需要MongoDB 4.2+). | |
| 76 | +| common-options | | 否 | - | Sink插件常用参数,详见[Sink common Options](../Sink common Options.md) |
| 77 | + |
| 78 | +### 提示 |
| 79 | + |
| 80 | +> 1.MongoDB Sink Connector的数据刷新逻辑由三个参数共同控制:"buffer flush.max lines"、"buffer flush.interval"和"checkpoint.interval"。<br/> |
| 81 | +> 如果满足这些条件中的任何一个,将触发数据刷新。<br/> |
| 82 | +> 2.与历史参数"upsert-key"兼容。如果设置了"upsert-key",请不要设置"primary-key"。<br/> |
| 83 | +
|
| 84 | +## 如何创建MongoDB数据同步作业 |
| 85 | + |
| 86 | +以下示例演示了如何创建数据同步作业,将随机生成的数据写入MongoDB数据库: |
| 87 | + |
| 88 | +```bash |
| 89 | +# 设置要执行的任务的基本配置 |
| 90 | +env { |
| 91 | + parallelism = 1 |
| 92 | + job.mode = "BATCH" |
| 93 | + checkpoint.interval = 1000 |
| 94 | +} |
| 95 | + |
| 96 | +source { |
| 97 | + FakeSource { |
| 98 | + row.num = 2 |
| 99 | + bigint.min = 0 |
| 100 | + bigint.max = 10000000 |
| 101 | + split.num = 1 |
| 102 | + split.read-interval = 300 |
| 103 | + schema { |
| 104 | + fields { |
| 105 | + c_bigint = bigint |
| 106 | + } |
| 107 | + } |
| 108 | + } |
| 109 | +} |
| 110 | + |
| 111 | +sink { |
| 112 | + MongoDB{ |
| 113 | + uri = mongodb://user: [email protected]:27017 |
| 114 | + database = "test" |
| 115 | + collection = "test" |
| 116 | + schema = { |
| 117 | + fields { |
| 118 | + _id = string |
| 119 | + c_bigint = bigint |
| 120 | + } |
| 121 | + } |
| 122 | + } |
| 123 | +} |
| 124 | +``` |
| 125 | + |
| 126 | +## Parameter Interpretation |
| 127 | + |
| 128 | +### MongoDB数据库连接URI示例 |
| 129 | + |
| 130 | +未经身份验证的单节点连接: |
| 131 | + |
| 132 | +```bash |
| 133 | +mongodb://127.0.0.0:27017/mydb |
| 134 | +``` |
| 135 | + |
| 136 | +副本集连接: |
| 137 | + |
| 138 | +```bash |
| 139 | +mongodb://127.0.0.0:27017/mydb?replicaSet=xxx |
| 140 | +``` |
| 141 | + |
| 142 | +经过身份验证的副本集连接: |
| 143 | + |
| 144 | +```bash |
| 145 | +mongodb://admin: [email protected]:27017/mydb ?replicaSet=xxx &authSource=admin |
| 146 | +``` |
| 147 | + |
| 148 | +多节点副本集连接: |
| 149 | + |
| 150 | +```bash |
| 151 | +mongodb://127.0.0..1:27017,127.0.0..2:27017,127.0.0.3:27017/mydb?replicaSet=xxx |
| 152 | +``` |
| 153 | + |
| 154 | +分片集群连接: |
| 155 | + |
| 156 | +```bash |
| 157 | +mongodb://127.0.0.0:27017/mydb |
| 158 | +``` |
| 159 | + |
| 160 | +多个mongos连接: |
| 161 | + |
| 162 | +```bash |
| 163 | +mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb |
| 164 | +``` |
| 165 | + |
| 166 | +注意:URI中的用户名和密码在连接到连接字符串之前必须进行URL编码。 |
| 167 | + |
| 168 | +### 缓冲区冲洗 |
| 169 | + |
| 170 | +```bash |
| 171 | +sink { |
| 172 | + MongoDB { |
| 173 | + uri = "mongodb://user:[email protected]:27017" |
| 174 | + database = "test_db" |
| 175 | + collection = "users" |
| 176 | + buffer-flush.max-rows = 2000 |
| 177 | + buffer-flush.interval = 1000 |
| 178 | + schema = { |
| 179 | + fields { |
| 180 | + _id = string |
| 181 | + id = bigint |
| 182 | + status = string |
| 183 | + } |
| 184 | + } |
| 185 | + } |
| 186 | +} |
| 187 | +``` |
| 188 | + |
| 189 | +### 为什么不建议使用交易进行操作? |
| 190 | + |
| 191 | +尽管MongoDB从4.2版本开始就完全支持多文档事务,但这并不意味着每个人都应该鲁莽地使用它们。 |
| 192 | +事务相当于锁、节点协调、额外开销和性能影响。 |
| 193 | +相反,使用交易的原则应该是:尽可能避免使用它们。 |
| 194 | +通过合理设计系统,可以大大避免使用事务的必要性。 |
| 195 | + |
| 196 | +### Idempotent写作 |
| 197 | + |
| 198 | +通过指定一个明确的主键并使用upstart方法,可以实现精确一次写入语义。 |
| 199 | + |
| 200 | +如果在配置中定义了"主键"和"upstart enable",MongoDB接收器将使用upstart语义,而不是常规的INSERT语句。我们将upstart key中声明的主键组合为MongoDB保留的主键,并使用upstart模式进行写入,以确保幂等写入。 |
| 201 | +如果发生故障,Seatunnel作业将从上一个成功的检查点恢复并重新处理,这可能会导致恢复过程中出现重复的消息处理。强烈建议使用升级模式,因为它有助于避免违反数据库主键约束,并在需要重新处理记录时生成重复数据。 |
| 202 | + |
| 203 | +```bash |
| 204 | +sink { |
| 205 | + MongoDB { |
| 206 | + uri = "mongodb://user:[email protected]:27017" |
| 207 | + database = "test_db" |
| 208 | + collection = "users" |
| 209 | + upsert-enable = true |
| 210 | + primary-key = ["name","status"] |
| 211 | + schema = { |
| 212 | + fields { |
| 213 | + _id = string |
| 214 | + name = string |
| 215 | + status = string |
| 216 | + } |
| 217 | + } |
| 218 | + } |
| 219 | +} |
| 220 | +``` |
| 221 | + |
| 222 | +## 更改日志 |
| 223 | + |
| 224 | +### 2.2.0-beta |
| 225 | + |
| 226 | +- 添加MongoDB源连接器 |
| 227 | + |
| 228 | +### 2.3.1-release |
| 229 | + |
| 230 | +- [功能]重构mongodb源连接器([4620](https://github.com/apache/incubator-seatunnel/pull/4620)) |
| 231 | + |
| 232 | +### 随后版本 |
| 233 | + |
| 234 | +- [功能]Mongodb支持cdc接收器([4833](https://github.com/apache/seatunnel/pull/4833)) |
| 235 | + |
0 commit comments