Commit 5d60eb9

added black formatting

1 parent a364296 commit 5d60eb9

4 files changed: +541 -419 lines changed

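The commit does not record how the formatting was produced; an invocation along these lines (assumed, not taken from the commit) yields the quote normalization, blank-line rules, and line wrapping seen in the diffs below:

    pip install black
    black datasets/wikiconv-corpus/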

datasets/wikiconv-corpus/merge_parallel.py

Lines changed: 21 additions & 19 deletions
@@ -3,19 +3,21 @@
 """
 
 import sys
-sys.path.insert(0, '/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit')
+
+sys.path.insert(0, "/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit")
 from convokit import Corpus
 import os
 import shutil
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from functools import partial
 
+
 def main():
     data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/"  # intermediate directory where the split Convokit files are kept
     data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/"  # directory to output the merged Convokit files
     delete_intermediate_files = True  # set to True to delete intermediate files after merging
     max_workers = 12  # number of threads/years to run in parallel
-
+
     print("Starting merge process...")
     print(f"Reading from: {data_directory_intermediate}")
     print(f"Writing to: {data_directory_output}")
@@ -24,9 +26,9 @@ def main():
     os.makedirs(data_directory_output, exist_ok=True)
 
     merge_files(data_directory_output, data_directory_intermediate, max_workers)
-
+
     print("\nMerge completed successfully!")
-
+
     if delete_intermediate_files:
         print(f"Deleting intermediate files from {data_directory_intermediate}")
         shutil.rmtree(data_directory_intermediate)
@@ -36,24 +38,24 @@
 def merge_files(final_directory, input_directory, max_workers):
     # build full list
     input_subdirectory_paths = [x[0] for x in os.walk(input_directory)]
-
+
     # organize files by year
     files_by_year = {}
     for year_x in range(2006, 2021):
         year_str = str(year_x)
         files_by_year[year_str] = [path for path in input_subdirectory_paths if year_str in path]
-
+
     # years in parallel
     process_year_func = partial(process_single_year, final_directory=final_directory)
-
+
     with ProcessPoolExecutor(max_workers=max_workers) as executor:
         future_to_year = {}
         for year_x in range(2007, 2019):
             year_str = str(year_x)
             if len(files_by_year[year_str]) > 0:
                 future = executor.submit(process_year_func, year_str, files_by_year[year_str])
                 future_to_year[future] = year_str
-
+
         # process results as they complete
         for future in as_completed(future_to_year):
             year = future_to_year[future]
@@ -68,27 +70,27 @@ def process_single_year(year, paths_lst, final_directory):
     """process a single year"""
     if len(paths_lst) == 0:
         return f"Skipped - no files"
-
+
     print(f"\n[Year {year}] Processing {len(paths_lst)} corpus file(s)")
-
+
     if len(paths_lst) == 1:
         print(f"[Year {year}] Loading single corpus")
         corpus_1 = Corpus(filename=paths_lst[0])
-        output_path = final_directory + 'wikiconv_corpus_merged_' + year
+        output_path = final_directory + "wikiconv_corpus_merged_" + year
         corpus_1.dump(output_path)
         return f"Saved single corpus"
-
+
     else:
         print(f"[Year {year}] Merging {len(paths_lst)} corpus files")
-
+
         # load all corpora
         corpora = []
         for idx, path in enumerate(paths_lst, start=1):
             print(f"[Year {year}] Loading corpus {idx}/{len(paths_lst)}")
             corpora.append(Corpus(filename=path))
-
+
         print(f"[Year {year}] Starting merge of {len(corpora)} corpora")
-
+
         # merge in a balanced binary tree pattern for increased efficiency
         round_num = 1
         while len(corpora) > 1:
@@ -106,12 +108,12 @@ def process_single_year(year, paths_lst, final_directory):
             round_num += 1
 
         merged_corpus = corpora[0]
-
-        output_path = final_directory + 'wikiconv_corpus_merged_' + str(year)
+
+        output_path = final_directory + "wikiconv_corpus_merged_" + str(year)
         print(f"[Year {year}] Saving merged corpus")
         merged_corpus.dump(output_path)
         return f"Saved merged corpus ({len(paths_lst)} files merged)"
 
 
-if __name__ == '__main__':
-    main()
+if __name__ == "__main__":
+    main()
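The hunks above elide the body of the balanced-merge loop (old lines 94 to 106). For reference, here is a minimal self-contained sketch of the pattern the comment names, merging adjacent pairs round by round; merge_pair is a hypothetical stand-in for whatever ConvoKit merge call the elided body makes, so treat this as an illustration rather than the committed code:

def merge_balanced(corpora, merge_pair):
    """Merge a list of corpora pairwise, round by round: n corpora take
    about log2(n) rounds, so each input passes through O(log n) merges
    instead of being re-merged into one ever-growing corpus n - 1 times."""
    while len(corpora) > 1:
        # merge adjacent pairs; an odd corpus at the end advances unmerged
        next_round = [
            merge_pair(corpora[i], corpora[i + 1])
            for i in range(0, len(corpora) - 1, 2)
        ]
        if len(corpora) % 2 == 1:
            next_round.append(corpora[-1])
        corpora = next_round
    return corpora[0]

# usage sketch, assuming a pairwise corpus merge is available:
# merged = merge_balanced(corpora, lambda a, b: a.merge(b))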

datasets/wikiconv-corpus/raw_data.py

Lines changed: 50 additions & 45 deletions
@@ -14,92 +14,93 @@
 
 # Global lock for thread-safe printing and counter for matches
 print_lock = Lock()
-matches_found = {'count': 0, 'files': []}
+matches_found = {"count": 0, "files": []}
+
 
 def get_file_list(figshare_article_id):
     """Fetch the list of ALL files from Figshare API (handles pagination)."""
     base_url = f"https://api.figshare.com/v2/articles/{figshare_article_id}/files"
     all_files = []
     page = 1
     page_size = 100  # Max allowed by Figshare API
-
+
     try:
         while True:
-            params = {
-                'page': page,
-                'page_size': page_size
-            }
+            params = {"page": page, "page_size": page_size}
             response = requests.get(base_url, params=params)
             response.raise_for_status()
             files = response.json()
-
+
             if not files:
                 break
-
+
             all_files.extend(files)
             print(f" Fetched page {page}: {len(files)} files (total so far: {len(all_files)})")
-
+
             if len(files) < page_size:
                 # Last page
                 break
-
+
             page += 1
-
+
         return all_files
     except requests.exceptions.RequestException as e:
         print(f"Error fetching file list: {e}")
         sys.exit(1)
 
+
 
 def download_and_check_file(file_info, search_string, download_dir, idx, total):
     """
     Download a file, check for search string while streaming, and handle accordingly.
     Returns (found, file_name) tuple.
     """
-    file_name = file_info['name']
-    file_url = file_info['download_url']
+    file_name = file_info["name"]
+    file_url = file_info["download_url"]
     file_path = os.path.join(download_dir, file_name)
-
+
     with print_lock:
-        print(f"[{idx}/{total}] Downloading: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)...")
-
+        print(
+            f"[{idx}/{total}] Downloading: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)..."
+        )
+
     try:
         # Download file with streaming
         response = requests.get(file_url, stream=True, timeout=60)
         response.raise_for_status()
-
+
         # Search while downloading (more efficient for large files)
        found = False
         chunk_size = 8192
-        buffer = b''
-        search_bytes = search_string.encode('utf-8')
-
-        with open(file_path, 'wb') as f:
+        buffer = b""
+        search_bytes = search_string.encode("utf-8")
+
+        with open(file_path, "wb") as f:
             for chunk in response.iter_content(chunk_size=chunk_size):
                 f.write(chunk)
-
+
                 # Search in overlapping buffer to catch strings across chunk boundaries
                 buffer += chunk
                 if search_bytes in buffer:
                     found = True
                     # Continue downloading but we know we found it
-
+
                 # Keep last part of buffer for overlap check
                 if len(buffer) > len(search_bytes) * 2:
-                    buffer = buffer[-(len(search_bytes) * 2):]
-
+                    buffer = buffer[-(len(search_bytes) * 2) :]
+
         if found:
             with print_lock:
                 print(f" ✓ FOUND '{search_string}' in {file_name}!")
                 print(f" File saved at: {file_path}")
-                matches_found['count'] += 1
-                matches_found['files'].append(file_name)
+                matches_found["count"] += 1
+                matches_found["files"].append(file_name)
             return (True, file_name)
         else:
             with print_lock:
                 print(f" String not found in {file_name}. Deleting...")
                 os.remove(file_path)
             return (False, None)
-
+
     except requests.exceptions.RequestException as e:
         with print_lock:
             print(f" Error downloading {file_name}: {e}")
@@ -113,43 +114,44 @@ def download_and_check_file(file_info, search_string, download_dir, idx, total):
             os.remove(file_path)
         return (False, None)
 
+
 def main():
     FIGSHARE_ARTICLE_ID = "7376003"  # english dataset, change for other languages
     SEARCH_STRING = "2052702.7345.7345"
     DOWNLOAD_DIR = "./wikiconv_downloads"
     MAX_WORKERS = 10  # Adjust based on your server's bandwidth and CPU
-
+
     print("=" * 60)
     print("WikiConv File Finder (Parallel - Keep All Matches)")
     print("=" * 60)
     print(f"Search string: '{SEARCH_STRING}'")
     print(f"Download directory: {DOWNLOAD_DIR}")
     print(f"Parallel workers: {MAX_WORKERS}")
     print()
-
+
     # Create download directory
     Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True)
-
+
     # Get file list
     print("Fetching file list from Figshare...")
     files = get_file_list(FIGSHARE_ARTICLE_ID)
-
+
     if not files:
         print("No files found!")
         sys.exit(1)
-
+
     print(f"Found {len(files)} files.")
     print()
-
+
     start_time = time.time()
-
+
     # Process files in parallel
     START_INDEX = 1  # 1-based index, meaning skip first 88
     if START_INDEX > len(files):
         print(f"Start index ({START_INDEX}) is beyond available files ({len(files)}). Exiting.")
         sys.exit(1)
 
-    files_to_process = files[START_INDEX - 1:]  # slice from the 89th file onward
+    files_to_process = files[START_INDEX - 1 :]  # slice from the 89th file onward
     total_files = len(files_to_process)
     print(f"Processing files {START_INDEX}-{len(files)} ({total_files} total)...\n")
 
@@ -162,35 +164,38 @@ def main():
                 file_info,
                 SEARCH_STRING,
                 DOWNLOAD_DIR,
-                idx + START_INDEX - 1,
-                len(files)
+                idx + START_INDEX - 1,
+                len(files),
             ): file_info
             for idx, file_info in enumerate(files_to_process, 1)
         }
-
+
         # process completed tasks
         for future in as_completed(future_to_file):
             found, file_name = future.result()
             completed += 1
-
+
             if completed % 50 == 0:
                 with print_lock:
-                    print(f"\n--- Progress: {completed}/{len(files)} files processed, {matches_found['count']} matches found ---\n")
-
+                    print(
+                        f"\n--- Progress: {completed}/{len(files)} files processed, {matches_found['count']} matches found ---\n"
+                    )
+
     elapsed = time.time() - start_time
     print()
     print("=" * 60)
     print(f"COMPLETED: Processed all {len(files)} files.")
     print(f"Matches found: {matches_found['count']}")
-    if matches_found['files']:
+    if matches_found["files"]:
         print(f"\nFiles containing '{SEARCH_STRING}':")
-        for match_file in matches_found['files']:
+        for match_file in matches_found["files"]:
             print(f" - {match_file}")
     else:
         print(f"\nSearch string '{SEARCH_STRING}' was NOT found in any file.")
     print(f"\nTime elapsed: {elapsed:.2f} seconds")
     print(f"Average: {elapsed/len(files):.2f} seconds per file")
     print("=" * 60)
 
+
 if __name__ == "__main__":
-    main()
+    main()
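One detail of raw_data.py worth isolating is the rolling-buffer search, since the reformatted slice buffer[-(len(search_bytes) * 2) :] is easy to misread. Here is a self-contained sketch of the same technique with illustrative names (not taken from the commit) that can be tested without downloading anything:

def stream_contains(chunks, needle):
    """Search a byte stream chunk by chunk: append each chunk to a small
    rolling buffer, test for the needle, then keep only the last
    2 * len(needle) bytes so a match split across two chunks survives."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        if needle in buffer:
            return True
        if len(buffer) > len(needle) * 2:
            buffer = buffer[-(len(needle) * 2) :]
    return False

# a match straddling a chunk boundary is still found:
assert stream_contains([b"...2052702.73", b"45.7345..."], b"2052702.7345.7345")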
