Commit d54347c

Author: Zhu
Add converter drop duplicates experiment script
1 parent 5a56390 commit d54347c

1 file changed: +287 -0 lines changed
@@ -0,0 +1,287 @@
import resource
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import time
import hashlib
import secrets


# This benchmark script tests the performance of different combinations of pyarrow.compute functions for the converter.
# The main performance factors are latency and memory consumption.

def generate_random_str_pk_column_sha1(number_of_records):
    sha1_hashes = []
    for _ in range(number_of_records):
        # Generate a random string using the secrets module for security
        random_string = secrets.token_urlsafe(10)

        # Calculate its SHA1 hash
        sha1_hash = hashlib.sha1(random_string.encode('utf-8')).hexdigest()
        sha1_hashes.append(sha1_hash)
    return pa.array(sha1_hashes)
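
# Note: the demos below use integer pk columns generated with np.random.randint; to benchmark
# string primary keys instead, this helper could plausibly be swapped in, e.g.:
#     pk_column = generate_random_str_pk_column_sha1(1_000_000)
#     data_table_1 = pa.table({'pk_hash': pk_column})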


# Example 1:
# 1. Memory used by invert is negligible.
# 2. Memory used by group_by + aggregate depends on the duplication rate: the less duplication, the larger the replicated copy of the group_by columns.
# 3. is_in is zero-copy; its memory use also depends on the duplication rate, with a copy of the sort column in the worst case.
def basic_pyarrow_compute_function_used_performance_demo():
    memory_used_at_start = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    # Create the table
    pk_column = np.random.randint(0, 100000, size=1000000)
    data_table_1 = pa.table({'pk_hash': pk_column})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    memory_used_after_creating_table = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"memory_used_after_creating_table:{memory_used_after_creating_table}")

    # group_by + aggregate on the unique sort column to drop duplicates within the same pyarrow table
    data_table_1_indices = data_table_1.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])
    memory_used_after_group_by = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_group_by = memory_used_after_group_by - memory_used_after_creating_table
    print(f"memory_used_by_group_by:{memory_used_by_group_by}")

    # is_in to get the indices to keep
    mask_table_1 = pc.is_in(
        data_table_1["record_idx"],
        value_set=data_table_1_indices["record_idx_max"],
    )
    memory_used_after_is_in = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_is_in = memory_used_after_is_in - memory_used_after_group_by
    print(f"memory_used_by_is_in:{memory_used_by_is_in}")

    # invert to get the indices to delete
    mask_table_1_invert = pc.invert(mask_table_1)
    memory_used_after_invert = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_invert = memory_used_after_invert - memory_used_after_is_in
    print(f"memory_used_by_invert:{memory_used_by_invert}")

    # filter to materialize the actual table to delete
    table_to_delete_1 = data_table_1.filter(mask_table_1_invert)
    memory_used_after_filter = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    memory_used_by_filter = memory_used_after_filter - memory_used_after_invert
    print(f"memory_used_by_filter:{memory_used_by_filter}")
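
# Optional cross-check sketch: ru_maxrss is a process-wide high-water mark and never decreases,
# so the per-step deltas above can under-report. pyarrow's own allocator counter offers another
# view, e.g.:
#     bytes_before = pa.total_allocated_bytes()
#     _ = data_table_1.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])
#     print(f"arrow_bytes_allocated_by_group_by:{pa.total_allocated_bytes() - bytes_before}")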


# Approach 1A: deterministic record to keep, extendable to support custom sort keys.
# Drop duplicates within each file, then drop duplicates globally.
# Pros: a. For a table with a high duplication rate, the global drop-duplicates step consumes less memory.
#       b. The appended record index column is a required field of the position delete file, so no "additional" memory is used for the pos column.
# Assumption: when concatenating tables for the final global drop duplicates, maintain ascending file_sequence_number: pa.concat_tables([table_1, table_2, ..., table_N])
def group_by_each_file_then_global_drop_duplicates_using_max_record_position():

    pk_column = np.random.randint(0, 100, size=10000)

    data_table_1 = pa.table({'pk_hash': pk_column})
    data_table_2 = pa.table({'pk_hash': pk_column})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)

    start = time.monotonic()
    data_table_1_indices = data_table_1.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])
    data_table_2_indices = data_table_2.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])

    # Drop duplicates within the same file
    mask_table_1 = pc.is_in(
        data_table_1["record_idx"],
        value_set=data_table_1_indices["record_idx_max"],
    )
    mask_table_1_invert = pc.invert(mask_table_1)
    table_to_delete_1 = data_table_1.filter(mask_table_1_invert)
    table_to_keep_1 = data_table_1.filter(mask_table_1)

    mask_table_2 = pc.is_in(
        data_table_2["record_idx"],
        value_set=data_table_2_indices["record_idx_max"],
    )
    mask_table_2_invert = pc.invert(mask_table_2)
    table_to_delete_2 = data_table_2.filter(mask_table_2_invert)
    table_to_keep_2 = data_table_2.filter(mask_table_2)

    # Global drop duplicates across the concatenated keep-tables
    final_data_table = pa.concat_tables([table_to_keep_1, table_to_keep_2])
    num_rows = final_data_table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))
    final_data_table = final_data_table.append_column("record_idx_final", row_numbers)
    final_data_table_indices = final_data_table.group_by("pk_hash", use_threads=False).aggregate([("record_idx_final", "max")])
    end = time.monotonic()
    print(f"Time taken to perform the group_by operations:{end - start}")
    final_data_table_to_delete = final_data_table.filter(
        pc.invert(
            pc.is_in(
                final_data_table["record_idx_final"],
                value_set=final_data_table_indices["record_idx_final_max"],
            )
        ))
    final_data_table_to_delete = final_data_table_to_delete.drop(["record_idx_final"])
    result = pa.concat_tables([final_data_table_to_delete, table_to_delete_1, table_to_delete_2])
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return result

# Approach 1B: deterministic record to keep, extendable to support custom sort keys.
# Slight difference from 1A: uses the "last" record instead of the "max" record of the sort column, which shows a reduction in latency.
def group_by_each_file_then_global_drop_duplicates_using_last_record_position():
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    pk_column = np.random.randint(0, 100, size=10000)

    data_table_1 = pa.table({'pk_hash': pk_column})
    data_table_2 = pa.table({'pk_hash': pk_column})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)

    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    start = time.monotonic()
    data_table_1_indices = data_table_1.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "last")])
    data_table_2_indices = data_table_2.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "last")])

    # Drop duplicates within the same file
    mask_table_1 = pc.is_in(
        data_table_1["record_idx"],
        value_set=data_table_1_indices["record_idx_last"],
    )
    mask_table_1_invert = pc.invert(mask_table_1)
    table_to_delete_1 = data_table_1.filter(mask_table_1_invert)
    table_to_keep_1 = data_table_1.filter(mask_table_1)

    mask_table_2 = pc.is_in(
        data_table_2["record_idx"],
        value_set=data_table_2_indices["record_idx_last"],
    )
    mask_table_2_invert = pc.invert(mask_table_2)
    table_to_delete_2 = data_table_2.filter(mask_table_2_invert)
    table_to_keep_2 = data_table_2.filter(mask_table_2)

    # Global drop duplicates across the concatenated keep-tables
    final_data_table = pa.concat_tables([table_to_keep_1, table_to_keep_2])
    num_rows = final_data_table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))
    final_data_table = final_data_table.append_column("record_idx_final", row_numbers)
    final_data_table_indices = final_data_table.group_by("pk_hash", use_threads=False).aggregate([("record_idx_final", "last")])
    end = time.monotonic()
    print(f"Time taken to perform the group_by operations:{end - start}")
    final_data_table_to_delete = final_data_table.filter(
        pc.invert(
            pc.is_in(
                final_data_table["record_idx_final"],
                value_set=final_data_table_indices["record_idx_final_last"],
            )
        ))
    final_data_table_to_delete = final_data_table_to_delete.drop(["record_idx_final"])
    result = pa.concat_tables([final_data_table_to_delete, table_to_delete_1, table_to_delete_2])
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return result
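
# Quick comparison sketch (assuming the script is run standalone): calling both variants back to
# back prints their group_by latencies for a side-by-side look at the "max" vs "last" aggregation:
#     group_by_each_file_then_global_drop_duplicates_using_max_record_position()
#     group_by_each_file_then_global_drop_duplicates_using_last_record_position()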

# Approach 2: deterministic record to keep; additional code changes may be needed to sort by any custom sort keys.
# Global group_by with two aggregations: 1. file_sequence_number as the primary criterion; 2. the index column.
# Pros: For a table with a lower duplication rate, compared with the file-level then global drop duplicates of Approach 1, this saves (N-1) group_by operations.
# Pros: Table concat order doesn't have to be maintained, as file_sequence_number is appended as a column and used as an aggregation criterion.
# Cons: Any custom sort keys other than record_idx may need a correctness verification before replacing the record_idx column.
# Cons: Higher risk of OOM when dropping duplicates globally.
def group_by_each_file_then_global_drop_duplicates_using_just_file_sequence_number():
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    pk_column = np.random.randint(0, 10, size=10000)
    file_sequence_number_column_1 = np.repeat(1, 10000)
    file_sequence_number_column_2 = np.repeat(2, 10000)
    data_table_1 = pa.table({'pk_hash': pk_column, "file_sequence_number": file_sequence_number_column_1})
    data_table_2 = pa.table({'pk_hash': pk_column, "file_sequence_number": file_sequence_number_column_2})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)

    start = time.monotonic()

    final_data_table = pa.concat_tables([data_table_1, data_table_2])
    num_rows = final_data_table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))
    final_data_table = final_data_table.append_column("record_idx_final", row_numbers)
    final_data_table_indices = final_data_table.group_by("pk_hash", use_threads=False).aggregate([("file_sequence_number", "max"), ("record_idx_final", "max")])
    end = time.monotonic()
    print(f"Time taken to perform the global group_by:{end - start}")
    # invert the keep-mask so the filter yields the records to delete
    final_data_table_to_delete = final_data_table.filter(
        pc.invert(
            pc.is_in(
                final_data_table["record_idx_final"],
                value_set=final_data_table_indices["record_idx_final_max"],
            )
        )
    )
    final_data_table_to_delete = final_data_table_to_delete.drop(["record_idx_final"])
    result = final_data_table_to_delete

    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return result

# Approach 3: for large datasets: deterministic record to keep, extendable to support custom sort keys.
# To process a large dataset, repeat the following, walking from file sequence number N down to the first file in descending order:
# 1. For file N-1, aggregate by record_idx to drop duplicates within the same file; res1 = records to delete because of duplicates.
# 2. Use pyarrow is_in to delete records in file N-1 whose primary key also appears in file N; res2 = records to delete because of a matching primary key.
# 3. Deletes for file N-1 = res1 + res2.
# 4. Append the distinct primary keys that appear in file N-1 to the kept primary keys, and use this growing union to delete from files N-2, N-3, ..., 1.
def multiple_is_in_for_large_dataset():
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    pk_column = np.random.randint(0, 10, size=10000)
    file_sequence_number_column_1 = np.repeat(1, 10000)
    file_sequence_number_column_2 = np.repeat(2, 10000)
    file_sequence_number_column_3 = np.repeat(3, 10000)
    data_table_1 = pa.table({'pk_hash': pk_column, "file_sequence_number": file_sequence_number_column_1})
    data_table_2 = pa.table({'pk_hash': pk_column, "file_sequence_number": file_sequence_number_column_2})
    data_table_3 = pa.table({'pk_hash': pk_column, "file_sequence_number": file_sequence_number_column_3})
    num_rows_1 = data_table_1.num_rows
    row_numbers_1 = pa.array(np.arange(num_rows_1, dtype=np.int64))
    data_table_1 = data_table_1.append_column("record_idx", row_numbers_1)
    num_rows_2 = data_table_2.num_rows
    row_numbers_2 = pa.array(np.arange(num_rows_2, dtype=np.int64))
    data_table_2 = data_table_2.append_column("record_idx", row_numbers_2)
    num_rows_3 = data_table_3.num_rows
    row_numbers_3 = pa.array(np.arange(num_rows_3, dtype=np.int64))
    data_table_3 = data_table_3.append_column("record_idx", row_numbers_3)

    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    start = time.monotonic()

    # Drop duplicates in table 3 (the table with the largest file sequence number)
    data_table_3_indices = data_table_3.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])

    mask_table_3 = pc.is_in(
        data_table_3["record_idx"],
        value_set=data_table_3_indices["record_idx_max"],
    )
    mask_table_3_invert = pc.invert(mask_table_3)
    table_to_delete_3 = data_table_3.filter(mask_table_3_invert)
    table_to_keep_3 = data_table_3.filter(mask_table_3)

    # Drop duplicates in data table 2
    data_table_2_indices = data_table_2.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])
    data_table_2_to_delete_for_duplicates = data_table_2.filter(
        pc.invert(
            pc.is_in(data_table_2["record_idx"], value_set=data_table_2_indices["record_idx_max"]))
    )
    data_table_2 = data_table_2.filter(pc.is_in(data_table_2["record_idx"], value_set=data_table_2_indices["record_idx_max"]))
    # Delete records in table 2 whose primary key also appears in the records kept from table 3
    table_to_delete_2_indices = pc.is_in(
        data_table_2["pk_hash"],
        value_set=table_to_keep_3["pk_hash"],
    )
    new_indices_in_table_2 = pc.invert(table_to_delete_2_indices)
    table_to_delete_2 = data_table_2.filter(table_to_delete_2_indices)
    new_table_to_keep = data_table_2.filter(new_indices_in_table_2)
    primary_key_indices_to_keep_after_2 = pa.concat_tables([new_table_to_keep, table_to_keep_3])

    result_for_file_2 = pa.concat_tables([table_to_delete_2, data_table_2_to_delete_for_duplicates])
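
    # A minimal sketch of the remaining iteration, assuming the loop described in step 4 of the
    # comment above: repeat the same pattern for file 1 against the growing union of kept primary
    # keys, then time and return the combined delete set.
    data_table_1_indices = data_table_1.group_by("pk_hash", use_threads=False).aggregate([("record_idx", "max")])
    data_table_1_to_delete_for_duplicates = data_table_1.filter(
        pc.invert(
            pc.is_in(data_table_1["record_idx"], value_set=data_table_1_indices["record_idx_max"]))
    )
    data_table_1 = data_table_1.filter(pc.is_in(data_table_1["record_idx"], value_set=data_table_1_indices["record_idx_max"]))
    table_to_delete_1_indices = pc.is_in(
        data_table_1["pk_hash"],
        value_set=primary_key_indices_to_keep_after_2["pk_hash"],
    )
    table_to_delete_1 = data_table_1.filter(table_to_delete_1_indices)
    result_for_file_1 = pa.concat_tables([table_to_delete_1, data_table_1_to_delete_for_duplicates])

    end = time.monotonic()
    print(f"Time taken for the chained is_in approach:{end - start}")
    print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return pa.concat_tables([result_for_file_1, result_for_file_2, table_to_delete_3])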
