Skip to content

Latest commit

 

History

History
1114 lines (1074 loc) · 38.3 KB

File metadata and controls

1114 lines (1074 loc) · 38.3 KB

FlowServer with Confluent and Debezium

External Table Architecture

0.Setup WarehousePG ( Target DB )

install Flow Server RPM at all node of WarehousePG

sudo dnf install whpg7-flow-server*

install Flow Server RPM and WarehousePG RPM at Flow Server Node.

sudo dnf install warehouse-pg-7.2.1_WHPG-1*
sudo dnf install whpg7-flow-server*

create Extension for Flow Server

create extension fs_formatter;

SQL to check when an error occurs and normal loading does not occur when loading from a topic to a table

SELECT cmdtime, errmsg, encode(rawbytes, 'escape') AS rawbytes 
FROM gp_read_persistent_error_log('"public"."whpgfs_ext_avro"') 
WHERE cmdtime >= '2025-09-15T12:18:02.009596Z' ORDER BY cmdtime DESC;

1.Setup EPAS DB ( Source DB )

sudo -iu enterprisedb vi as17/data/postgresql.conf

아래 사항 변경.

wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
max_connections = 100

sudo vi pg_hba.conf

host replication debezium_user 0.0.0.0/0 md5
host all debezium_user 0.0.0.0/0 md5
sudo systemctl restart edb-as-17

-- 1. Debezium 전용 역할(사용자) 생성 (비밀번호는 반드시 변경하세요)

CREATE ROLE debezium_user WITH REPLICATION LOGIN PASSWORD 'your_strong_password';

-- 2. 사용자가 데이터베이스에 접속할 수 있는 권한 부여

GRANT CONNECT ON DATABASE edb TO debezium_user;
GRANT CONNECT ON DATABASE enterprisedb TO debezium_user;

-- 3. 데이터베이스의 public 스키마 사용 권한 부여

GRANT USAGE ON SCHEMA public TO debezium_user;

-- 4. 추적할 모든 테이블에 대한 SELECT 권한 부여 -- (또는 특정 테이블만 지정할 수 있습니다)

GRANT SELECT ON TABLE public.products TO debezium_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

-- 5. (매우 중요) 논리적 복제 슬롯을 관리할 수 있는 특별 권한 부여 -- 이 권한이 없으면 Debezium이 시작되지 않습니다.

ALTER ROLE debezium_user WITH REPLICATION;

-- 6. 복제 PUBLICATION 생성.

CREATE PUBLICATION debezium_pub FOR TABLE public.customer,public.json_from_kafka;
or
CREATE PUBLICATION debezium_pub FOR ALL TABLES;

Check Replication at source DB (EPAS). -- for EPAS

SELECT relreplident FROM pg_class WHERE relname = 'avro';

-- 결과가 f,p 이여야함. d 이면 안됨. -- Primary Key가 있는 경우

ALTER TABLE public.products REPLICA IDENTITY DEFAULT;

-- Primary Key가 없는 경우

ALTER TABLE public.products REPLICA IDENTITY FULL;

-- 기타 확인 명령어

SELECT * FROM pg_publication_tables WHERE tablename = 'avro';
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;
SELECT * FROM pg_replication_slots;
SELECT pg_drop_replication_slot('debezium');
SELECT * FROM pg_create_physical_replication_slot('standby_server_slot');

2. Install OS RPM / Setup Confluence Repo at Confluent Node

sudo yum install curl which sudo rpm --import https://packages.confluent.io/rpm/8.0/archive.key sudo update-crypto-policies --set DEFAULT:SHA1 sudo tee -a /etc/yum.repos.d/confluent.repo << EOF [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/8.0 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/8.0/archive.key enabled=1 [Confluent-Clients] name=Confluent Clients repository baseurl=https://packages.confluent.io/clients/rpm/centos/\$releasever/\$basearch gpgcheck=1 gpgkey=https://packages.confluent.io/clients/rpm/archive.key enabled=1 EOF

3.Install Confluent-Community 8.0.0 at Confluent Node.

-- confluent-platform #sudo yum clean all && sudo yum install confluent-platform -- confluent-platform & RBAC #sudo yum clean all && sudo yum install confluent-platform && sudo yum install confluent-security -- confluent-community sudo yum clean all && sudo yum install confluent-community-2.13 -- /usr/bin 에 실행 명령어 설치 됨. -- /etc/kafka 에 설정 관련 파일들 설치 됨. -- 모든 로컬 서비스 시작 (ZooKeeper, Kafka, Connect, Schema Registry 등) sudo rm -rf /tmp/kraft-controller-logs/* sudo kafka-storage random-uuid q1Sh-9_ISia_zwGINzRvyQ sudo kafka-storage format -t Rj00LRmXTAStcGzPQ1GbNg --config /etc/kafka/server.properties --standalone #kafka-server-start $CONFLUENT_HOME/etc/kafka/controller.properties #kafka-server-start $CONFLUENT_HOME/etc/kafka/broker.properties 4.To start Confluent Kafka at Confluent Node

sudo vi /etc/kafka/server.properties

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# List of controller endpoints used connect to the controller cluster
controller.quorum.bootstrap.servers=localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
#advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
confluent.license.topic.replication.factor=1
confluent.metadata.topic.replication.factor=1
confluent.security.event.logger.exporter.kafka.topic.replicas=1
confluent.balancer.enable=true
confluent.balancer.topic.replication.factor=1
delete.topic.enable=true
kafka-server-start /etc/kafka/server.properties
sudo systemctl start confluent-server

