-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathpipeline-definition-full.yaml
More file actions
72 lines (68 loc) · 2.5 KB
/
pipeline-definition-full.yaml
File metadata and controls
72 lines (68 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Source connector: a MySQL instance reached at localhost:3306.
source:
  type: mysql
  name: source-database
  host: localhost
  port: 3306
  username: admin
  # NOTE(review): plain-text credential, presumably a placeholder; keep real
  # secrets out of version control.
  password: pass
  # Comma-separated table name patterns.
  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
  # Comma-separated <table pattern>:<column> pairs.
  chunk-column: app_order_.*:id,web_order:product_id
  capture-new-tables: true
  # Each entry pairs a table pattern with a filter expression.
  scan.snapshot.filters:
    - table: db1.user_table_[0-9]+
      filter: id > 100
    - table: db[1-2].[app|web]_order_\.*
      filter: city != 'China:beijing'
# Sink connector: a Kafka cluster bootstrapped from localhost:9092.
sink:
  type: kafka
  name: sink-queue
  bootstrap-servers: localhost:9092
  auto-create-table: true
# Routing rules: each entry maps a source-table pattern to one sink table.
route:
  - source-table: mydb.default.app_order_.*
    sink-table: odsdb.default.app_order
    description: sync all sharding tables to one
  - source-table: mydb.default.web_order
    sink-table: odsdb.default.ods_web_order
    description: sync table to with given prefix ods_
# Transform rules: each entry targets a source-table pattern and declares a
# projection, a row filter and optional table metadata.
transform:
  # Keeps id and order_id, applies TO_UPPER to product_name, and sets key,
  # partition and table-option metadata.
  - source-table: mydb.app_order_.*
    projection: id, order_id, TO_UPPER(product_name)
    filter: id > 10 AND order_id > 100
    primary-keys: id
    partition-keys: product_name
    table-options: comment=app order
    description: project fields from source table
    converter-after-transform: SOFT_DELETE
  # Adds a computed uniq_id column ahead of all original columns (`*`) and
  # filters on that computed column.
  - source-table: mydb.web_order_.*
    projection: CONCAT(id, order_id) as uniq_id, *
    filter: uniq_id > 10
    description: add new uniq_id for each row
# Pipeline-level settings: name, parallelism, schema-change handling,
# timeout and runtime mode.
pipeline:
  name: source-database-sync-pipe
  parallelism: 4
  schema.change.behavior: evolve
  schema-operator.rpc-timeout: 1 h
  execution.runtime-mode: STREAMING
  # NOTE(review): `model` is assumed to nest under `pipeline` as a single
  # mapping; confirm against the consuming parser.
  model:
    model-name: GET_EMBEDDING
    class-name: OpenAIEmbeddingModel
    openai.model: text-embedding-3-small
    openai.host: https://xxxx
    # NOTE(review): plain-text API key, presumably a placeholder; keep real
    # keys out of version control.
    openai.apikey: abcd1234