14 changes: 14 additions & 0 deletions datasets/wikiconv-corpus/README.md
@@ -0,0 +1,14 @@
# WikiConv Language Dataset Processing

This directory contains the updated files used to process the English, German, Russian, Chinese, and Greek WikiConv datasets. These datasets were released with "WikiConv: A Corpus of the Complete Conversational History of a Large Online Collaborative Community", and can be found [here](https://figshare.com/projects/WikiConv_A_Corpus_of_the_Complete_Conversational_History_of_a_Large_Online_Collaborative_Community/57110).

The files have been updated to handle all five language datasets; the original files were tailored to the English dataset only. In addition, the original English dataset contained a significant amount of utterances with empty or trivial text and no modification metadata (the metadata that would indicate an utterance was modified, deleted, or restored). Since such utterances have essentially no substance, wikiconv_conversion_with_merging_04_28_20.py has been updated to filter them out. Finally, optimizations were made to reduce processing time given the size of the raw data.
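
A minimal sketch of that filter, assuming jsonlist records with a text field plus modification, deletion, and restoration metadata lists (these field names are illustrative and may not match the actual script exactly):

```python
import json

def has_substance(record):
    """Keep a record only if it has non-trivial text or some modification history."""
    text = (record.get("text") or "").strip()
    has_history = any(record.get(key) for key in ("modification", "deletion", "restoration"))
    return bool(text) or has_history

def filter_jsonlist(in_path, out_path):
    """Copy a jsonlist file, dropping records with neither text nor history."""
    with open(in_path) as src, open(out_path, "w") as dst:
        for line in src:
            if has_substance(json.loads(line)):
                dst.write(line)
```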

## Files
- to_jsonlist_lang.py: downloads the raw data from Figshare and converts it into jsonlist format, deleting the raw files afterwards (see the conversion sketch after this list).
- wikiconv_conversion_with_merging_04_28_20.py: updated code for converting jsonlist-formatted files to ConvoKit format; includes the filter for utterances with empty text and no modifications.
- merge_parallel.py: parallelizes the merging portion of wikiconv_conversion_with_merging_04_28_20.py, which is otherwise too slow for the larger datasets.
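
The jsonlist conversion step is roughly the following (a sketch only, assuming each raw Figshare file holds a single JSON array of records; the real to_jsonlist_lang.py also handles the download itself and works per language):

```python
import json
import os

def raw_to_jsonlist(raw_path, out_path):
    """Convert one raw JSON file into jsonlist format (one JSON object per line)."""
    with open(raw_path) as f:
        records = json.load(f)  # assumption: each raw file is one JSON array
    with open(out_path, "w") as out:
        for record in records:
            out.write(json.dumps(record) + "\n")
    os.remove(raw_path)  # raw files are deleted after conversion
```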


## Current status
The English dataset, which is by far the largest, has been reprocessed. The processing for the Greek dataset is currently running, and over the next month the others will be completed as well.
119 changes: 119 additions & 0 deletions datasets/wikiconv-corpus/merge_parallel.py
@@ -0,0 +1,119 @@
"""
Handles the merging portion of the conversion separately. Runs the years in parallel to speed up processing.
"""

import sys

sys.path.insert(0, "/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit")
from convokit import Corpus
import os
import shutil
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial


def main():
data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/" # intermediate directory where the split Convokit files are kept
data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/" # directory to output the merged Convokit files
delete_intermediate_files = True # set to True to delete intermediate files after merging
    max_workers = 12  # number of worker processes (years merged in parallel)

print("Starting merge process...")
print(f"Reading from: {data_directory_intermediate}")
print(f"Writing to: {data_directory_output}")
print(f"Using {max_workers} parallel workers")

os.makedirs(data_directory_output, exist_ok=True)

merge_files(data_directory_output, data_directory_intermediate, max_workers)

print("\nMerge completed successfully!")

if delete_intermediate_files:
print(f"Deleting intermediate files from {data_directory_intermediate}")
shutil.rmtree(data_directory_intermediate)
print("Intermediate files deleted.")


def merge_files(final_directory, input_directory, max_workers):
# build full list
input_subdirectory_paths = [x[0] for x in os.walk(input_directory)]

# organize files by year
files_by_year = {}
for year_x in range(2006, 2021):
year_str = str(year_x)
files_by_year[year_str] = [path for path in input_subdirectory_paths if year_str in path]

# years in parallel
process_year_func = partial(process_single_year, final_directory=final_directory)

with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_year = {}
for year_x in range(2007, 2019):
year_str = str(year_x)
if len(files_by_year[year_str]) > 0:
future = executor.submit(process_year_func, year_str, files_by_year[year_str])
future_to_year[future] = year_str

# process results as they complete
for future in as_completed(future_to_year):
year = future_to_year[future]
try:
result = future.result()
print(f"\n✓ Completed year {year}: {result}")
except Exception as e:
print(f"\n✗ Error processing year {year}: {e}")


def process_single_year(year, paths_lst, final_directory):
"""process a single year"""
if len(paths_lst) == 0:
        return "Skipped - no files"

print(f"\n[Year {year}] Processing {len(paths_lst)} corpus file(s)")

if len(paths_lst) == 1:
print(f"[Year {year}] Loading single corpus")
corpus_1 = Corpus(filename=paths_lst[0])
output_path = final_directory + "wikiconv_corpus_merged_" + year
corpus_1.dump(output_path)
        return "Saved single corpus"

else:
print(f"[Year {year}] Merging {len(paths_lst)} corpus files")

# load all corpora
corpora = []
for idx, path in enumerate(paths_lst, start=1):
print(f"[Year {year}] Loading corpus {idx}/{len(paths_lst)}")
corpora.append(Corpus(filename=path))

print(f"[Year {year}] Starting merge of {len(corpora)} corpora")

        # merge pairwise in a balanced binary-tree pattern: each round merges corpora of
        # similar size, which is faster than folding everything into one ever-growing corpus
round_num = 1
while len(corpora) > 1:
next_round = []
pairs = (len(corpora) + 1) // 2
for i in range(0, len(corpora), 2):
if i + 1 < len(corpora):
print(f"[Year {year}] Round {round_num}: Merging pair {i//2 + 1}/{pairs}")
merged = Corpus.merge(corpora[i], corpora[i + 1])
next_round.append(merged)
else:
# Odd one out, carry forward
next_round.append(corpora[i])
corpora = next_round
round_num += 1

merged_corpus = corpora[0]

        output_path = final_directory + "wikiconv_corpus_merged_" + year
print(f"[Year {year}] Saving merged corpus")
merged_corpus.dump(output_path)
return f"Saved merged corpus ({len(paths_lst)} files merged)"


if __name__ == "__main__":
main()
201 changes: 201 additions & 0 deletions datasets/wikiconv-corpus/raw_data.py
@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
Downloads raw WikiConv dataset files from Figshare and can also search them for a specific string.
Parallelized for faster processing.
"""

import requests
import os
import sys
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import time

# Global lock for thread-safe printing and counter for matches
print_lock = Lock()
matches_found = {"count": 0, "files": []}


def get_file_list(figshare_article_id):
"""Fetch the list of ALL files from Figshare API (handles pagination)."""
base_url = f"https://api.figshare.com/v2/articles/{figshare_article_id}/files"
all_files = []
page = 1
page_size = 100 # Max allowed by Figshare API

try:
while True:
params = {"page": page, "page_size": page_size}
response = requests.get(base_url, params=params)
response.raise_for_status()
files = response.json()

if not files:
break

all_files.extend(files)
print(f" Fetched page {page}: {len(files)} files (total so far: {len(all_files)})")

if len(files) < page_size:
# Last page
break

page += 1

return all_files
except requests.exceptions.RequestException as e:
print(f"Error fetching file list: {e}")
sys.exit(1)


def download_and_check_file(file_info, search_string, download_dir, idx, total):
"""
Download a file, check for search string while streaming, and handle accordingly.
Returns (found, file_name) tuple.
"""
file_name = file_info["name"]
file_url = file_info["download_url"]
file_path = os.path.join(download_dir, file_name)

with print_lock:
print(
f"[{idx}/{total}] Downloading: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)..."
)

try:
# Download file with streaming
response = requests.get(file_url, stream=True, timeout=60)
response.raise_for_status()

# Search while downloading (more efficient for large files)
found = False
chunk_size = 8192
buffer = b""
search_bytes = search_string.encode("utf-8")

with open(file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
f.write(chunk)

# Search in overlapping buffer to catch strings across chunk boundaries
buffer += chunk
if search_bytes in buffer:
found = True
# Continue downloading but we know we found it

# Keep last part of buffer for overlap check
if len(buffer) > len(search_bytes) * 2:
buffer = buffer[-(len(search_bytes) * 2) :]

if found:
with print_lock:
print(f" ✓ FOUND '{search_string}' in {file_name}!")
print(f" File saved at: {file_path}")
matches_found["count"] += 1
matches_found["files"].append(file_name)
return (True, file_name)
else:
with print_lock:
print(f" String not found in {file_name}. Deleting...")
os.remove(file_path)
return (False, None)

except requests.exceptions.RequestException as e:
with print_lock:
print(f" Error downloading {file_name}: {e}")
if os.path.exists(file_path):
os.remove(file_path)
return (False, None)
except Exception as e:
with print_lock:
print(f" Error processing {file_name}: {e}")
if os.path.exists(file_path):
os.remove(file_path)
return (False, None)


def main():
    FIGSHARE_ARTICLE_ID = "7376003"  # English dataset; change for other languages
SEARCH_STRING = "2052702.7345.7345"
DOWNLOAD_DIR = "./wikiconv_downloads"
MAX_WORKERS = 10 # Adjust based on your server's bandwidth and CPU

print("=" * 60)
print("WikiConv File Finder (Parallel - Keep All Matches)")
print("=" * 60)
print(f"Search string: '{SEARCH_STRING}'")
print(f"Download directory: {DOWNLOAD_DIR}")
print(f"Parallel workers: {MAX_WORKERS}")
print()

# Create download directory
Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True)

# Get file list
print("Fetching file list from Figshare...")
files = get_file_list(FIGSHARE_ARTICLE_ID)

if not files:
print("No files found!")
sys.exit(1)

print(f"Found {len(files)} files.")
print()

start_time = time.time()

# Process files in parallel
    START_INDEX = 1  # 1-based index of the first file to process (raise this to resume a partial run)
if START_INDEX > len(files):
print(f"Start index ({START_INDEX}) is beyond available files ({len(files)}). Exiting.")
sys.exit(1)

    files_to_process = files[START_INDEX - 1 :]  # slice from START_INDEX onward
total_files = len(files_to_process)
print(f"Processing files {START_INDEX}–{len(files)} ({total_files} total)...\n")

completed = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Submit tasks for the files selected by START_INDEX
future_to_file = {
executor.submit(
download_and_check_file,
file_info,
SEARCH_STRING,
DOWNLOAD_DIR,
idx + START_INDEX - 1,
len(files),
): file_info
for idx, file_info in enumerate(files_to_process, 1)
}

# process completed tasks
for future in as_completed(future_to_file):
found, file_name = future.result()
completed += 1

if completed % 50 == 0:
with print_lock:
print(
                        f"\n--- Progress: {completed}/{total_files} files processed, {matches_found['count']} matches found ---\n"
)

elapsed = time.time() - start_time
print()
print("=" * 60)
    print(f"COMPLETED: Processed {total_files} files.")
print(f"Matches found: {matches_found['count']}")
if matches_found["files"]:
print(f"\nFiles containing '{SEARCH_STRING}':")
for match_file in matches_found["files"]:
print(f" - {match_file}")
else:
print(f"\nSearch string '{SEARCH_STRING}' was NOT found in any file.")
print(f"\nTime elapsed: {elapsed:.2f} seconds")
    print(f"Average: {elapsed/total_files:.2f} seconds per file")
print("=" * 60)


if __name__ == "__main__":
main()