11#!/usr/bin/env python3
22"""
Generate TPC-H data using DuckDB and load it into Microsoft SQL Server with
primary keys, proper types, and foreign key constraints.

Uses DuckDB's built-in TPC-H generator (scale factor 0.1 by default,
overridable via the TPCH_SF environment variable) — no large file downloads.

Creation order respects FK dependencies:
  region → nation → part, supplier, customer
  supplier + part → partsupp
  customer → orders → lineitem
1012"""
1113
1214import time
13- import pyarrow .parquet as pq
14- import pyarrow as pa
15+ import duckdb
1516import pyodbc
1617import os
1718
2122MSSQL_USER = os .getenv ("MSSQL_USER" , "sa" )
2223MSSQL_PASSWORD = os .getenv ("MSSQL_PASSWORD" , "SpiceDemo1!" )
2324
24- DATA_DIR = "/data/tpch_sf1"
25+ # Scale factor: 0.1 → ~600K lineitem rows, fast to load
26+ TPCH_SF = float (os .getenv ("TPCH_SF" , "0.1" ))
2527
26- BATCH_SIZE = 500
28+ BATCH_SIZE = 1000
2729
2830# ---------------------------------------------------------------------------
2931# Connection helpers
@@ -41,28 +43,21 @@ def make_conn_str(database: str) -> str:
4143
4244
def wait_for_mssql(max_retries: int = 30, delay: int = 2):
    """Retry until SQL Server is accepting connections.

    Args:
        max_retries: Number of connection attempts before giving up.
        delay: Seconds to sleep after each failed attempt.

    Raises:
        RuntimeError: If no attempt succeeds; chained to the last pyodbc
            error so the underlying cause is visible in the traceback.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            # timeout=5 bounds the login wait so one attempt cannot hang.
            conn = pyodbc.connect(make_conn_str("master"), autocommit=True, timeout=5)
        except pyodbc.Error as exc:
            # pyodbc.Error is the base of every driver/database exception;
            # anything else (e.g. a TypeError from a bad argument) is a bug
            # in this script and should surface instead of being retried.
            last_error = exc
            print(f"Waiting for SQL Server... ({attempt + 1}/{max_retries})")
            time.sleep(delay)
        else:
            conn.close()
            print("SQL Server is ready.")
            return
    raise RuntimeError("SQL Server did not become ready in time.") from last_error
5556
5657
def create_database():
    """Create the MSSQL_DB database if it does not already exist.

    Connects to ``master`` in autocommit mode and issues a guarded
    CREATE DATABASE for the name held in MSSQL_DB.
    """
    # The database name in CREATE DATABASE cannot be a bound parameter, so
    # it is embedded as a bracket-delimited identifier with ']' doubled.
    # Previously the statement hard-coded "tpch" while the existence check
    # used MSSQL_DB, so a different MSSQL_DB created the wrong database.
    quoted_name = "[" + MSSQL_DB.replace("]", "]]") + "]"
    create_sql = f"IF DB_ID(?) IS NULL CREATE DATABASE {quoted_name}"

    conn = pyodbc.connect(make_conn_str("master"), autocommit=True)
    try:
        cur = conn.cursor()
        try:
            cur.execute(create_sql, (MSSQL_DB,))
        finally:
            cur.close()
    finally:
        # Close even when the statement fails so the connection is not leaked.
        conn.close()
    print(f"Database '{MSSQL_DB}' ready.")
6863
@@ -72,7 +67,6 @@ def create_database():
7267# ---------------------------------------------------------------------------
7368
7469DDL_STATEMENTS = [
75- # region
7670 """
7771 IF OBJECT_ID('dbo.region', 'U') IS NULL
7872 CREATE TABLE dbo.region (
@@ -82,8 +76,6 @@ def create_database():
8276 CONSTRAINT pk_region PRIMARY KEY (r_regionkey)
8377 )
8478 """ ,
85-
86- # nation
8779 """
8880 IF OBJECT_ID('dbo.nation', 'U') IS NULL
8981 CREATE TABLE dbo.nation (
@@ -96,8 +88,6 @@ def create_database():
9688 FOREIGN KEY (n_regionkey) REFERENCES dbo.region (r_regionkey)
9789 )
9890 """ ,
99-
100- # part
10191 """
10292 IF OBJECT_ID('dbo.part', 'U') IS NULL
10393 CREATE TABLE dbo.part (
@@ -113,8 +103,6 @@ def create_database():
113103 CONSTRAINT pk_part PRIMARY KEY (p_partkey)
114104 )
115105 """ ,
116-
117- # supplier
118106 """
119107 IF OBJECT_ID('dbo.supplier', 'U') IS NULL
120108 CREATE TABLE dbo.supplier (
@@ -130,8 +118,6 @@ def create_database():
130118 FOREIGN KEY (s_nationkey) REFERENCES dbo.nation (n_nationkey)
131119 )
132120 """ ,
133-
134- # customer
135121 """
136122 IF OBJECT_ID('dbo.customer', 'U') IS NULL
137123 CREATE TABLE dbo.customer (
@@ -148,8 +134,6 @@ def create_database():
148134 FOREIGN KEY (c_nationkey) REFERENCES dbo.nation (n_nationkey)
149135 )
150136 """ ,
151-
152- # partsupp
153137 """
154138 IF OBJECT_ID('dbo.partsupp', 'U') IS NULL
155139 CREATE TABLE dbo.partsupp (
@@ -165,8 +149,6 @@ def create_database():
165149 FOREIGN KEY (ps_suppkey) REFERENCES dbo.supplier (s_suppkey)
166150 )
167151 """ ,
168-
169- # orders
170152 """
171153 IF OBJECT_ID('dbo.orders', 'U') IS NULL
172154 CREATE TABLE dbo.orders (
@@ -184,8 +166,6 @@ def create_database():
184166 FOREIGN KEY (o_custkey) REFERENCES dbo.customer (c_custkey)
185167 )
186168 """ ,
187-
188- # lineitem
189169 """
190170 IF OBJECT_ID('dbo.lineitem', 'U') IS NULL
191171 CREATE TABLE dbo.lineitem (
@@ -214,65 +194,58 @@ def create_database():
214194 """ ,
215195]
216196
217- # Tables in load order (parent tables before child tables)
218197TABLES = ["region" , "nation" , "part" , "supplier" , "customer" , "partsupp" , "orders" , "lineitem" ]
219198
220199
221200# ---------------------------------------------------------------------------
222201# Data loading
223202# ---------------------------------------------------------------------------
224203
def load_table(duck, mssql_conn, table_name: str):
    """Copy every row of one DuckDB table into the same-named dbo table.

    Rows are streamed from DuckDB in chunks of BATCH_SIZE and each chunk is
    inserted and committed on its own, so memory use stays bounded whatever
    scale factor was generated.

    Args:
        duck: Open DuckDB connection holding the generated TPC-H tables.
        mssql_conn: Open pyodbc connection to the target database
            (expected to be in manual-commit mode).
        table_name: Table to copy; used for both the source and dbo target.
    """
    # Count first so the progress line can be printed before the copy starts.
    num_rows = duck.execute(f"SELECT count(*) FROM {table_name}").fetchone()[0]
    print(f"  {num_rows:,} rows")

    result = duck.execute(f"SELECT * FROM {table_name}")
    columns = [desc[0] for desc in result.description]

    placeholders = ", ".join("?" for _ in columns)
    col_list = ", ".join(columns)
    insert_sql = f"INSERT INTO dbo.{table_name} ({col_list}) VALUES ({placeholders})"

    cur = mssql_conn.cursor()
    try:
        while True:
            batch = result.fetchmany(BATCH_SIZE)
            if not batch:
                break
            cur.executemany(insert_sql, batch)
            mssql_conn.commit()
    finally:
        # Release the cursor even if an insert fails part-way through.
        cur.close()
    print(f"  Loaded {num_rows:,} rows into {table_name}.")
255221
256222
def main():
    """Generate TPC-H data with DuckDB and load it into SQL Server.

    Steps: wait for the server, ensure the database exists, generate the
    data in an in-memory DuckDB instance, create the schema, then load the
    tables in the order given by TABLES. Both connections are closed even
    when a step fails.
    """
    wait_for_mssql()
    create_database()

    print(f"Generating TPC-H SF={TPCH_SF} with DuckDB ...")
    duck = duckdb.connect()
    try:
        duck.execute("INSTALL tpch; LOAD tpch")
        duck.execute(f"CALL dbgen(sf={TPCH_SF})")
        print("TPC-H data generated.")

        mssql_conn = pyodbc.connect(make_conn_str(MSSQL_DB), autocommit=False)
        try:
            print("Creating schema ...")
            cur = mssql_conn.cursor()
            try:
                for stmt in DDL_STATEMENTS:
                    cur.execute(stmt)
                mssql_conn.commit()
            finally:
                cur.close()
            print("Schema created.")

            for table in TABLES:
                print(f"\nLoading {table} ...")
                load_table(duck, mssql_conn, table)
        finally:
            mssql_conn.close()
    finally:
        duck.close()
    print("\nAll TPC-H tables loaded successfully!")
277250
278251
0 commit comments