sudo vi /usr/lib/systemd/system/confluent-kafka.service # 아래 수정.

User=root
Group=root
sudo systemctl start confluent-kafka

5.To start schema-registry at Confluent Node

sudo vi /etc/schema-registry/schema-registry.properties

# Specify the address the socket server listens on, e.g. listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:8081
# The advertised host name. This must be specified if you are running Schema Registry
# with multiple nodes.
host.name=192.168.50.1
# List of Kafka brokers to connect to, e.g. PLAINTEXT://hostname:9092,SSL://hostname2:9092
kafkastore.bootstrap.servers=PLAINTEXT://hostname:9092,SSL://hostname2:9092
schema-registry-start $CONFLUENT_HOME/etc/schema-registry/chema-registry.properties
sudo vi /usr/lib/systemd/system/confluent-kafka.service 
sudo systemctl daemon-reload
sudo systemctl start confluent-schema-registry
sudo systemctl status confluent-schema-registry

6.To start Confluent Kafka-Connect at Confluent Node

sudo vi /etc/kafka/connect-distributed.properties

group.id=connect-cluster
group.id=debezium-connect-cluster
listeners=HTTP://0.0.0.0:8083
rest.advertised.host.name=localhost
rest.port=8083
plugin.path=/opt/kafka/current/plugins/debezium-connector-postgres
# --- 스키마 레지스트리 설정 (Apicurio Registry 예시) ---
# 이 부분은 사용할 스키마 레지스트리에 맞춰 설정합니다.
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
sudo vi /usr/lib/systemd/system/confluent-kafka-connect.service
sudo systemctl start confluent-kafka-connect
sudo systemctl status confluent-kafka-connect
sudo journalctl -u confluent-kafka-connect --since "1 minutes ago"

7.Install Debezium at Confluent-Connect at Confluent Node

-- debezium 설치

cd /tmp
wget https://packages.confluent.io/rpm/8.0/confluent-hub-client-8.0.0-1.noarch.rpm
sudo dnf install ./confluent-hub-client-8.0.0-1.noarch.rpm
sudo confluent-hub install debezium/debezium-connector-postgresql:latest
  /etc/kafka/connect-distributed.properties 
  /etc/kafka/connect-standalone.properties 
  /etc/schema-registry/connect-avro-distributed.properties 
  /etc/schema-registry/connect-avro-standalone.properties 
  /etc/kafka/connect-distributed.properties 
sudo systemctl restart confluent-kafka-connect
sudo systemctl status confluent-kafka-connect
sudo systemctl restart confluent-schema-registry
sudo systemctl status confluent-schema-registry

8.To prepare Test Data ant Source DB(EDB EPAS)

--- DB 준비 ----

CREATE TABLE customer (
    customer_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    email VARCHAR(255) NOT NULL UNIQUE,
    last_name VARCHAR(50) NOT NULL,
    first_name VARCHAR(50) NOT NULL,
    is_active BOOLEAN NOT NULL DEFAULT TRUE,
    phone_number VARCHAR(20),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ
);
INSERT INTO customer (email, last_name, first_name) VALUES 
    ('gildong.000@example.com', '홍', '길동');
INSERT INTO customer (email, last_name, first_name, phone_number) VALUES 
    ('soonsin.000@example.com', '이', '순신', '010-1234-5678');
INSERT INTO customer (email, last_name, first_name, phone_number) VALUES 
    ('gwansun.000@example.com', '유', '관순', NULL);
INSERT INTO customer (email, last_name, first_name, phone_number) VALUES 
    ('sejong.000@example.com', '김', '세종', '010-9999-8888');
INSERT INTO customer (email, last_name, first_name, is_active, phone_number, created_at, updated_at)
VALUES ('chulsoo.kim@example.com', '김', '철수', false, '010-1111-2222', '2025-01-01 10:00:00 KST', NOW());
# 테이블 추가시.
CREATE TABLE avro_from_kafka( id int8, name varchar(100), month int4, amount_paid decimal(9,2));
ALTER TABLE public.avro_from_kafka REPLICA IDENTITY FULL;
ALTER PUBLICATION debezium_pub ADD TABLE public.avro_from_kafka;
SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_pub';
INSERT INTO avro_from_kafka VALUES (1, '홍길동', 2501, 9348.33);

9.Create Confluent Connect for EDB EPAS17(example) at Confluent Node

---- PG Connect 생성 ----

curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep PostgresConnector "io.debezium.connector.postgresql.PostgresConnector"
sudo tee -a pg-connector.json << EOF
{
  "name": "epas17-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "10",
    "database.hostname": "10.0.31.196",  
    "database.port": "5444",
    "database.user": "debezium_user",
    "database.password": "your_strong_password",
    "database.dbname": "enterprisedb",
    "database.server.name": "epas17",
    "topic.prefix": "epas17",
    "schema.include.list": "public",
    "table.include.list": "public.*",
 #   "table.include.list": "public.customer,public.json_from_kafka",
    "publication.name": "debezium_pub",
    "plugin.name": "pgoutput",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
EOF

-- connect 생성

curl -X POST -H "Content-Type: application/json" --data @pg-connector.json http://localhost:8083/connectors

-- connect 리스트 보기

curl -s http://localhost:8083/connectors | jq

-- 특정 connect 확인

curl http://localhost:8083/connectors/epas17-connector/status | jq

--- 특정 connect 설정 확인

curl -s http://localhost:8083/connectors/epas17-connector/config | jq

-- 특정 connect 삭제

curl -X DELETE http://localhost:8083/connectors/epas17-connector

-- 특정 connect 설정 변경.

curl -s -X PUT -H "Content-Type: application/json"  \
   http://localhost:8083/connectors/epas17-connector/config  \
   -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "10.0.31.196",  
    "database.port": "5444",
    "database.user": "debezium_user",
    "database.password": "your_strong_password",
    "database.dbname": "enterprisedb",
    "database.server.name": "epas17",
    "topic.prefix": "epas17",
    "schema.include.list": "public",
    "table.include.list": "public.*",
    "publication.name": "debezium_pub",
    "plugin.name": "pgoutput",
    "decimal.handling.mode": "double",
    "value.converter.schemas.enable": "false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }'
  • 위 설정에서 아래를 추가하면 Topic의 포맷이 달라짐. "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.add.fields": "op,lsn,source.ts_ms", "transforms.unwrap.add.fields.prefix": "__", "transforms.unwrap.delete.handling.mode": "rewrite"
  • 위 포맷일 경우. makeyaml_kyobo.sh, generate_kyobo.awk 파일로 yaml 파일 생성해야 함.

-- 특정 connect 설정 변경 후 재시작.

curl -s -X POST http://localhost:8083/connectors/epas17-connector/restart

-- 'value' 스키마 삭제

curl -X DELETE http://localhost:8081/subjects/tpcc.public.bmsql_customer-value

-- (선택) 영구 삭제를 원할 경우

curl -X DELETE http://localhost:8081/subjects/tpcc.public.bmsql_customer-value?permanent=true

-- confluent PKG 삭제

rm -rf $CONFLUENT_HOME
rm -rf /var/lib/<confluent-platform-data-files>
sudo yum autoremove confluent-platform

10.Check and Verify Confluent(Kafka) Topic at Confluent Node

-- Topic 리스트 확인

kafka-topics --bootstrap-server localhost:9092 --list

-- 특정 Topic 구성 확인

kafka-topics --bootstrap-server localhost:9092 --describe --topic epas17.public.avro

-- 특정 Topic 삭제

kafka-topics --bootstrap-server localhost:9092 --delete --topic epas17.public.avro

-- 특정 Topic Consumer 실행으로 데이터 확인 (CSV, JSON)

kafka-console-consumer --bootstrap-server localhost:9092 --topic epas17.public.customer --from-beginning

-- Topic Consumer 실행으로 특정 topic 데이터 확인 (AVRO)

sudo kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic epas17.public.avro_from_kafka --from-beginning --property schema.registry.url=http://localhost:8081

-- Topic Consumer 실행으로 특정 topic의 특정 데이터(offset) 확인 (AVRO)

sudo kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic tpcc.public.bmsql_warehouse --partition 0 --offset 7 --max-messages 1

-- 특정 Topic 건수/Offset 확인

kafka-get-offsets --bootstrap-server localhost:9092 --topic epas17.public.customer

-- consumer-groups 리스트 확인

kafka-consumer-groups --bootstrap-server localhost:9092 --list

-- 특정 consumer-groups 확인,Partition, Offset

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-consumer-group

-- 특정 consumer-groups 내 Topic 확인

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-temp-group --topic epas17.public.customer

11.Setup Flow Server at Flow Server Node

sudo tee -a gpconfigs/flowservercfg.json << EOF
{
    "Host": "",
    "Port": 6060,
    "Gpfdist": {
        "Host": "10.0.1.254",   # Flow Server IP
        "Port": 6070,
        "ReuseTables": false    # 
    },
    "Prometheus": {
        "Host": "",
        "Port": 9080,
        "MetricsPath": "/flow_metrics"
    },
    "DebugPort": 6080,
    "Logging": {
        "SplitLogByJob": false,
        "FrontendLevel": "debug",
        "BackendLevel": "info"
    }
}
EOF

-- 환경 설정

source /usr/local/greenplum-db/greenplum_path.sh

-- Flow Server 시작

flowserver -c ./gpconfigs/flowservercfg.json -l ./gpAdminLogs > /dev/null 2>&1 &

-- Flow Server 로그 확인.

tail -f ./gpAdminLogs/whpg_flow_server-2025-09-09.log

12.Setup Job at Flow Server, need to one Job per one Topic

sudo tee -a ~/jsonload_cfg.yaml << EOF
version: v1.0
source:
  kafka:
    brokers:
      - localhost:9092
    topic: epas17.public.avro
    value:
      avro:
        column:
          name: jdata
          type: JSON
        schema_registry: localhost:8081
    control:
      consistency: exactly-once
      fallback_offset: earliest  
    task:
      batch_size:
        interval_ms: 100
        max_count: 10
      window_size: 0
      #window_statement: <udf_or_sql_to_run>
      #prepare_statement: <udf_or_sql_to_run>
      #teardown_statement: <udf_or_sql_to_run>
target:
  database:
    host: 10.0.27.155
    port: 5432
    user: gpadmin
    password: gpadmin
    dbname: whpg
    #staging_schema: public
    error_limit: "5"
    tables:
    - table: avro
      schema: public
      mode:
        # one of the below insert, update, or merge modes
        insert: {}
        #update:
        #  match_columns: id 
        #  update_columns: name, month, amount_paid
        #  update_condition: id = id
      mapping:
        id: ((((jdata->'after')::json->'epas17.public.avro.Value')::json->'id')::json->>'long')::int8
        name: ((((jdata->'after')::json->'epas17.public.avro.Value')::json->'name')::json->>'string')::varchar(100)
        month: ((((jdata->'after')::json->'epas17.public.avro.Value')::json->'month')::json->>'int')::int4
        amount_paid: ((((jdata->'after')::json->'epas17.public.avro.Value')::json->'amount_paid')::json->>'long')::int8
option:
  schedule:
    running_duration: 2s
    auto_stop_restart_interval: 2s
    max_restart_times: 1
EOF

-- Job 등록

flowcli submit --name kafkajson2gp ~/jsonload_cfg.yaml

-- Job 폴더에 있는 모든 job 동시 등록

flowcli submit ~/flowserver/

-- Job 실행 시작

flowcli start kafkajson2gp

-- Job 실행 시작. kafka topic 처음부터 다시 적제.

flowcli start kafkajson2gp --reset-to-earliest

-- Job 중지

flowcli stop kafkajson2gp

-- Job 삭제

flowcli remove kafkajson2gp

12-1. create UDF for JSON Handling

CREATE OR REPLACE FUNCTION fs_udf(jdata jsonb, field_name text)
RETURNS text AS $$
DECLARE
    data_source jsonb;
    payload_key text;
    payload jsonb;
    field_value jsonb;
BEGIN
    -- 1. 'op' 값에 따라 'before' 또는 'after' 객체를 data_source로 선택
    IF jdata->>'op' = 'd' THEN
        data_source := jdata->'before';
    ELSE
        data_source := jdata->'after';
    END IF;
    -- 2. data_source가 비어있으면 NULL 반환
    IF data_source IS NULL OR jsonb_typeof(data_source) = 'null' THEN
        RETURN NULL;
    END IF;
    -- 3. data_source 객체 안의 첫 번째 키(예: "tpcc.public.bmsql_oorder.Value")를 찾고,
    --    해당 키의 값(실제 데이터가 담긴 객체)을 payload로 가져옴
    SELECT key, value INTO payload_key, payload
    FROM jsonb_each(data_source) LIMIT 1;
    IF payload IS NULL THEN
        RETURN NULL;
    END IF;
    -- 4. payload에서 인자로 받은 field_name의 값을 추출
    field_value := payload->field_name;
    -- 필드가 없거나 null이면 NULL 반환
    IF field_value IS NULL OR jsonb_typeof(field_value) = 'null' THEN
        RETURN NULL;
    END IF;
    -- 5. 추출된 값이 JSON 객체인지 단순 값인지 확인하여 최종 값 반환
    IF jsonb_typeof(field_value) = 'object' THEN
        -- 값이 JSON 객체이면(예: {"int": 665}), 그 안의 첫 번째 값을 텍스트로 반환
        RETURN (SELECT value FROM jsonb_each_text(field_value) LIMIT 1);
    ELSE
        -- 값이 단순 값이면, 텍스트로 변환하여 반환
        RETURN field_value#>>'{}';
    END IF;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

12-2. for Merge Mode (Insert, Update, Delete)

version: v1.0
source:
  kafka:
    brokers:
      - 10.0.1.254:9092
    topic: epas17.public.avro
    value:
      avro:
        column:
          name: jdata
          type: JSONB
        schema_registry: 10.0.1.254:8081
    control:
      consistency: exactly-once
      fallback_offset: earliest
    task:
      batch_size:
        interval_ms: 1000
        max_count: 100
      window_size: 0
      #window_statement: <udf_or_sql_to_run>
      #prepare_statement: <udf_or_sql_to_run>
      #teardown_statement: <udf_or_sql_to_run>
target:
  database:
    host: 10.0.27.155
    port: 5432
    user: gpadmin
    password: gpadmin
    dbname: whpg
    #staging_schema: public
    error_limit: "5"
    tables:
    - table: avro
      schema: public
      mode:
        merge:
          match_columns:
            - id
          #  - name
          #  - month
          #  - amount_paid
          #update_columns: [ name, month, amount_paid ]
          #update_condition: (jdata->>'op')::text = 'u'
          delete_condition: (jdata->>'op')::text = 'd'
      mapping:
        id: fs_udf(jdata, 'id')::int8
        name: fs_udf(jdata, 'name')::varchar(100)
        month: fs_udf(jdata, 'month')::int4
        amount_paid: fs_udf(jdata, 'amount_paid')::int8
option:
  schedule:
    running_duration: 2s
    auto_stop_restart_interval: 2s
    max_restart_times: 1

13.Flowcli Options.

flowcli Option. 
  completion  Generate the autocompletion script for the specified shell
  help        Help about any command
  list        List all jobs from the server
  monitor     Monitor flow server job(s)
  remove      Remove flow server job(s)
  start       Start flow server job(s)
  stop        Stop the flow server job(s)
  submit      Submit a configuration file/directory to the server for job submission
flowcli submit --name order_upload loadcfg.yaml
flowcli start order_upload
flowcli list --all
flowcli monitor

makeyaml.sh

#!/bin/bash
ghost="whpg-m"
gport="5432"
user="gpadmin"
password="gpadmin"
DB="whpg"
schema="public"
Dir_Yml="/home/gpadmin/yaml/"
broker="10.0.1.254:9092"
registry="10.0.1.254:8081"
psql -d "$DB" -At -c "SELECT table_name FROM information_schema.tables WHERE table_schema='$schema' AND table_type='BASE TABLE' AND table_name not like 'whpgfs_hs_%';" > schemalist.txt
if [ `cat schemalist.txt | wc -l` -eq 0 ] 
then
echo "Table doesn't exist. Please enter correct name"
echo "Script exiting"
exit;
fi
for i in `cat schemalist.txt`
do
echo "version: v1.0" > $Dir_Yml$i.yml
echo "source:" >> $Dir_Yml$i.yml 
echo "  kafka:" >> $Dir_Yml$i.yml
echo "    brokers:" >> $Dir_Yml$i.yml
echo "      - $broker" >> $Dir_Yml$i.yml
echo "    topic: tpcc.public.bmsql_item " >> $Dir_Yml$i.yml
echo "    value:" >> $Dir_Yml$i.yml
echo "      avro:" >> $Dir_Yml$i.yml
echo "        column:" >> $Dir_Yml$i.yml
echo "          name: jdata" >> $Dir_Yml$i.yml
echo "          type: JSONB" >> $Dir_Yml$i.yml
echo "        schema_registry: $registry" >> $Dir_Yml$i.yml
echo "    control:" >> $Dir_Yml$i.yml
echo "      consistency: exactly-once" >> $Dir_Yml$i.yml
echo "      fallback_offset: earliest" >> $Dir_Yml$i.yml 
echo "    task:" >> $Dir_Yml$i.yml
echo "      batch_size:" >> $Dir_Yml$i.yml
echo "        interval_ms: 100" >> $Dir_Yml$i.yml
echo "        max_count: 1000" >> $Dir_Yml$i.yml
echo "      window_size: 0" >> $Dir_Yml$i.yml
echo "target:" >> $Dir_Yml$i.yml
echo "  database:" >> $Dir_Yml$i.yml
echo "    host: $ghost" >> $Dir_Yml$i.yml
echo "    port: $gport" >> $Dir_Yml$i.yml
echo "    user: $user" >> $Dir_Yml$i.yml
echo "    password: $password" >> $Dir_Yml$i.yml
echo "    dbname: $DB" >> $Dir_Yml$i.yml
echo "    #staging_schema: public" >> $Dir_Yml$i.yml
echo "    error_limit: \"5\"" >> $Dir_Yml$i.yml
echo "    tables:" >> $Dir_Yml$i.yml
echo "    - table: $i" >> $Dir_Yml$i.yml
echo "      schema: $schema" >> $Dir_Yml$i.yml
echo "      mode:" >> $Dir_Yml$i.yml
echo "        merge:" >> $Dir_Yml$i.yml
echo "          match_columns:" >> $Dir_Yml$i.yml
# awk 스크립트를 AWK_SCRIPT 변수에 저장
AWK_SCRIPT='
/^Distributed by/ {
    gsub(/.*\(|\).*/, "")
    split($0, a, ", ")
    for(i in a) {
        print "          - " a[i]
    }
}'
# 변수를 사용하여 awk 명령어 실행
psql -d "$DB" -c "\d $gschema.$i" | awk "$AWK_SCRIPT" >> "$Dir_Yml$i.yml"
#psql -d $DB -c "\d $gschema.$i" | awk '/^Distributed by/ {gsub(/.*\(|\).*/,""); split($0, a, ", "); for(i in a) print "            - " a[i]}' >> $Dir_Yml$i.yml
echo "          delete_condition: (jdata->>'op')::text = 'd'" >> $Dir_Yml$i.yml
echo "      mapping:" >> $Dir_Yml$i.yml
psql -d $DB -c "\d $gschema.$i" | awk '
BEGIN { FS = "|" }
NR > 2 && NF > 1 && !/---/ {
    column = $1;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", column);
    type = $2;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", type);
    if (type == "") next;
    sub("character varying", "varchar", type);
    printf "        %s: fs_udf(jdata, \x27%s\x27)::%s\n", column, column, type;
}
' >> $Dir_Yml$i.yml
done

makeyaml_v2.sh for timestamp

#!/bin/bash
ghost="whpg-m"
gport="5432"
user="gpadmin"
password="gpadmin"
DB="whpg"
schema="public"
Dir_Yml="/home/gpadmin/yaml_v2/"
broker="10.0.1.254:9092"
registry="10.0.1.254:8081"
psql -d "$DB" -At -c "SELECT table_name FROM information_schema.tables WHERE table_schema='$schema' AND table_type='BASE TABLE' AND table_name not like 'whpgfs_hs_%';" > schemalist.txt
if [ `cat schemalist.txt | wc -l` -eq 0 ] 
then
echo "Table doesn't exist. Please enter correct name"
echo "Script exiting"
exit;
fi
for i in `cat schemalist.txt`
do
echo "version: v1.0" > $Dir_Yml$i.yml
echo "source:" >> $Dir_Yml$i.yml 
echo "  kafka:" >> $Dir_Yml$i.yml
echo "    brokers:" >> $Dir_Yml$i.yml
echo "      - $broker" >> $Dir_Yml$i.yml
echo "    topic: tpcc.public.bmsql_item " >> $Dir_Yml$i.yml
echo "    value:" >> $Dir_Yml$i.yml
echo "      avro:" >> $Dir_Yml$i.yml
echo "        column:" >> $Dir_Yml$i.yml
echo "          name: jdata" >> $Dir_Yml$i.yml
echo "          type: JSONB" >> $Dir_Yml$i.yml
echo "        schema_registry: $registry" >> $Dir_Yml$i.yml
echo "    control:" >> $Dir_Yml$i.yml
echo "      consistency: exactly-once" >> $Dir_Yml$i.yml
echo "      fallback_offset: earliest" >> $Dir_Yml$i.yml 
echo "    task:" >> $Dir_Yml$i.yml
echo "      batch_size:" >> $Dir_Yml$i.yml
echo "        interval_ms: 100" >> $Dir_Yml$i.yml
echo "        max_count: 1000" >> $Dir_Yml$i.yml
echo "      window_size: 0" >> $Dir_Yml$i.yml
echo "target:" >> $Dir_Yml$i.yml
echo "  database:" >> $Dir_Yml$i.yml
echo "    host: $ghost" >> $Dir_Yml$i.yml
echo "    port: $gport" >> $Dir_Yml$i.yml
echo "    user: $user" >> $Dir_Yml$i.yml
echo "    password: $password" >> $Dir_Yml$i.yml
echo "    dbname: $DB" >> $Dir_Yml$i.yml
echo "    #staging_schema: public" >> $Dir_Yml$i.yml
echo "    error_limit: \"5\"" >> $Dir_Yml$i.yml
echo "    tables:" >> $Dir_Yml$i.yml
echo "    - table: $i" >> $Dir_Yml$i.yml
echo "      schema: $schema" >> $Dir_Yml$i.yml
echo "      mode:" >> $Dir_Yml$i.yml
echo "        merge:" >> $Dir_Yml$i.yml
echo "          match_columns:" >> $Dir_Yml$i.yml
# awk 스크립트를 AWK_SCRIPT 변수에 저장
AWK_SCRIPT='
/^Distributed by/ {
    gsub(/.*\(|\).*/, "")
    split($0, a, ", ")
    for(i in a) {
        print "          - " a[i]
    }
}'
# 변수를 사용하여 awk 명령어 실행
psql -d "$DB" -c "\d $gschema.$i" | awk "$AWK_SCRIPT" >> "$Dir_Yml$i.yml"
#psql -d $DB -c "\d $gschema.$i" | awk '/^Distributed by/ {gsub(/.*\(|\).*/,""); split($0, a, ", "); for(i in a) print "            - " a[i]}' >> $Dir_Yml$i.yml
echo "          delete_condition: (jdata->>'op')::text = 'd'" >> $Dir_Yml$i.yml
echo "      mapping:" >> $Dir_Yml$i.yml
psql -d $DB -c "\d $gschema.$i" | awk '
BEGIN { FS = "|" }
NR > 2 && NF > 1 && !/---/ {
    # 1. 컬럼명과 타입을 추출하고 양쪽 공백을 제거합니다.
    column = $1;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", column);
    type = $2;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", type);
    if (type == "") next; # 유효하지 않은 줄은 건너뜁니다.
    # 2. 타입이 "timestamp"를 포함하는지 확인하여 분기 처리합니다.
    if (type ~ /timestamp/) {
        # Timestamp 타입일 경우 특별한 형식으로 출력
        printf "        %s: TO_TIMESTAMP(fs_udf(jdata::jsonb, \x27%s\x27)::bigint / 1000000.0)::timestamp\n", column, column;
    } else {
        # 그 외 모든 타입일 경우 일반 형식으로 출력
        sub("character varying", "varchar", type); # character varying -> varchar 변환
        printf "        %s: fs_udf(jdata, \x27%s\x27)::%s\n", column, column, type;
    }
}
' >> $Dir_Yml$i.yml
done

makeyaml_v3.sh

#!/bin/bash
ghost="whpg-m"
gport="5432"
user="gpadmin"
password="gpadmin"
DB="whpg"
schema="public"
Dir_Yml="/home/gpadmin/yaml_v3/"
broker="10.0.1.254:9092"
registry="10.0.1.254:8081"
topic="tpcc.public.bmsql_history"
psql -d "$DB" -At -c "SELECT table_name FROM information_schema.tables WHERE table_schema='$schema' AND table_type='BASE TABLE' AND table_name not like 'whpgfs_hs_%';" > schemalist.txt
if [ `cat schemalist.txt | wc -l` -eq 0 ] 
then
echo "Table doesn't exist. Please enter correct name"
echo "Script exiting"
exit;
fi
for i in `cat schemalist.txt`
do
echo "version: v1.0" > $Dir_Yml$i.yml
echo "source:" >> $Dir_Yml$i.yml 
echo "  kafka:" >> $Dir_Yml$i.yml
echo "    brokers:" >> $Dir_Yml$i.yml
echo "      - $broker" >> $Dir_Yml$i.yml
echo "    topic: tpcc.public.$i " >> $Dir_Yml$i.yml
echo "    value:" >> $Dir_Yml$i.yml
echo "      avro:" >> $Dir_Yml$i.yml
echo "        column:" >> $Dir_Yml$i.yml
echo "          name: jdata" >> $Dir_Yml$i.yml
echo "          type: json" >> $Dir_Yml$i.yml
echo "        schema_registry: $registry" >> $Dir_Yml$i.yml
echo "    control:" >> $Dir_Yml$i.yml
echo "      consistency: exactly-once" >> $Dir_Yml$i.yml
echo "      fallback_offset: earliest" >> $Dir_Yml$i.yml 
echo "    task:" >> $Dir_Yml$i.yml
echo "      batch_size:" >> $Dir_Yml$i.yml
echo "        interval_ms: 100" >> $Dir_Yml$i.yml
echo "        max_count: 1000" >> $Dir_Yml$i.yml
echo "      window_size: 0" >> $Dir_Yml$i.yml
echo "target:" >> $Dir_Yml$i.yml
echo "  database:" >> $Dir_Yml$i.yml
echo "    host: $ghost" >> $Dir_Yml$i.yml
echo "    port: $gport" >> $Dir_Yml$i.yml
echo "    user: $user" >> $Dir_Yml$i.yml
echo "    password: $password" >> $Dir_Yml$i.yml
echo "    dbname: $DB" >> $Dir_Yml$i.yml
echo "    #staging_schema: public" >> $Dir_Yml$i.yml
echo "    error_limit: \"5\"" >> $Dir_Yml$i.yml
echo "    tables:" >> $Dir_Yml$i.yml
echo "    - table: $i" >> $Dir_Yml$i.yml
echo "      schema: $schema" >> $Dir_Yml$i.yml
echo "      mode:" >> $Dir_Yml$i.yml
echo "        merge:" >> $Dir_Yml$i.yml
echo "          match_columns:" >> $Dir_Yml$i.yml
# awk 스크립트를 AWK_SCRIPT 변수에 저장
AWK_SCRIPT='
/^Distributed by/ {
    gsub(/.*\(|\).*/, "")
    split($0, a, ", ")
    for(i in a) {
        print "          - " a[i]
    }
}'
# 변수를 사용하여 awk 명령어 실행
psql -d "$DB" -c "\d $gschema.$i" | awk "$AWK_SCRIPT" >> "$Dir_Yml$i.yml"
echo "          delete_condition: (jdata->>'op')::text = 'd'" >> $Dir_Yml$i.yml
echo "      mapping:" >> $Dir_Yml$i.yml
psql -d $DB -c "\d $gschema.$i" > schema.txt
awk -v server_name="tpcc" -f generate_logic.awk schema.txt >> $Dir_Yml$i.yml
done

generate_logic.awk

# Debezium JSON 파싱 로직 생성 스크립트 (Nullable/Non-Nullable 모두 처리)
# 사용법: awk -v server_name="tpcc" -f generate_logic.awk schema.txt
# PostgreSQL 타입을 Avro/JSON 키(int, long 등)로 변환하는 함수
function get_avro_key(pg_type) {
    if (pg_type ~ /integer/) return "int";
    if (pg_type ~ /bigint/) return "long";
    if (pg_type ~ /timestamp/) return "long";
    if (pg_type ~ /numeric/) return "double";
    if (pg_type ~ /character varying|varchar|text/) return "string";
    return "string"; # 기본값
}
# 스크립트 시작 시 필드 구분자를 | 로 설정
BEGIN { FS = "|" }
# 첫 번째 줄에서 테이블/스키마 이름을 추출하여 JSON 경로 생성
NR == 1 {
    match($0, /"([^"]+)"/);
    full_table_name = substr($0, RSTART + 1, RLENGTH - 2);
    json_path = sprintf("->\x27%s.%s.Value\x27", server_name, full_table_name);
}
# 컬럼 정의 라인만 처리
NR > 2 && NF > 1 && !/---/ {
    # 컬럼명과 타입 추출 및 공백 제거
    column = $1;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", column);
    pg_type = $2;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", pg_type);
    if (pg_type == "") next;
    # 최종 SQL CAST 타입 설정
    sql_cast_type = pg_type;
    sub("character varying", "varchar", sql_cast_type);
    if (sql_cast_type == "integer") sql_cast_type = "int8";
    # Avro/JSON 키 가져오기
    avro_key = get_avro_key(pg_type);
    # 'before'와 'after' 블록의 데이터 컨테이너 경로
    before_container = sprintf("(((jdata->\x27before\x27)::json%s)::json)", json_path);
    after_container = sprintf("(((jdata->\x27after\x27)::json%s)::json)", json_path);
    # 1. Nullable 형식 경로, 2. Non-Nullable 형식 경로
    nested_before = sprintf("(%s->\x27%s\x27)::json->>\x27%s\x27", before_container, column, avro_key);
    direct_before = sprintf("(%s->>\x27%s\x27)", before_container, column);
    nested_after = sprintf("(%s->\x27%s\x27)::json->>\x27%s\x27", after_container, column, avro_key);
    direct_after = sprintf("(%s->>\x27%s\x27)", after_container, column);
    # COALESCE로 두 경로를 합침
    coalesced_before = sprintf("COALESCE(%s, %s)", nested_before, direct_before);
    coalesced_after = sprintf("COALESCE(%s, %s)", nested_after, direct_after);
    # timestamp 타입 특별 처리
    if (pg_type ~ /timestamp/) {
        final_before = sprintf("TO_TIMESTAMP((%s)::bigint / 1000000.0)::timestamp", coalesced_before);
        final_after = sprintf("TO_TIMESTAMP((%s)::bigint / 1000000.0)::timestamp", coalesced_after);
    } else {
        final_before = sprintf("(%s)::%s", coalesced_before, sql_cast_type);
        final_after = sprintf("(%s)::%s", coalesced_after, sql_cast_type);
    }
    # 최종 CASE 구문 출력
    printf "        %s: CASE WHEN (jdata->>\x27op\x27)=\x27d\x27 THEN %s ELSE %s END\n", column, final_before, final_after;
}

