Skip to content

Latest commit

 

History

History
355 lines (275 loc) · 7.37 KB

File metadata and controls

355 lines (275 loc) · 7.37 KB

Kafka + EDB WarehousePG + Flow Server 연동 가이드

Schema Registry 없이 Avro 포맷을 사용하여 Kafka 메시지를 EDB WarehousePG에 적재하는 전체 작업 순서입니다.


작업 순서

  1. WarehousePG 테스트 테이블 생성
  2. Flow Server 실행
  3. Kafka 환경 구성 및 테스트 토픽 구성
  4. Schema Registry 없이 Avro 포맷 사용을 위한 스키마 파일 생성
  5. producer.py 작성 및 실행
  6. Topic 확인
  7. Flow Server Job 생성 및 Submit, Start
  8. WarehousePG 테스트 테이블 확인

1. WarehousePG 테스트 테이블 생성

CREATE TABLE user_events (
    event_id    TEXT        NOT NULL,
    user_id     BIGINT      NOT NULL,
    event_type  TEXT        NOT NULL,
    event_ts    TIMESTAMPTZ NOT NULL
)
WITH (
    appendoptimized = true,
    orientation     = column,
    compresstype    = zstd,
    compresslevel   = 1
)
DISTRIBUTED BY (user_id);

2. Avro 스키마 파일 생성

Kafka Producer와 Consumer에서 모두 사용합니다.

