這是一個可配置的 ETL (Extract, Transform, Load) 工具,專為將資料從 MySQL 關係型資料庫遷移到 Neo4j 圖形資料庫而設計。
此工具透過讀取 YAML 設定檔來定義整個遷移流程,包括資料來源、資料轉換規則、目標節點 (Node) 與關聯 (Relationship) 的結構。它利用 SQLAlchemy 進行資料抽取,使用 Pandas DataFrame 進行資料處理,並透過 py2neo 將結構化資料高效地載入 Neo4j。
- 設定檔驅動:所有 ETL 邏輯皆由 YAML 檔定義,無需修改程式碼即可調整遷移流程。
- 彈性的資料轉換:支援透過
formatter自定義欄位名稱與對應,並可在載入前對資料集進行鏈式操作 (如去重、移除空值、篩選欄位)。 - 分頁式讀取:可設定
offset和limit,批次處理大量資料,避免記憶體耗盡。 - 節點與關聯的精確對應:能夠精確定義節點的標籤 (Label)、合併鍵 (Merge Key),以及節點之間關聯的起始點、終點與類型。
- 自動化與日誌記錄:主程式
neoImport.py可自動執行 YAML 中定義的所有任務,並記錄詳細的執行日誌,包括成功、失敗與錯誤資訊。 - 環境隔離:透過
.env檔管理資料庫連線資訊,將敏感的配置與主要程式碼分離。
- 資料庫 ORM:
SQLAlchemy - Neo4j 驅動:
py2neo - 資料處理:
pandas - 設定檔解析:
PyYAML - 環境變數管理:
python-dotenv - 核心設計模式:
- 工廠模式 (
node_map_factory.py): 用於生成節點和關聯資料的工廠。 - 抽象基礎類別 (
dfmodels.py): 定義了 DataFrame 創建者的通用介面。 - 模型化 (
base/models.py,env.py): 將 YAML 設定、環境變數及日誌格式都模型化,提高程式碼的穩定性與可讀性。
- 工廠模式 (
首先,請確認已安裝所有必要的 Python 套件。可以透過 requirements.txt 檔案進行安裝:
pip install -r requirements.txt在專案根目錄下建立一個 .env 檔案。此檔案用於存放 MySQL 和 Neo4j 資料庫的連線資訊。
# .env 檔案範例
# MySQL 資料庫連線資訊
dbengine="mysql+pymysql://{dbaccount}:{dbpasswd}@{host}/{db}"
host="your_mysql_host"
dbaccount="your_mysql_username"
dbpasswd="your_mysql_password"
db="your_mysql_database"
# Neo4j 資料庫連線資訊
neo_conn="bolt://your_neo4j_host:7687"
neo_account="your_neo4j_username"
neo_token="your_neo4j_password"
# ORM 和 Formatter 的 Python 模組路徑
data_base_model="path.to.your.sql_models"
formatter_base_model="path.to.your.format_models"在 yaml/ 資料夾中建立您的設定檔 (例如 demo.yaml)。此檔案是整個 ETL 流程的核心。
# yaml/demo.yaml
description: "Neo4j 節點與關聯的示範設定"
serviceVersion: 1.0
offset: 0
limit: 5000
# 定義要建立的節點
nodes:
- env: ETLSQL # 使用 .env 中的 ETLSQL 環境設定
model_name: DemoUser # SQLAlchemy ORM 模型名稱
formatter_conf: # 資料格式化設定
model_name: CustomerModel # Formatter 類別名稱
attrs: # 欄位對應: {新欄位名: 來源欄位名}
fullname: "完整姓名"
age: "年齡"
idcode: "客戶編碼"
df_action_map: # DataFrame 操作
deduplicates: # 根據以下欄位去重
- "客戶編碼"
remove_empty_rows: # 如果以下欄位為空,則移除該行
- "完整姓名"
- "客戶編碼"
merge_keys: # 在 Neo4j 中 Merge 節點時使用的鍵
- "客戶" # 節點 Label
- "完整姓名" # 節點屬性
node_label: # 為此節點設定的 Label
- "客戶"
# 定義要建立的關聯
relations:
- env: ETLSQL
model_name: DemoUser
relation_name: "客戶-其他資訊" # 關聯的名稱 (類型)
start_key: "完整姓名" # 來源資料中,代表起始節點的欄位
end_key: "姓名" # 來源資料中,代表結束節點的欄位
formatter_conf:
model_name: CustomerRel
attrs:
fullname: "完整姓名"
name: "姓名"
start_node_key: # 起始節點的查找鍵 (對應 node 的 merge_keys)
- "客戶"
- "完整姓名"
end_node_key: # 結束節點的查找鍵
- "客戶其他資訊"
- "姓名"一切準備就緒後,透過命令列執行 neoImport.py,並將您的設定檔路徑作為參數傳入。
python neoImport.py yaml/demo.yaml程式將會:
- 讀取並解析
yaml/demo.yaml。 - 連接到 MySQL 資料庫並根據設定分頁提取資料。
- 對資料進行格式化與清理。
- 連接到 Neo4j 並建立或更新對應的節點與關聯。
- 在
logs/資料夾中生成執行日誌。
.
├── logs/ # 存放執行日誌
├── models/
│ └── logFormatter/
│ └── formatter.py # 定義日誌的資料模型
├── neo4jmodels/
│ ├── base/ # 自定義 Model 和 Field 的基礎類別
│ ├── CreateDataFrame*.py # 從不同資料來源創建 DataFrame 的類別
│ ├── DataFrameActor.py # DataFrame 的操作封裝
│ ├── env.py # 環境變數載入與模型化
│ ├── node_data_factory.py # 資料提取與格式化的工廠
│ ├── node_map_factory.py # Neo4j 節點與關聯的生成與建立
│ └── utils.py # 資料庫連線等工具函式
├── settings/ # 專案設定
├── yaml/
│ └── demo.yaml # ETL 流程設定檔
├── neoImport.py # 主執行程式
├── requirements.txt # 依賴套件列表
└── README.md # 本文件