makeyaml_kyobo.sh

#!/bin/bash
ghost="whpg-m"
gport="5432"
user="gpadmin"
password="gpadmin"
DB="whpg"
schema="public"
Dir_Yml="/home/gpadmin/yaml_kyobo/"
broker="10.0.1.254:9092"
registry="10.0.1.254:8081"
topic="tpcc.public.bmsql_history"
psql -d "$DB" -At -c "SELECT table_name FROM information_schema.tables WHERE table_schema='$schema' AND table_type='BASE TABLE' AND table_name not like 'whpgfs_hs_%';" > schemalist.txt
if [ `cat schemalist.txt | wc -l` -eq 0 ] 
then
echo "Table doesn't exist. Please enter correct name"
echo "Script exiting"
exit;
fi
for i in `cat schemalist.txt`
do
echo "version: v1.0" > $Dir_Yml$i.yml
echo "source:" >> $Dir_Yml$i.yml 
echo "  kafka:" >> $Dir_Yml$i.yml
echo "    brokers:" >> $Dir_Yml$i.yml
echo "      - $broker" >> $Dir_Yml$i.yml
echo "    topic: tpcc.public.$i " >> $Dir_Yml$i.yml
echo "    value:" >> $Dir_Yml$i.yml
echo "      avro:" >> $Dir_Yml$i.yml
echo "        column:" >> $Dir_Yml$i.yml
echo "          name: jdata" >> $Dir_Yml$i.yml
echo "          type: json" >> $Dir_Yml$i.yml
echo "        schema_registry: $registry" >> $Dir_Yml$i.yml
echo "    control:" >> $Dir_Yml$i.yml
echo "      consistency: exactly-once" >> $Dir_Yml$i.yml
echo "      fallback_offset: earliest" >> $Dir_Yml$i.yml 
echo "    task:" >> $Dir_Yml$i.yml
echo "      batch_size:" >> $Dir_Yml$i.yml
echo "        interval_ms: 100" >> $Dir_Yml$i.yml
echo "        max_count: 1000" >> $Dir_Yml$i.yml
echo "      window_size: 0" >> $Dir_Yml$i.yml
echo "target:" >> $Dir_Yml$i.yml
echo "  database:" >> $Dir_Yml$i.yml
echo "    host: $ghost" >> $Dir_Yml$i.yml
echo "    port: $gport" >> $Dir_Yml$i.yml
echo "    user: $user" >> $Dir_Yml$i.yml
echo "    password: $password" >> $Dir_Yml$i.yml
echo "    dbname: $DB" >> $Dir_Yml$i.yml
echo "    #staging_schema: public" >> $Dir_Yml$i.yml
echo "    error_limit: \"5\"" >> $Dir_Yml$i.yml
echo "    tables:" >> $Dir_Yml$i.yml
echo "    - table: $i" >> $Dir_Yml$i.yml
echo "      schema: $schema" >> $Dir_Yml$i.yml
echo "      mode:" >> $Dir_Yml$i.yml
echo "        merge:" >> $Dir_Yml$i.yml
echo "          match_columns:" >> $Dir_Yml$i.yml
# awk 스크립트를 AWK_SCRIPT 변수에 저장
AWK_SCRIPT='
/^Distributed by/ {
    gsub(/.*\(|\).*/, "")
    split($0, a, ", ")
    for(i in a) {
        print "          - " a[i]
    }
}'
# 변수를 사용하여 awk 명령어 실행
psql -d "$DB" -c "\d $gschema.$i" | awk "$AWK_SCRIPT" >> "$Dir_Yml$i.yml"
echo "          delete_condition: ((jdata->'__op')::json->>'string')::text = 'd'" >> $Dir_Yml$i.yml
echo "      mapping:" >> $Dir_Yml$i.yml
psql -d $DB -c "\d $gschema.$i" > schema.txt
#awk -v server_name="tpcc" -f generate_kyobo.awk schema.txt >> $Dir_Yml$i.yml
awk -f generate_kyobo.awk schema.txt >> $Dir_Yml$i.yml
done

