
Commit 0ed465f

Merge branch 'paper' into main
2 parents: b91b93b + 9f10c00

4 files changed: 171 additions, 131 deletions

paramgen/parameter_curation.py

Lines changed: 13 additions & 10 deletions
@@ -21,6 +21,7 @@
 
 table_dir = sys.argv[1]
 out_dir = sys.argv[2]
+random.seed(42)
 
 THRESH_HOLD = 0
 THRESH_HOLD_6 = 0
@@ -676,19 +677,21 @@ def process_withdraw_query():
 
 
 def main():
-    queries = [3, 1, 8, 7, 10, 11, 2, 5, 6]
+    queries = [6, 2, 3, 5, 7, 11, 8, 10, 1]
     # queries = [3]
 
     multiprocessing.set_start_method('forkserver')
-    processes = []
-
-    for query_id in queries:
-        p = multiprocessing.Process(target=process_query, args=(query_id,))
-        p.start()
-        processes.append(p)
-
-    for p in processes:
-        p.join()
+
+    batch_size = 5
+    for i in range(0, len(queries), batch_size):
+        processes = []
+        for query_id in queries[i:i + batch_size]:
+            p = multiprocessing.Process(target=process_query, args=(query_id,))
+            p.start()
+            processes.append(p)
+
+        for p in processes:
+            p.join()
 
 
 if __name__ == "__main__":
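
The main() change above stops launching a worker process for every query at once and instead runs the workers in batches of five, joining each batch before the next one starts. Below is a minimal standalone sketch of the same pattern; process_query, the query list, and the forkserver start method come from the diff, while the worker body here is a hypothetical placeholder.

import multiprocessing

def process_query(query_id):
    # Hypothetical stand-in for the real per-query parameter curation work.
    print(f"curating parameters for query {query_id}")

def run_in_batches(queries, batch_size=5):
    # Launch at most batch_size workers at a time and wait for the whole
    # batch to finish before starting the next one, bounding peak
    # concurrency (and memory) instead of running every query in parallel.
    for i in range(0, len(queries), batch_size):
        processes = []
        for query_id in queries[i:i + batch_size]:
            p = multiprocessing.Process(target=process_query, args=(query_id,))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()

if __name__ == "__main__":
    multiprocessing.set_start_method('forkserver')
    run_in_batches([6, 2, 3, 5, 7, 11, 8, 10, 1], batch_size=5)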

scripts/run_paramgen.sh

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
-OUTPUT_DIR=out/sf3/
+OUTPUT_DIR=out/
 
 # Note: generate factor tables with --generate-factors

src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala

Lines changed: 48 additions & 10 deletions
@@ -77,43 +77,81 @@ object FactorGenerationStage extends DatagenStage {
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/transfer/*.csv")
-      .select($"fromId", $"toId", $"amount".cast("double"), $"createTime")
+      .load(s"${args.outputDir}/snapshot/AccountTransferAccount.csv")
+      .select(
+        $"fromId",
+        $"toId",
+        $"amount".cast("double"),
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     val withdrawRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/withdraw/*.csv")
-      .select($"fromId", $"toId", $"amount".cast("double"), $"createTime")
+      .load(s"${args.outputDir}/snapshot/AccountWithdrawAccount.csv")
+      .select(
+        $"fromId",
+        $"toId",
+        $"amount".cast("double"),
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     val depositRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/deposit/*.csv")
+      .load(s"${args.outputDir}/snapshot/LoanDepositAccount.csv")
       .select($"accountId", $"loanId")
 
     val personInvestRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/personInvest/*.csv")
-      .select($"investorId", $"companyId", $"createTime")
+      .load(s"${args.outputDir}/snapshot/PersonInvestCompany.csv")
+      .select(
+        $"investorId",
+        $"companyId",
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     val OwnRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/personOwnAccount/*.csv")
+      .load(s"${args.outputDir}/snapshot/PersonOwnAccount.csv")
       .select($"personId", $"accountId")
 
     val personGuaranteeRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/personGuarantee/*.csv")
-      .select($"fromId", $"toId", $"createTime")
+      .load(s"${args.outputDir}/snapshot/PersonGuaranteePerson.csv")
+      .select(
+        $"fromId",
+        $"toId",
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
   def transformItems(
       df: DataFrame,
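
Each edge table that carries a createTime column is now read from the snapshot CSVs and its string timestamp is normalised to epoch milliseconds, trying the millisecond format first and falling back to second precision. A minimal PySpark sketch of that conversion follows (Python here rather than the Scala of the diff; the column name and the two format patterns match the diff, while the SparkSession setup and sample rows are purely illustrative).

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("createTime-to-epoch-ms").getOrCreate()

# Hypothetical sample rows: createTime appears both with and without milliseconds.
df = spark.createDataFrame(
    [("a1", "b2", "2021-03-01 10:15:30.123"),
     ("a3", "b4", "2021-03-01 10:15:30")],
    ["fromId", "toId", "createTime"],
)

# Try the millisecond pattern first, fall back to second precision, then
# convert the parsed timestamp to epoch milliseconds. unix_timestamp()
# returns whole seconds, so the result has second granularity expressed in ms.
epoch_ms = (
    F.unix_timestamp(
        F.coalesce(
            F.to_timestamp(F.col("createTime"), "yyyy-MM-dd HH:mm:ss.SSS"),
            F.to_timestamp(F.col("createTime"), "yyyy-MM-dd HH:mm:ss"),
        )
    ) * 1000
).alias("createTime")

df.select("fromId", "toId", epoch_ms).show(truncate=False)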