tee user-events.avsc << EOF
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "event_id",   "type": "string"},
    {"name": "user_id",    "type": "long"},
    {"name": "event_type", "type": "string"},
    {"name": "event_ts",   "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
EOF

3. Kafka 토픽 관리

토픽 리스트 확인

kafka-topics --bootstrap-server localhost:9092 --list

토픽 생성

kafka-topics --bootstrap-server localhost:9092 --create \
  --partitions 3 --replication-factor 1 --topic user-events

토픽 삭제

kafka-topics --bootstrap-server localhost:9092 --delete --topic user-events

토픽 상세 확인

kafka-topics --bootstrap-server localhost:9092 --describe --topic user-events

특정 Topic 건수/Offset 확인

kafka-get-offsets --bootstrap-server localhost:9092 --topic user-events

특정 Topic 데이터 확인 (처음부터) — 텍스트/JSON

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic user-events --from-beginning

특정 Topic 데이터 확인 (처음부터) — Avro (Schema Registry 사용 시)

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic user-events --from-beginning \
  --property schema.registry.url=http://localhost:8081

Consumer Group 리스트 확인

kafka-consumer-groups --bootstrap-server localhost:9092 --list

특정 Consumer Group 확인 (Partition, Offset)

kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group user_events

4. Python Avro 환경 구성

pip install avro
pip install confluent_kafka
pip install fastavro

5. Producer 작성 및 실행

producer.py 파일 생성

tee producer.py << EOF
# producer.py
import io, time, uuid, struct, random
import avro.schema, avro.io
from confluent_kafka import Producer

MAGIC_BYTE  = 0
SCHEMA_ID   = 1
TOTAL       = 1000
EVENT_TYPES = ["purchase", "view", "click", "cart", "search"]

with open("user-events.avsc") as f:
    schema = avro.schema.parse(f.read())

def to_avro(record):
    buf = io.BytesIO()
    buf.write(struct.pack(">bI", MAGIC_BYTE, SCHEMA_ID))
    avro.io.DatumWriter(schema).write(record, avro.io.BinaryEncoder(buf))
    return buf.getvalue()

def delivery_report(err, msg):
    if err:
        print(f"[ERROR] 전송 실패: {err}")

producer = Producer({"bootstrap.servers": "localhost:9092"})

for i in range(TOTAL):
    event = {
        "event_id":   str(uuid.uuid4()),
        "user_id":    random.randint(1001, 1100),
        "event_type": random.choice(EVENT_TYPES),
        "event_ts":   int(time.time() * 1000),
    }
    raw = to_avro(event)
    producer.produce(
        topic    = "user-events",
        key      = str(event["user_id"]),
        value    = raw,
        callback = delivery_report,
    )
    if (i + 1) % 100 == 0:
        producer.flush()
        print(f"진행: {i + 1}/{TOTAL} 건 발행 완료")

producer.flush()
print(f"\n완료: 총 {TOTAL}건 발행")
EOF

Producer 실행

python3 producer.py

6. Flow Server 설정 및 실행

Flow Server 설정 파일 생성

tee flowservercfg.json << EOF
{
    "Host": "",
    "Port": 6060,
    "Gpfdist": {
        "Host": "10.0.3.8",
        "Port": 6070,
        "ReuseTables": false
    },
    "Prometheus": {
        "Host": "",
        "Port": 9080,
        "MetricsPath": "/flow_metrics"
    },
    "DebugPort": 6080,
    "Logging": {
        "SplitLogByJob": false,
        "FrontendLevel": "debug",
        "BackendLevel": "info"
    }
}
EOF

Flow Server 실행

flowserver -c ./gpconfigs/flowservercfg.json -l ./gpAdminLogs &

7. Flow Server Job 관리

Job 설정 파일 생성

tee user_events.yaml << EOF
version: v1.0
source:
  kafka:
    brokers:
      - 10.0.1.254:9092
    topic: user-events
    value:
      avro:
        column:
          name: jdata
          type: json
        schema_file: /home/gpadmin/flowserver/avro_schema/user-events.avsc
    control:
      consistency: exactly-once
      fallback_offset: earliest
    task:
      batch_size:
        interval_ms: 100
        max_count: 10
      window_size: 0
target:
  database:
    host: whpg-m
    port: 5432
    user: gpadmin
    password: gpadmin
    dbname: gpadmin
    staging_schema: public
    error_limit: "1"
    tables:
    - table: user_events
      schema: tpcds
      mode:
        insert: {}
      mapping:
        event_id:   (jdata->>'event_id')::text
        user_id:    (jdata->>'user_id')::bigint
        event_type: (jdata->>'event_type')::text
        event_ts:   to_timestamp((jdata->>'event_ts')::bigint / 1000.0)
EOF

Job 목록 확인

flowcli list

Job Submit (제출)

flowcli submit ./user_events.yaml

Job 실행

flowcli start user_events

Job 중지

flowcli stop user_events

Job 제거(삭제)

flowcli remove user_events

8. Flow Server 로그 확인

cd gpAdminLogs
ls -l whpg*

출력 예시:

-rw-r--r-- 1 gpadmin gpadmin  3122 May  6 10:41 whpg_flow_cli-2026-05-06.log
-rw-r--r-- 1 gpadmin gpadmin  5460 May  7 05:20 whpg_flow_cli-2026-05-07.log
-rw-r--r-- 1 gpadmin gpadmin   510 May  8 00:57 whpg_flow_cli-2026-05-08.log
-rw-r--r-- 1 gpadmin gpadmin  6416 May 18 17:49 whpg_flow_cli-2026-05-18.log
-rw-r--r-- 1 gpadmin gpadmin 46822 May 18 17:49 whpg_flow_server-2026-05-06.log

Troubleshooting

5 byte header for value is missing or does not have 0 magic byte

2026-05-18T17:37:17.599Z WARN [user_events] Failed to process message:
failed to get schema ID from AVRO data:
5 byte header for value is missing or does not have 0 magic byte

원인: producer.py에서 Confluent 5바이트 헤더(0x00 + schema_id 4바이트)가 누락된 채 순수 Avro binary로 발행됨.

해결: producer.pyto_avro() 함수에서 struct.pack(">bI", MAGIC_BYTE, SCHEMA_ID)로 헤더를 추가한 후 재실행.

def to_avro(record):
    buf = io.BytesIO()
    buf.write(struct.pack(">bI", MAGIC_BYTE, SCHEMA_ID))  # 5바이트 헤더 추가
    avro.io.DatumWriter(schema).write(record, avro.io.BinaryEncoder(buf))
    return buf.getvalue()

→ 헤더 추가 후 정상 처리됨.