generate_kyobo.awk

# Debezium JSON 파싱 로직 생성 스크립트 v2
# 사용법: cat schema.txt | awk -f generate_logic.awk
# PostgreSQL 타입을 Avro/JSON 키(int, long 등)로 변환하는 함수
function get_avro_key(pg_type) {
    if (pg_type ~ /integer/) return "int";
    if (pg_type ~ /bigint/) return "long";
    if (pg_type ~ /timestamp/) return "long";
    if (pg_type ~ /numeric/) return "double";
    if (pg_type ~ /character varying|varchar|text/) return "string";
    return "string"; # 기본값
}
# 스크립트 시작 시 필드 구분자를 | 로 설정
BEGIN { FS = "|" }
# 컬럼 정의 라인만 처리
NR > 2 && NF > 1 && !/---/ {
    # 컬럼명과 타입 추출 및 공백 제거
    column = $1;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", column);
    pg_type = $2;
    gsub(/^[[:space:]]+|[[:space:]]+$/, "", pg_type);
    if (pg_type == "") next; # 유효하지 않으면 건너뛰기
    # 최종 SQL CAST 타입 설정
    sql_cast_type = pg_type;
    sub("character varying", "varchar", sql_cast_type);
    if (sql_cast_type == "integer") sql_cast_type = "int8";
    # Avro/JSON 키 가져오기
    avro_key = get_avro_key(pg_type);
    # 1. Nested 형식 경로, 2. Flat 형식 경로
    nested_path = sprintf("((jdata->\x27%s\x27)::json->>\x27%s\x27)", column, avro_key);
    flat_path = sprintf("(jdata->>\x27%s\x27)", column);
    # COALESCE로 두 경로를 합침
    json_expression = sprintf("COALESCE(%s, %s)", nested_path, flat_path);
    # timestamp 타입 특별 처리
    if (pg_type ~ /timestamp/) {
        final_expression = sprintf("TO_TIMESTAMP((%s)::bigint / 1000000.0)::timestamp", json_expression);
    } else {
        final_expression = sprintf("(%s)::%s", json_expression, sql_cast_type);
    }
    # 최종 라인 출력
    printf "        %s: %s\n", column, final_expression;
}