Commit bf8b360

Zhuyiqinzhu Zhu authored and committed

Add converter drop duplicates experiment script

1 parent 5a56390 commit bf8b360
1 file changed: +340 -0 lines changed
@@ -0,0 +1,340 @@
import resource
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import time
import hashlib
import secrets


# This benchmark script tests the performance of different combinations of pyarrow.compute
# functions for the converter. The main performance factors are latency and memory consumption.


def generate_random_str_pk_column_sha1(number_of_records):
    sha1_hashes = []
    for _ in range(number_of_records):
        # Generate a random string using the secrets module for security
        random_string = secrets.token_urlsafe(10)

        # Calculate the SHA1 hash
        sha1_hash = hashlib.sha1(random_string.encode("utf-8")).hexdigest()
        sha1_hashes.append(sha1_hash)
    return pa.array(sha1_hashes)
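

# A minimal usage sketch (not in the original script; the function name is hypothetical): the SHA1
# helper above is never called below, but it could replace the random-integer pk_hash columns used
# by the experiments to benchmark the same drop-duplicates logic on string primary keys.
def build_string_pk_table_sketch(number_of_records=100000):
    data_table = pa.table(
        {"pk_hash": generate_random_str_pk_column_sha1(number_of_records)}
    )
    row_numbers = pa.array(np.arange(data_table.num_rows, dtype=np.int64))
    return data_table.append_column("record_idx", row_numbers)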


# Example 1:
# 1. invert uses negligible memory.
# 2. Memory used by group_by + aggregate depends on the duplication rate: the less
#    duplication, the larger the copy of the group_by columns.
# 3. is_in is close to zero-copy; its memory use also depends on the duplication rate,
#    with a copy of the sort column in the worst case.
def basic_pyarrow_compute_function_used_performance_demo():
    # Creating table
    pk_column = np.random.randint(0, 100000, size=1000000)
    data_table_1 = pa.table({"pk_hash": pk_column})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    memory_used_after_creating_table = resource.getrusage(
        resource.RUSAGE_SELF
    ).ru_maxrss
    print(f"memory_used_after_creating_table:{memory_used_after_creating_table}")

    # Group_by + aggregate on the unique sort column to drop duplicates within the same pyarrow table
    data_table_1_indices = data_table_1.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "max")])
    memory_used_after_group_by = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_group_by = (
        memory_used_after_group_by - memory_used_after_creating_table
    )
    print(f"memory_used_by_group_by:{memory_used_by_group_by}")

    # is_in to get indices to keep
    mask_table_1 = pc.is_in(
        data_table_1["record_idx"],
        value_set=data_table_1_indices["record_idx_max"],
    )
    memory_used_after_is_in = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_is_in = memory_used_after_is_in - memory_used_after_group_by
    print(f"memory_used_by_is_in:{memory_used_by_is_in}")

    # invert to get indices to delete
    mask_table_1_invert = pc.invert(mask_table_1)
    memory_used_after_invert = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_invert = memory_used_after_invert - memory_used_after_is_in
    print(f"memory_used_by_invert:{memory_used_by_invert}")

    # filter to get the actual table
    table_to_delete_1 = data_table_1.filter(mask_table_1_invert)
    memory_used_after_filter = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_filter = memory_used_after_filter - memory_used_after_invert
    print(f"memory_used_by_filter:{memory_used_by_filter}")
    res = table_to_delete_1
    return res
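

# A minimal sketch (not in the original script): ru_maxrss above reports process-wide peak RSS, which
# never decreases within a run. Assuming the default pyarrow memory pool is in use,
# pa.total_allocated_bytes() can be sampled around a single compute call to approximate the
# Arrow-side allocation of just that call. The function name is hypothetical.
def measure_arrow_allocation_of_group_by_sketch():
    pk_column = np.random.randint(0, 100000, size=1000000)
    data_table = pa.table({"pk_hash": pk_column})
    data_table = data_table.append_column(
        "record_idx", pa.array(np.arange(data_table.num_rows, dtype=np.int64))
    )
    bytes_before = pa.total_allocated_bytes()
    data_table_indices = data_table.group_by("pk_hash", use_threads=False).aggregate(
        [("record_idx", "max")]
    )
    bytes_after = pa.total_allocated_bytes()
    print(f"arrow_bytes_allocated_by_group_by:{bytes_after - bytes_before}")
    return data_table_indices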


# Approach 1A: deterministic record to keep, extendable to support custom sort keys.
# Drop duplicates within each file, then do a global drop duplicates.
# Pros: a. With a high duplication rate, the global drop duplicates consumes less memory.
#       b. The appended record index column is a required field of the position delete file,
#          so no "additional" memory is used for the pos column.
# Assumption: when concatenating tables for the final global drop duplicates, maintain ascending
# file_sequence_number order: pa.concat_tables([table_1, table_2, ..., table_N])
def group_by_each_file_then_global_drop_duplicates_using_max_record_position():

    pk_column = np.random.randint(0, 100, size=10000)

    data_table_1 = pa.table({"pk_hash": pk_column})
    data_table_2 = pa.table({"pk_hash": pk_column})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)

    start = time.monotonic()
    data_table_1_indices = data_table_1.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "max")])
    data_table_2_indices = data_table_2.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "max")])

    # Drop duplicates within the same file
    mask_table_1 = pc.is_in(
        data_table_1["record_idx"],
        value_set=data_table_1_indices["record_idx_max"],
    )
    mask_table_1_invert = pc.invert(mask_table_1)
    table_to_delete_1 = data_table_1.filter(mask_table_1_invert)
    table_to_keep_1 = data_table_1.filter(mask_table_1)

    mask_table_2 = pc.is_in(
        data_table_2["record_idx"],
        value_set=data_table_2_indices["record_idx_max"],
    )
    mask_table_2_invert = pc.invert(mask_table_2)
    table_to_delete_2 = data_table_2.filter(mask_table_2_invert)
    table_to_keep_2 = data_table_2.filter(mask_table_2)

    # Global drop duplicates over the concatenated per-file keep tables
    final_data_table = pa.concat_tables([table_to_keep_1, table_to_keep_2])
    num_rows = final_data_table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))
    final_data_table = final_data_table.append_column("record_idx_final", row_numbers)
    final_data_table_indices = final_data_table.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx_final", "max")])
    end = time.monotonic()
    print(f"Time taken to perform the group_by operations:{end - start}")
    final_data_table_to_delete = final_data_table.filter(
        pc.invert(
            pc.is_in(
                final_data_table["record_idx_final"],
                value_set=final_data_table_indices["record_idx_final_max"],
            )
        )
    )
    final_data_table_to_delete = final_data_table_to_delete.drop(["record_idx_final"])
    result = pa.concat_tables(
        [final_data_table_to_delete, table_to_delete_1, table_to_delete_2]
    )
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return result
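

# A minimal sketch (not in the original script) of how Approach 1A could extend to N files, given the
# assumption above that the input list is ordered by ascending file_sequence_number. Each table is
# assumed to carry only "pk_hash" and "record_idx" columns; the function name and parameter are
# hypothetical.
def approach_1a_for_n_files_sketch(tables_in_ascending_file_seq_order):
    per_file_deletes = []
    per_file_keeps = []
    for table in tables_in_ascending_file_seq_order:
        # Per-file drop duplicates, keeping the max record_idx for each pk_hash.
        indices = table.group_by("pk_hash", use_threads=False).aggregate(
            [("record_idx", "max")]
        )
        keep_mask = pc.is_in(table["record_idx"], value_set=indices["record_idx_max"])
        per_file_deletes.append(table.filter(pc.invert(keep_mask)))
        per_file_keeps.append(table.filter(keep_mask))

    # Global drop duplicates over the concatenated keeps; because the concat order follows
    # file_sequence_number, the max of the fresh global index keeps the newest record per key.
    final_table = pa.concat_tables(per_file_keeps)
    final_table = final_table.append_column(
        "record_idx_final", pa.array(np.arange(final_table.num_rows, dtype=np.int64))
    )
    final_indices = final_table.group_by("pk_hash", use_threads=False).aggregate(
        [("record_idx_final", "max")]
    )
    global_deletes = final_table.filter(
        pc.invert(
            pc.is_in(
                final_table["record_idx_final"],
                value_set=final_indices["record_idx_final_max"],
            )
        )
    ).drop(["record_idx_final"])
    return pa.concat_tables([global_deletes] + per_file_deletes)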


# Approach 1B: deterministic record to keep, extendable to support custom sort keys.
# The only difference from 1A is using the "last" record instead of the "max" record of the
# sort column, which shows a reduction in latency.
def group_by_each_file_then_global_drop_duplicates_using_last_record_position():
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    pk_column = np.random.randint(0, 100, size=10000)

    data_table_1 = pa.table({"pk_hash": pk_column})
    data_table_2 = pa.table({"pk_hash": pk_column})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)

    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    start = time.monotonic()
    data_table_1_indices = data_table_1.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "last")])
    data_table_2_indices = data_table_2.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "last")])

    mask_table_1 = pc.is_in(
        data_table_1["record_idx"],
        value_set=data_table_1_indices["record_idx_last"],
    )

    mask_table_1_invert = pc.invert(mask_table_1)
    table_to_delete_1 = data_table_1.filter(mask_table_1_invert)
    table_to_keep_1 = data_table_1.filter(mask_table_1)

    mask_table_2 = pc.is_in(
        data_table_2["record_idx"],
        value_set=data_table_2_indices["record_idx_last"],
    )
    mask_table_2_invert = pc.invert(mask_table_2)
    table_to_delete_2 = data_table_2.filter(mask_table_2_invert)
    table_to_keep_2 = data_table_2.filter(mask_table_2)

    final_data_table = pa.concat_tables([table_to_keep_1, table_to_keep_2])
    num_rows = final_data_table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))
    final_data_table = final_data_table.append_column("record_idx_final", row_numbers)
    final_data_table_indices = final_data_table.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx_final", "last")])
    end = time.monotonic()
    print(f"Time taken to perform the group_by operations:{end - start}")
    final_data_table_to_delete = final_data_table.filter(
        pc.invert(
            pc.is_in(
                final_data_table["record_idx_final"],
                value_set=final_data_table_indices["record_idx_final_last"],
            )
        )
    )
    final_data_table_to_delete = final_data_table_to_delete.drop(["record_idx_final"])
    result = pa.concat_tables(
        [final_data_table_to_delete, table_to_delete_1, table_to_delete_2]
    )
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return result
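

# A minimal sketch (not in the original script) that isolates the kernel difference behind 1A vs 1B:
# it times a single group_by + aggregate with "max" against one with "last" on the same table, under
# the assumption that the latency gap comes from the aggregation function itself. The function name
# and parameters are hypothetical.
def compare_max_vs_last_aggregation_sketch(num_records=1000000, key_cardinality=100000):
    data_table = pa.table(
        {"pk_hash": np.random.randint(0, key_cardinality, size=num_records)}
    )
    data_table = data_table.append_column(
        "record_idx", pa.array(np.arange(data_table.num_rows, dtype=np.int64))
    )
    for agg_func in ("max", "last"):
        start = time.monotonic()
        data_table.group_by("pk_hash", use_threads=False).aggregate(
            [("record_idx", agg_func)]
        )
        print(f"group_by aggregate '{agg_func}':{time.monotonic() - start}")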


# Approach 2: deterministic record to keep; additional code changes may be needed to sort any custom sort keys.
# Global group_by based on two aggregations: 1. file_sequence_number as the primary criterion 2. the index column.
# Pros: with a lower duplication rate, compared with the per-file drop duplicates followed by a global
#       drop duplicates in Approach 1, this can save (N-1) group_by operations.
# Pros: table concat order doesn't have to be maintained, as file_sequence_number is appended as a column
#       and used as an aggregation criterion.
# Cons: any custom sort keys other than record_idx may need verification of correctness before replacing
#       the record_idx column.
# Cons: higher risk of OOM when globally dropping duplicates.
def group_by_each_file_then_global_drop_duplicates_using_just_file_sequence_number():
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    pk_column = np.random.randint(0, 10, size=10000)
    file_sequence_number_column_1 = np.repeat(1, 10000)
    file_sequence_number_column_2 = np.repeat(2, 10000)
    data_table_1 = pa.table(
        {"pk_hash": pk_column, "file_sequence_number": file_sequence_number_column_1}
    )
    data_table_2 = pa.table(
        {"pk_hash": pk_column, "file_sequence_number": file_sequence_number_column_2}
    )
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)

    start = time.monotonic()

    final_data_table = pa.concat_tables([data_table_1, data_table_2])
    num_rows = final_data_table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))
    final_data_table = final_data_table.append_column("record_idx_final", row_numbers)
    final_data_table_indices = final_data_table.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("file_sequence_number", "max"), ("record_idx_final", "max")])
    end = time.monotonic()
    print(f"Time taken to perform the global group_by:{end - start}")
    final_data_table_to_delete = final_data_table.filter(
        pc.invert(
            pc.is_in(
                final_data_table["record_idx_final"],
                value_set=final_data_table_indices["record_idx_final_max"],
            )
        )
    )
    final_data_table_to_delete = final_data_table_to_delete.drop(["record_idx_final"])
    result = final_data_table_to_delete

    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return result


# Approach 3: for large datasets: deterministic record to keep, extendable to support custom sort keys.
# To process a large dataset, we can repeat the following, going from file sequence number N down to the
# first file in descending order (a hypothetical generalization to N files is sketched after this function):
# 1. For file N-1, aggregate by record_idx to drop duplicates within the same file; res1 = records to
#    delete because of duplicates.
# 2. Use pyarrow is_in to delete records in file N-1 that have the same primary key as records in file N;
#    res2 = records to delete because of a shared primary key.
# 3. Deletes for file N-1 = res1 + res2.
# 4. Append the distinct keys that appear in file N-1 to the kept primary keys, and use this growing union
#    of primary keys to delete for files N-2, N-3, ..., 1.
def multiple_is_in_for_large_dataset():
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    pk_column = np.random.randint(0, 10, size=10000)
    file_sequence_number_column_1 = np.repeat(1, 10000)
    file_sequence_number_column_2 = np.repeat(2, 10000)
    file_sequence_number_column_3 = np.repeat(3, 10000)
    data_table_1 = pa.table(
        {"pk_hash": pk_column, "file_sequence_number": file_sequence_number_column_1}
    )
    data_table_2 = pa.table(
        {"pk_hash": pk_column, "file_sequence_number": file_sequence_number_column_2}
    )
    data_table_3 = pa.table(
        {"pk_hash": pk_column, "file_sequence_number": file_sequence_number_column_3}
    )
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)
    num_rows_3 = data_table_3.num_rows
    row_numbers_3 = pa.array(np.arange(num_rows_3, dtype=np.int64))
    data_table_3 = data_table_3.append_column("record_idx", row_numbers_3)

    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    # Drop duplicates in table 3 (the table with the largest file sequence number)
    data_table_3_indices = data_table_3.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "max")])

    mask_table_3 = pc.is_in(
        data_table_3["record_idx"],
        value_set=data_table_3_indices["record_idx_max"],
    )
    mask_table_3_invert = pc.invert(mask_table_3)
    table_to_delete_3 = data_table_3.filter(mask_table_3_invert)
    table_to_keep_3 = data_table_3.filter(mask_table_3)

    # Drop duplicates in data table 2
    data_table_2_indices = data_table_2.group_by(
        "pk_hash", use_threads=False
    ).aggregate([("record_idx", "max")])
    data_table_2_to_delete_for_duplicates = data_table_2.filter(
        pc.invert(
            pc.is_in(
                data_table_2["record_idx"],
                value_set=data_table_2_indices["record_idx_max"],
            )
        )
    )
    data_table_2 = data_table_2.filter(
        pc.is_in(
            data_table_2["record_idx"], value_set=data_table_2_indices["record_idx_max"]
        )
    )
    table_to_delete_2_indices = pc.is_in(
        data_table_2["pk_hash"],
        value_set=table_to_keep_3["pk_hash"],
    )
    new_indices_in_table_2 = pc.invert(table_to_delete_2_indices)
    table_to_delete_2 = data_table_2.filter(table_to_delete_2_indices)
    new_table_to_keep = data_table_2.filter(new_indices_in_table_2)
    primary_key_indices_to_keep_after_2 = pa.concat_tables(
        [new_table_to_keep, table_to_keep_3]
    )

    result_for_file_2 = pa.concat_tables(
        [table_to_delete_2, data_table_2_to_delete_for_duplicates]
    )
    # File 1 would be processed the same way, using primary_key_indices_to_keep_after_2 as the keep set.
    return table_to_delete_3, result_for_file_2, primary_key_indices_to_keep_after_2
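

# A minimal sketch (not in the original script) generalizing the Approach 3 steps above to N files.
# Assumptions: `tables_by_descending_file_seq` is a non-empty list of pyarrow tables ordered from the
# highest file_sequence_number to the lowest, all sharing a schema that contains "pk_hash" and
# "record_idx" columns; the function name and parameter are hypothetical.
def multiple_is_in_over_n_files_sketch(tables_by_descending_file_seq):
    delete_tables = []  # per-file records to delete
    keep_table = None  # growing union of surviving records (newest file wins per primary key)

    for table in tables_by_descending_file_seq:
        # Step 1: drop duplicates within the file (keep the max record_idx per pk_hash).
        indices = table.group_by("pk_hash", use_threads=False).aggregate(
            [("record_idx", "max")]
        )
        within_file_keep_mask = pc.is_in(
            table["record_idx"], value_set=indices["record_idx_max"]
        )
        delete_tables.append(table.filter(pc.invert(within_file_keep_mask)))
        deduped = table.filter(within_file_keep_mask)

        if keep_table is None:
            # Newest file: everything that survived within-file dedup is kept.
            keep_table = deduped
            continue

        # Step 2: delete records whose primary key already survived in a newer file.
        shadowed_mask = pc.is_in(deduped["pk_hash"], value_set=keep_table["pk_hash"])
        delete_tables.append(deduped.filter(shadowed_mask))

        # Step 4: append the genuinely new primary keys to the growing keep set.
        keep_table = pa.concat_tables(
            [keep_table, deduped.filter(pc.invert(shadowed_mask))]
        )

    # Step 3 is implicit: the concatenated delete_tables hold both kinds of deletes per file.
    return pa.concat_tables(delete_tables), keep_table


# Optional driver (not in the original script) to run all of the experiments above in one pass.
if __name__ == "__main__":
    basic_pyarrow_compute_function_used_performance_demo()
    group_by_each_file_then_global_drop_duplicates_using_max_record_position()
    group_by_each_file_then_global_drop_duplicates_using_last_record_position()
    group_by_each_file_then_global_drop_duplicates_using_just_file_sequence_number()
    multiple_is_in_for_large_dataset()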
