From a3642965f00e557964837083e03c44e7b08b58ba Mon Sep 17 00:00:00 2001
From: ex36 <96800594+Torchee@users.noreply.github.com>
Date: Mon, 22 Dec 2025 01:45:16 -0500
Subject: [PATCH 1/2] added updated wikiconv processing code

---
 datasets/wikiconv-corpus/README.md            |  14 +
 datasets/wikiconv-corpus/merge_parallel.py    | 117 +++++
 datasets/wikiconv-corpus/raw_data.py          | 196 ++++++++
 datasets/wikiconv-corpus/to_jsonlist_lang.py  | 354 +++++++++++++
 ...kiconv_conversion_with_merging_04_28_20.py | 464 ++++++++++++++++++
 5 files changed, 1145 insertions(+)
 create mode 100644 datasets/wikiconv-corpus/README.md
 create mode 100644 datasets/wikiconv-corpus/merge_parallel.py
 create mode 100644 datasets/wikiconv-corpus/raw_data.py
 create mode 100644 datasets/wikiconv-corpus/to_jsonlist_lang.py
 create mode 100644 datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py

diff --git a/datasets/wikiconv-corpus/README.md b/datasets/wikiconv-corpus/README.md
new file mode 100644
index 000000000..3883ad9dd
--- /dev/null
+++ b/datasets/wikiconv-corpus/README.md
@@ -0,0 +1,14 @@
+# WikiConv Language Dataset Processing
+
+This directory contains the updated files used to process the English, German, Russian, Chinese, and Greek WikiConv datasets. These datasets were released with "WikiConv: A Corpus of the Complete Conversational History of a Large Online Collaborative Community", and can be found [here](https://figshare.com/projects/WikiConv_A_Corpus_of_the_Complete_Conversational_History_of_a_Large_Online_Collaborative_Community/57110).
+
+The files have been updated to handle datasets of all 5 languages; the original files were tailored for the English dataset only. In addition, the original English dataset contained a significant amount of data with empty or trivial text and no modification metadata (i.e., metadata indicating that the utterance was modified, deleted, or restored). Such data has essentially no substance, so wikiconv_conversion_with_merging_04_28_20.py has been updated to filter these utterances out. Finally, optimizations were made to improve the processing runtime given the size of the raw data.
+
+## Files:
+to_jsonlist_lang.py: Downloads raw data from Figshare and converts it into jsonlist format, deleting the raw files afterwards.
+wikiconv_conversion_with_merging_04_28_20.py: Updated code for converting jsonlist-formatted files to ConvoKit format. Includes the filter for utterances with empty text and no modifications.
+merge_parallel.py: Parallelizes the merging portion of wikiconv_conversion_with_merging_04_28_20.py, which takes too long to run sequentially on large datasets.
+
+
+## Current status
+The English dataset, which is by far the largest, has been reprocessed. Processing for the Greek dataset is currently running, and the remaining languages will be completed over the next month.
\ No newline at end of file
diff --git a/datasets/wikiconv-corpus/merge_parallel.py b/datasets/wikiconv-corpus/merge_parallel.py
new file mode 100644
index 000000000..6c59e6e64
--- /dev/null
+++ b/datasets/wikiconv-corpus/merge_parallel.py
@@ -0,0 +1,117 @@
+"""
+Handles the merging portion of the conversion separately. Runs in parallel on each year to speed up processing.
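+
+Input/output directories and the worker count are hard-coded in main() below, and the
+year range is hard-coded in merge_files(); adjust these before running:
+
+    python merge_parallel.py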
+""" + +import sys +sys.path.insert(0, '/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit') +from convokit import Corpus +import os +import shutil +from concurrent.futures import ProcessPoolExecutor, as_completed +from functools import partial + +def main(): + data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/" # intermediate directory where the split Convokit files are kept + data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/" # directory to output the merged Convokit files + delete_intermediate_files = True # set to True to delete intermediate files after merging + max_workers = 12 # number of threads/years to run in parallel + + print("Starting merge process...") + print(f"Reading from: {data_directory_intermediate}") + print(f"Writing to: {data_directory_output}") + print(f"Using {max_workers} parallel workers") + + os.makedirs(data_directory_output, exist_ok=True) + + merge_files(data_directory_output, data_directory_intermediate, max_workers) + + print("\nMerge completed successfully!") + + if delete_intermediate_files: + print(f"Deleting intermediate files from {data_directory_intermediate}") + shutil.rmtree(data_directory_intermediate) + print("Intermediate files deleted.") + + +def merge_files(final_directory, input_directory, max_workers): + # build full list + input_subdirectory_paths = [x[0] for x in os.walk(input_directory)] + + # organize files by year + files_by_year = {} + for year_x in range(2006, 2021): + year_str = str(year_x) + files_by_year[year_str] = [path for path in input_subdirectory_paths if year_str in path] + + # years in parallel + process_year_func = partial(process_single_year, final_directory=final_directory) + + with ProcessPoolExecutor(max_workers=max_workers) as executor: + future_to_year = {} + for year_x in range(2007, 2019): + year_str = str(year_x) + if len(files_by_year[year_str]) > 0: + future = executor.submit(process_year_func, year_str, files_by_year[year_str]) + future_to_year[future] = year_str + + # process results as they complete + for future in as_completed(future_to_year): + year = future_to_year[future] + try: + result = future.result() + print(f"\n✓ Completed year {year}: {result}") + except Exception as e: + print(f"\n✗ Error processing year {year}: {e}") + + +def process_single_year(year, paths_lst, final_directory): + """process a single year""" + if len(paths_lst) == 0: + return f"Skipped - no files" + + print(f"\n[Year {year}] Processing {len(paths_lst)} corpus file(s)") + + if len(paths_lst) == 1: + print(f"[Year {year}] Loading single corpus") + corpus_1 = Corpus(filename=paths_lst[0]) + output_path = final_directory + 'wikiconv_corpus_merged_' + year + corpus_1.dump(output_path) + return f"Saved single corpus" + + else: + print(f"[Year {year}] Merging {len(paths_lst)} corpus files") + + # load all corpora + corpora = [] + for idx, path in enumerate(paths_lst, start=1): + print(f"[Year {year}] Loading corpus {idx}/{len(paths_lst)}") + corpora.append(Corpus(filename=path)) + + print(f"[Year {year}] Starting merge of {len(corpora)} corpora") + + # merge in a balanced binary tree pattern for increased efficiency + round_num = 1 + while len(corpora) > 1: + next_round = [] + pairs = (len(corpora) + 1) // 2 + for i in range(0, len(corpora), 2): + if i + 1 < len(corpora): + print(f"[Year {year}] Round {round_num}: Merging pair {i//2 + 1}/{pairs}") + merged = Corpus.merge(corpora[i], corpora[i + 1]) + next_round.append(merged) + else: + # Odd one out, carry 
forward + next_round.append(corpora[i]) + corpora = next_round + round_num += 1 + + merged_corpus = corpora[0] + + output_path = final_directory + 'wikiconv_corpus_merged_' + str(year) + print(f"[Year {year}] Saving merged corpus") + merged_corpus.dump(output_path) + return f"Saved merged corpus ({len(paths_lst)} files merged)" + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/datasets/wikiconv-corpus/raw_data.py b/datasets/wikiconv-corpus/raw_data.py new file mode 100644 index 000000000..878ce7cdd --- /dev/null +++ b/datasets/wikiconv-corpus/raw_data.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +""" +Downloads raw wikiconv datasets, and can also search for specific strings. +Parallelized for faster processing. +""" + +import requests +import os +import sys +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock +import time + +# Global lock for thread-safe printing and counter for matches +print_lock = Lock() +matches_found = {'count': 0, 'files': []} + +def get_file_list(figshare_article_id): + """Fetch the list of ALL files from Figshare API (handles pagination).""" + base_url = f"https://api.figshare.com/v2/articles/{figshare_article_id}/files" + all_files = [] + page = 1 + page_size = 100 # Max allowed by Figshare API + + try: + while True: + params = { + 'page': page, + 'page_size': page_size + } + response = requests.get(base_url, params=params) + response.raise_for_status() + files = response.json() + + if not files: + break + + all_files.extend(files) + print(f" Fetched page {page}: {len(files)} files (total so far: {len(all_files)})") + + if len(files) < page_size: + # Last page + break + + page += 1 + + return all_files + except requests.exceptions.RequestException as e: + print(f"Error fetching file list: {e}") + sys.exit(1) + +def download_and_check_file(file_info, search_string, download_dir, idx, total): + """ + Download a file, check for search string while streaming, and handle accordingly. + Returns (found, file_name) tuple. + """ + file_name = file_info['name'] + file_url = file_info['download_url'] + file_path = os.path.join(download_dir, file_name) + + with print_lock: + print(f"[{idx}/{total}] Downloading: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)...") + + try: + # Download file with streaming + response = requests.get(file_url, stream=True, timeout=60) + response.raise_for_status() + + # Search while downloading (more efficient for large files) + found = False + chunk_size = 8192 + buffer = b'' + search_bytes = search_string.encode('utf-8') + + with open(file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=chunk_size): + f.write(chunk) + + # Search in overlapping buffer to catch strings across chunk boundaries + buffer += chunk + if search_bytes in buffer: + found = True + # Continue downloading but we know we found it + + # Keep last part of buffer for overlap check + if len(buffer) > len(search_bytes) * 2: + buffer = buffer[-(len(search_bytes) * 2):] + + if found: + with print_lock: + print(f" ✓ FOUND '{search_string}' in {file_name}!") + print(f" File saved at: {file_path}") + matches_found['count'] += 1 + matches_found['files'].append(file_name) + return (True, file_name) + else: + with print_lock: + print(f" String not found in {file_name}. 
Deleting...") + os.remove(file_path) + return (False, None) + + except requests.exceptions.RequestException as e: + with print_lock: + print(f" Error downloading {file_name}: {e}") + if os.path.exists(file_path): + os.remove(file_path) + return (False, None) + except Exception as e: + with print_lock: + print(f" Error processing {file_name}: {e}") + if os.path.exists(file_path): + os.remove(file_path) + return (False, None) + +def main(): + FIGSHARE_ARTICLE_ID = "7376003" # english dataset, change for other languages + SEARCH_STRING = "2052702.7345.7345" + DOWNLOAD_DIR = "./wikiconv_downloads" + MAX_WORKERS = 10 # Adjust based on your server's bandwidth and CPU + + print("=" * 60) + print("WikiConv File Finder (Parallel - Keep All Matches)") + print("=" * 60) + print(f"Search string: '{SEARCH_STRING}'") + print(f"Download directory: {DOWNLOAD_DIR}") + print(f"Parallel workers: {MAX_WORKERS}") + print() + + # Create download directory + Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True) + + # Get file list + print("Fetching file list from Figshare...") + files = get_file_list(FIGSHARE_ARTICLE_ID) + + if not files: + print("No files found!") + sys.exit(1) + + print(f"Found {len(files)} files.") + print() + + start_time = time.time() + + # Process files in parallel + START_INDEX = 1 # 1-based index, meaning skip first 88 + if START_INDEX > len(files): + print(f"Start index ({START_INDEX}) is beyond available files ({len(files)}). Exiting.") + sys.exit(1) + + files_to_process = files[START_INDEX - 1:] # slice from the 89th file onward + total_files = len(files_to_process) + print(f"Processing files {START_INDEX}–{len(files)} ({total_files} total)...\n") + + completed = 0 + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # Submit tasks for remaining files only + future_to_file = { + executor.submit( + download_and_check_file, + file_info, + SEARCH_STRING, + DOWNLOAD_DIR, + idx + START_INDEX - 1, + len(files) + ): file_info + for idx, file_info in enumerate(files_to_process, 1) + } + + # process completed tasks + for future in as_completed(future_to_file): + found, file_name = future.result() + completed += 1 + + if completed % 50 == 0: + with print_lock: + print(f"\n--- Progress: {completed}/{len(files)} files processed, {matches_found['count']} matches found ---\n") + + elapsed = time.time() - start_time + print() + print("=" * 60) + print(f"COMPLETED: Processed all {len(files)} files.") + print(f"Matches found: {matches_found['count']}") + if matches_found['files']: + print(f"\nFiles containing '{SEARCH_STRING}':") + for match_file in matches_found['files']: + print(f" - {match_file}") + else: + print(f"\nSearch string '{SEARCH_STRING}' was NOT found in any file.") + print(f"\nTime elapsed: {elapsed:.2f} seconds") + print(f"Average: {elapsed/len(files):.2f} seconds per file") + print("=" * 60) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/datasets/wikiconv-corpus/to_jsonlist_lang.py b/datasets/wikiconv-corpus/to_jsonlist_lang.py new file mode 100644 index 000000000..9514e5fe6 --- /dev/null +++ b/datasets/wikiconv-corpus/to_jsonlist_lang.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3 +""" +Downloads raw wikiconv dataset based on specify id and converts them to JSONLIST format in parallel. +Deletes raw files after successful conversion. 
+""" + +import requests +import os +import sys +import json +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock +from datetime import datetime +from collections import defaultdict +import time + +print_lock = Lock() +stats = { + 'downloaded': 0, + 'converted': 0, + 'failed': 0, + 'total_conversations': 0 +} + +def get_file_list(figshare_article_id): + """Fetch list of all files from Figshare API""" + base_url = f"https://api.figshare.com/v2/articles/{figshare_article_id}/files" + all_files = [] + page = 1 + page_size = 100 + + try: + while True: + params = {'page': page, 'page_size': page_size} + response = requests.get(base_url, params=params) + response.raise_for_status() + files = response.json() + + if not files: + break + + all_files.extend(files) + print(f" Fetched page {page}: {len(files)} files (total: {len(all_files)})") + + if len(files) < page_size: + break + + page += 1 + + return all_files + except requests.exceptions.RequestException as e: + print(f"Error fetching file list: {e}") + sys.exit(1) + +def parse_timestamp(timestamp_str): + """Convert timestamp string to unix timestamp""" + formats = [ + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%d %H:%M:%S UTC", + "%Y-%m-%d %H:%M:%S" + ] + + timestamp_str = timestamp_str.replace(" UTC", "") + + for fmt in formats: + try: + dt = datetime.strptime(timestamp_str, fmt) + return dt.timestamp() + except: + continue + + return 0.0 + +def extract_page_type_and_title(page_title): + """Extract page type and clean title from the full page title""" + namespace_mappings = { + 'User talk:': 'user_talk', + 'User:': 'user', + 'Talk:': 'talk', + 'Wikipedia talk:': 'wikipedia_talk', + 'Wikipedia:': 'wikipedia', + 'File talk:': 'file_talk', + 'File:': 'file', + 'Template talk:': 'template_talk', + 'Template:': 'template', + 'Help talk:': 'help_talk', + 'Help:': 'help', + 'Category talk:': 'category_talk', + 'Category:': 'category', + 'Project talk:': 'project_talk', + 'Project:': 'project', + 'Συζήτηση χρήστη:': 'user_talk', + 'Συζήτηση:': 'talk', + 'Χρήστης:': 'user', + } + + for prefix in sorted(namespace_mappings.keys(), key=len, reverse=True): + if page_title.startswith(prefix): + clean_title = page_title[len(prefix):] + page_type = namespace_mappings[prefix] + return page_type, clean_title, page_title + + return 'article', page_title, page_title + +def process_comment(comment_data): + """Process a single comment from plain text format to expected format""" + timestamp = parse_timestamp(comment_data.get('timestamp', '')) + page_type, clean_title, raw_title = extract_page_type_and_title( + comment_data.get('page_title', '') + ) + + toxicity = float(comment_data.get('toxicity', 0.0)) + sever_toxicity = float(comment_data.get('sever_toxicity', 0.0)) + + processed_comment = { + 'conversation_id': comment_data.get('conversation_id'), + 'id': comment_data.get('id'), + 'indentation': str(comment_data.get('indentation', 0)), + 'type': comment_data.get('type', 'CREATION').upper(), + 'page_id': str(comment_data.get('page_id', '')), + 'page_title': raw_title, + 'parent_id': comment_data.get('parent_id'), + 'ancestor_id': comment_data.get('ancestor_id'), + 'replyTo_id': comment_data.get('replyTo_id'), + 'rev_id': str(comment_data.get('rev_id', '')), + 'user_id': str(comment_data.get('user_id', '')), + 'user_text': comment_data.get('user_text', ''), + 'toxicity': toxicity, + 'sever_toxicity': sever_toxicity, + 'raw_text': comment_data.get('content', ''), + 'text': comment_data.get('cleaned_content', 
''), + 'timestamp': timestamp, + 'is_unchanged': comment_data.get('isUnchanged', False), + 'wiki_links': [] + } + + return processed_comment, page_type, clean_title, raw_title + +def convert_to_jsonlist(input_path, output_path): + """Convert a raw file to jsonlist format""" + conversations = defaultdict(lambda: { + 'comments': [], + 'authors': set(), + 'page_info': {} + }) + + with open(input_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if not line: + continue + + try: + comment_data = json.loads(line) + processed_comment, page_type, clean_title, raw_title = process_comment(comment_data) + + conv_id = comment_data.get('conversation_id') + if not conv_id: + continue + + conversations[conv_id]['comments'].append(processed_comment) + + if not conversations[conv_id]['page_info']: + conversations[conv_id]['page_info'] = { + 'page_id': str(comment_data.get('page_id', '')), + 'page_type': page_type, + 'page_title': clean_title, + 'raw_page_title': raw_title + } + + if 'authors' in comment_data: + for author_str in comment_data.get('authors', []): + parts = author_str.split(':', 1) + if len(parts) == 2: + user_id, user_text = parts + conversations[conv_id]['authors'].add((user_id, user_text)) + else: + user_id = str(comment_data.get('user_id', '')) + user_text = comment_data.get('user_text', '') + if user_id and user_text: + conversations[conv_id]['authors'].add((user_id, user_text)) + + except (json.JSONDecodeError, Exception): + continue + + # sort and write convos + with open(output_path, 'w', encoding='utf-8') as f: + for conv_id, conv_data in sorted(conversations.items()): + if not conv_data['comments']: + continue + + creation_comments = [c for c in conv_data['comments'] if c['type'] == 'CREATION'] + other_comments = [c for c in conv_data['comments'] if c['type'] != 'CREATION'] + creation_comments.sort(key=lambda x: (x['timestamp'], x['id'])) + other_comments.sort(key=lambda x: (x['timestamp'], x['id'])) + sorted_comments = creation_comments + other_comments + + authors = [ + {'user_text': user_text, 'user_id': user_id} + for user_id, user_text in sorted(conv_data['authors']) + ] + + conversation = { + 'conversation_id': conv_id, + 'page_id': conv_data['page_info']['page_id'], + 'raw_page_title': conv_data['page_info']['raw_page_title'], + 'page_type': conv_data['page_info']['page_type'], + 'page_title': conv_data['page_info']['page_title'], + 'section_title': None, + 'comments': sorted_comments, + 'authors': authors + } + + json.dump(conversation, f, ensure_ascii=False) + f.write('\n') + + return len(conversations) + +def download_and_convert_file(file_info, raw_dir, output_dir, idx, total): + """Downloads a file, converts it to JSONLIST, and then deletes the raw file.""" + file_name = file_info['name'] + file_url = file_info['download_url'] + raw_path = os.path.join(raw_dir, file_name) + + # output filename + base_name = os.path.splitext(file_name)[0] + output_filename = f"{base_name}.jsonlist" + output_path = os.path.join(output_dir, output_filename) + + with print_lock: + print(f"[{idx}/{total}] Processing: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)") + + try: + # download + response = requests.get(file_url, stream=True, timeout=120) + response.raise_for_status() + + with open(raw_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + with print_lock: + stats['downloaded'] += 1 + + conv_count = convert_to_jsonlist(raw_path, output_path) + + # delete raw file after successful conversion + os.remove(raw_path) 
+ + with print_lock: + stats['converted'] += 1 + stats['total_conversations'] += conv_count + print(f" ✓ Converted {file_name} → {output_filename} ({conv_count} conversations)") + + return (True, conv_count) + + except requests.exceptions.RequestException as e: + with print_lock: + stats['failed'] += 1 + print(f" ✗ Download error for {file_name}: {e}") + if os.path.exists(raw_path): + os.remove(raw_path) + return (False, 0) + except Exception as e: + with print_lock: + stats['failed'] += 1 + print(f" ✗ Processing error for {file_name}: {e}") + if os.path.exists(raw_path): + os.remove(raw_path) + if os.path.exists(output_path): + os.remove(output_path) + return (False, 0) + +def main(): + FIGSHARE_ARTICLE_ID = "7376003" # English dataset id, change for other datasets + RAW_DIR = "./raw_data/English" + OUTPUT_DIR = "./output/English" + MAX_WORKERS = 10 # adjust as needed + + print("=" * 70) + print("WikiConv Download & Convert Pipeline") + print("=" * 70) + print(f"Raw files directory (temporary): {RAW_DIR}") + print(f"Output JSONLIST directory: {OUTPUT_DIR}") + print(f"Parallel workers: {MAX_WORKERS}") + print() + + # create directories + Path(RAW_DIR).mkdir(parents=True, exist_ok=True) + Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True) + + print("Fetching file list from Figshare...") + files = get_file_list(FIGSHARE_ARTICLE_ID) + + if not files: + print("No files found!") + sys.exit(1) + + print(f"Found {len(files)} files to process.") + print() + + start_time = time.time() + + # process files in parallel + completed = 0 + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + future_to_file = { + executor.submit( + download_and_convert_file, + file_info, + RAW_DIR, + OUTPUT_DIR, + idx, + len(files) + ): file_info + for idx, file_info in enumerate(files, 1) + } + + for future in as_completed(future_to_file): + success, conv_count = future.result() + completed += 1 + + # checks every 25 files + if completed % 25 == 0: + with print_lock: + print(f"\n--- Progress: {completed}/{len(files)} files | " + f"Downloaded: {stats['downloaded']} | " + f"Converted: {stats['converted']} | " + f"Failed: {stats['failed']} | " + f"Conversations: {stats['total_conversations']:,} ---\n") + + elapsed = time.time() - start_time + + try: + os.rmdir(RAW_DIR) + except: + pass + + print() + print("=" * 70) + print(f"PIPELINE COMPLETED") + print(f"Files processed: {len(files)}") + print(f"Successfully downloaded: {stats['downloaded']}") + print(f"Successfully converted: {stats['converted']}") + print(f"Failed: {stats['failed']}") + print(f"Total conversations: {stats['total_conversations']:,}") + print(f"Time elapsed: {elapsed:.2f} seconds ({elapsed/60:.2f} minutes)") + print(f"Average: {elapsed/len(files):.2f} seconds per file") + print(f"\nOutput location: {OUTPUT_DIR}") + print("=" * 70) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py b/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py new file mode 100644 index 000000000..8460d5717 --- /dev/null +++ b/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py @@ -0,0 +1,464 @@ +import json +import sys + +from matplotlib import text +sys.path.insert(0, '/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit') +from convokit import Corpus, Speaker, Utterance, Conversation +import os +from datetime import datetime, timedelta +import copy +import shutil + +def main(): + + #PARAMETERS FOR USER TO SET + 
data_directory_input = "/kitchen/wikiconv-convokit-processing/output/English" #directory where the files to transform into the Convokit format are kept + data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/"#intermediate directory where the split Convokit files are also kept + data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/" #directory to output the merged Convokit files + delete_intermediate_files = True #set to True for data_directory_intermediate to be immediately deleted; otherwise intermediate directory is also stored + utterances_before_output = 5*(10 ** 5) #number of utterances before output is generated to limit memory consumption, initalized to half a million + + #Setting up transformation script varialbes + complete_utterance_list = [] + master_conversation_level_dict = {} + input_files_read = set() + count_of_data_splits = 0 + #Read all files from the data input + for input_filename in os.listdir(data_directory_input): + if ((input_filename.endswith(".jsonlist") and (not(input_filename in input_files_read)))): + input_files_read.add(input_filename) + final_input_path = os.path.join(data_directory_input, input_filename) + print (str(final_input_path)) #print the current file as an indicator of progress + + individual_utterance_list, conversation_individual_dict = create_utterances_from_individual_file(final_input_path) + master_conversation_level_dict.update(conversation_individual_dict) + + #Error checking - see check_for_dictionary_vals method + # for value in individual_utterance_list: + # check_for_dictionary_vals(value) + + #to reduce memory consumption, reset the utterance list if it gets too long + complete_utterance_list.extend(individual_utterance_list) + if (len(complete_utterance_list)> utterances_before_output): + dictionary_of_year_lists = separate_by_year(complete_utterance_list) + for key, value in dictionary_of_year_lists.items(): + set_meta_dump_corpus(value, master_conversation_level_dict, data_directory_intermediate, + 'conv_corpus_year_'+str(key)+'_data_split_'+str(count_of_data_splits)) + complete_utterance_list = [] + master_conversation_level_dict = {} + count_of_data_splits +=1 + + + #Once all the data has been converted to a list of utterances, break up the data by the year + dictionary_of_year_lists = separate_by_year(complete_utterance_list) + for key, value in dictionary_of_year_lists.items(): + set_meta_dump_corpus(value, master_conversation_level_dict, data_directory_intermediate, 'conv_corpus_year_'+str(key)+'_data_split_'+str(count_of_data_splits)) + + + #Now merge files into the final master and remove intermediate files if neccessary + merge_files(data_directory_output, data_directory_intermediate) + if (delete_intermediate_files): + shutil.rmtree(data_directory_intermediate) + + +# def check_dict_for_values(dict_val): +# list_of_keys = dict_val.keys(): + + +# Un-comment for error checking, ensure that every value 3 in the utterance's metadata is a dictionary; +# def check_for_dictionary_vals(utterance): +# modification_list = utterance.meta['modification'] +# deletion_list = utterance.meta['deletion'] +# restoration_list = utterance.meta['restoration'] + +# if (len(modification_list)>0): +# for value in modification_list: +# # print(value) +# if (not(type(value) is dict)): +# print (type(value)) +# if (len(deletion_list)>0): +# for value in deletion_list: +# if (not(type(value) is dict)): +# print (type(value)) + +# if (len(restoration_list)>0): +# for value in restoration_list: 
+# if (not(type(value) is dict)): +# print (type(value)) + +#function for files to merge +def merge_files(final_directory, input_directory): + input_subdirectory_paths = [x[0] for x in os.walk(input_directory)] + for year_x in range(1990, 2021): + print(year_x) + full_lst_corpora = [] + full_lst_corpora.extend(search_list_for_year(input_subdirectory_paths, str(year_x))) + master_corpora(final_directory, full_lst_corpora, str(year_x)) + + +def search_list_for_year(lst_elements, year): + matching_elements = [] + for file in lst_elements: + if year in file: + matching_elements.append(file) + return matching_elements + +def master_corpora(final_directory, paths_lst, year): + # print ('YEAR: ' + str(year) + ' list is ' + str(paths_lst)) + if (len(paths_lst)==0): + pass + + elif (len(paths_lst)== 1): + corpus_1 = Corpus(filename =paths_lst[0]) + corpus_1.dump(final_directory + 'wikiconv_corpus_merged_'+year) + + else: + corpus_1 = Corpus(filename =paths_lst[0]) + corpus_2 = Corpus(filename = paths_lst[1]) + # merged_corpus = corpus_1.merge(corpus_2) + merged_corpus = Corpus.merge(corpus_1, corpus_2) + if (len(paths_lst) >2): + for val in paths_lst[2:]: + merged_corpus = merged_corpus.merge(merged_corpus, Corpus(filename = val)) + merged_corpus.dump(final_directory + 'wikiconv_corpus_merged_'+str(year)) + +def is_empty_utterance(utterance): + """check if an utterance is empty (both text and original are empty/None)""" + # Check if text is empty or None + text_is_empty = not utterance.text or utterance.text.strip() == "" or utterance.text.strip() == '-' + + # Check if original is empty or None + # original = utterance.get('original') + # original_is_empty = True + + # if original is not None: + # # If original exists, check if its text is empty + # original_text = getattr(original, 'text', None) + # if original_text: + # original_is_empty = False + + # Filter out only if BOTH are empty + if text_is_empty and utterance.meta['original'] is None: + # print("breaK") + print(utterance.text) + print(utterance.meta['original']) + return text_is_empty and utterance.meta['original'] is None + + +def set_meta_dump_corpus(complete_utterance_list,master_conversation_level_dict, data_directory_output, corpus_name): + filtered_utterance_list = [u for u in complete_utterance_list if not is_empty_utterance(u)] + + print(f"Filtered {len(complete_utterance_list) - len(filtered_utterance_list)} empty utterances") + print(f"Remaining utterances: {len(filtered_utterance_list)}") + + if len(filtered_utterance_list) == 0: + print(f"Warning: No utterances remaining for corpus {corpus_name}") + return + + conversation_corpus = Corpus(utterances = filtered_utterance_list) + #Set the conversation level meta data in the corpus + for conversation in conversation_corpus.iter_conversations(): + conversation.meta = master_conversation_level_dict[conversation.id] + conversation_corpus.dump(data_directory_output + corpus_name) + + + # conversation_corpus = Corpus(utterances = complete_utterance_list) + # #Set the conversation level meta data in the corpus + # for conversation in conversation_corpus.iter_conversations(): + # conversation.meta = master_conversation_level_dict[conversation.id] + # conversation_corpus.dump(data_directory_output + corpus_name) + +#Separate utterances into lists by year +def separate_by_year(individual_utterance_list): + different_year_timestamps = set() + dictionary_of_lists = {} + + for utterance in individual_utterance_list: + timestamp_value = utterance.timestamp + datetime_value = 
datetime.fromtimestamp(timestamp_value) + year_value = datetime_value.year + if (year_value in dictionary_of_lists): + dictionary_of_lists[year_value].append(utterance) + else: + dictionary_of_lists[year_value] = [utterance] + + return dictionary_of_lists + +def create_utterances_from_individual_file(name_of_file_to_convert): + list_of_utterances= [] + conversation_level_info = {} + with open(name_of_file_to_convert, "r") as f: + for line in f: + dict_of_utterances = {} + list_of_order_identification_values = [] + json_val = json.loads(line) + + #conversational level data + conversation_id = json_val['conversation_id'] + + #Create the conversation meta values + page_id = json_val['page_id'] + page_title = json_val['page_title'] + page_type = json_val['page_type'] + conversation_meta_dict = {'page_id':page_id, 'page_title': page_title, 'page_type': page_type} + conversation_level_info[conversation_id] = conversation_meta_dict + + + #Reformat each set of conversations in the final form + comments = json_val['comments'] + if (len(comments) > 0): + for comment_number in range(len(comments)): + comment_val = comments[comment_number] + if (comment_number == 0): + section_header_value = True + else: + section_header_value = False + utterance_val, order_identification_val = reformat_comment(comment_val, section_header_value) + + dict_of_utterances[utterance_val.id] = utterance_val + list_of_order_identification_values.append(order_identification_val) + + correct_order_of_comments = correct_comment_order(list_of_order_identification_values) + + + final_utterance_list = final_dict(dict_of_utterances, correct_order_of_comments) + flat_utterance_list = [utterance_sub for sublist in final_utterance_list for utterance_sub in sublist] + list_of_utterances.extend(flat_utterance_list) + + return (list_of_utterances, conversation_level_info) + +#Return the correct dictionary order +def final_dict(dict_of_utterances, correct_order_identification_values): + final_list = [] + if (len(correct_order_identification_values) == 1): + first_comment = correct_order_identification_values[0][0] + final_list.append([dict_of_utterances[first_comment['id']]]) + return final_list + + for list_of_comments in correct_order_identification_values: + #print (list_of_comments) + if (len(list_of_comments) == 1): + first_comment = list_of_comments[0] + final_list.append([dict_of_utterances[first_comment['id']]]) + else: + #In all cases the original comment is the first comment in the thread (the original comment's modification/deletion/restoration will always be empty) + original_comment_order_id = list_of_comments[0] + original_utterance = dict_of_utterances[original_comment_order_id['id']] + #original_utterance.meta['original'] = original_utterance + final_list.append([original_utterance]) + + #Now fill out the modification/deletion/restoration objects and add to the final list if its an addition object + for x in range(1, (len(list_of_comments))): + current_comment_order_id = list_of_comments[x] + current_comment_order_id_type = current_comment_order_id['type'].lower() + #print (current_comment_order_id_type) + if (current_comment_order_id_type == 'addition'): + utterance_to_append = dict_of_utterances[current_comment_order_id['id']] + final_list.append([utterance_to_append]) + + for finalized_utterance_list in final_list: + for utterance in finalized_utterance_list: + #print ((current_comment_order_id['parent_id'])) + # print('hiiiiiii') + if (current_comment_order_id['parent_id'] == utterance.id): + original_utterance_value = 
copy.deepcopy(utterance) + comment_to_append = dict_of_utterances[current_comment_order_id['id']] + utterance.meta[current_comment_order_id_type].append(comment_to_append) + if (current_comment_order_id_type == 'deletion'): + rewrite_utterance_deletion(utterance, comment_to_append) + else: + rewrite_utterance_data(utterance, comment_to_append) + if (utterance.meta['original'] is None): + # print("hm") + utterance.meta['original'] = utterance_to_dict(original_utterance_value) + check_action_lists(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type) + convert_utterance_values_to_dict(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type) + + # return utterance_list + return final_list + +#If an utterance is deleted, make the final utterance text empty and change the id, speaker, reply-to and timestamp +def rewrite_utterance_deletion (utterance, comment_to_append): + utterance.id = comment_to_append.id + utterance.speaker = comment_to_append.speaker + utterance.reply_to = comment_to_append.reply_to + utterance.timestamp = comment_to_append.timestamp + utterance.text = " " + +#If an utterance is modified, make the final utteranxe text equal to the modified text and change the id, speaker, reply-to and timestamp +def rewrite_utterance_data(utterance, comment_to_append): + utterance.id = comment_to_append.id + utterance.speaker = comment_to_append.speaker + utterance.reply_to = comment_to_append.reply_to + utterance.timestamp = comment_to_append.timestamp + utterance.text = comment_to_append.text + + +def create_utterance_list(list_of_comments): + final_list_of_utterances = [] + for individual_comment_list in list_of_comments: + ind_com = individual_comment_list[0] + + #Construct the speaker value of type Speaker Class + speaker_dict = ind_com['speaker_info'] + speaker_value = Speaker(id = speaker_dict['speaker'], meta = {'speaker_id': speaker_dict['speaker_id']}) + + #Construct the utterance value of type Utterance Class + utterance_value = Utterance(ind_com['id'], speaker_value, ind_com['root'], ind_com['reply-to'], ind_com['timestamp'], ind_com['text'], None, meta = ind_com['meta']) + final_list_of_utterances.append(utterance_value) + return final_list_of_utterances + + +#Check within the lists +def check_action_lists(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type): + modification_list = utterance.meta['modification'] + deletion_list = utterance.meta['deletion'] + restoration_list = utterance.meta['restoration'] + comment_to_append = dict_of_utterances[current_comment_order_id['id']] + if (len(modification_list)>0): + for utterance_val in modification_list: + check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append) + if (len(deletion_list)>0): + for utterance_val in deletion_list: + check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append) + + if (len(restoration_list)>0): + for utterance_val in restoration_list: + check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append) + + +# Convert the utterance's data into a dictionary +def utterance_to_dict(utterance_val): + dict_rep = {} + if (type(utterance_val) == Utterance): + dict_rep['id'] = utterance_val.id + dict_rep['speaker'] = {'id':utterance_val.speaker.id, 'speaker_id':utterance_val.speaker.meta['speaker_id']} + + # Speaker(id = speaker, meta = {'speaker_id': 
speaker_id_val}) + dict_rep['root'] = utterance_val.conversation_id + dict_rep['reply_to'] = utterance_val.reply_to + dict_rep['timestamp'] = utterance_val.timestamp + dict_rep['text'] = utterance_val.text + dict_rep['meta_dict'] = utterance_val.meta + # print (str((utterance_val.meta))) + return dict_rep + else: + return None + +def convert_utterance_values_to_dict(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type): + modification_list = utterance.meta['modification'] + deletion_list = utterance.meta['deletion'] + restoration_list = utterance.meta['restoration'] + new_modification_list = [] + new_deletion_list = [] + new_restoration_list = [] + + if (len(modification_list)>0): + for utterance_val in modification_list: + returned_val = utterance_to_dict(utterance_val) + if (returned_val is not None): + new_modification_list.append(returned_val) + if (len(deletion_list)>0): + for utterance_val in deletion_list: + returned_val = utterance_to_dict(utterance_val) + if (returned_val is not None): + new_deletion_list.append(returned_val) + if (len(restoration_list)>0): + for utterance_val in restoration_list: + returned_val = utterance_to_dict(utterance_val) + if (returned_val is not None): + new_restoration_list.append(returned_val) + + utterance.meta['modification'] = new_modification_list + utterance.meta['deletion'] = new_deletion_list + utterance.meta['restoration'] = new_restoration_list + +#Since top level data is stored as an utterance and metadata as a dictionary, uniform method to get the id +def get_id_from_utt_or_dict(utterance_val): + #find the utterance value + if (isinstance(utterance_val, dict)): + id_val_utterance = utterance_val.get('id') + else: + id_val_utterance = utterance_val.id + + return id_val_utterance + +#Add the comment to the action list +def check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append): + + if (current_comment_order_id['parent_id'] == get_id_from_utt_or_dict(utterance_val) and check_not_in_list(utterance.meta[current_comment_order_id_type], comment_to_append)): + utterance.meta[current_comment_order_id_type].append(comment_to_append) + if (current_comment_order_id_type == 'deletion'): + rewrite_utterance_deletion(utterance, comment_to_append) + else: + rewrite_utterance_data(utterance, comment_to_append) + +#Make sure that this comment ahs not already been added +def check_not_in_list(list_of_values, utterance): + utterance_id = utterance.id + timestamp_val = utterance.timestamp + for value in list_of_values: + if (get_id_from_utt_or_dict(value) == utterance_id and value.timestamp == timestamp_val): + return False + return True + +#Find the correct order of comments to display +def correct_comment_order(list_of_order_identification_values): + correct_order = [] + for value in list_of_order_identification_values: + current_type = value['type'].lower() + if (current_type == 'creation' or current_type == 'addition'): + correct_order.append([value]) + elif (current_type == 'modification' or current_type == 'deletion' or current_type == 'restoration'): + id_changed = value['parent_id'] + for each_ordered_list in correct_order: + if (len(each_ordered_list) > 0): + for each_comment in each_ordered_list: + if (each_comment['id'] == id_changed): + each_ordered_list.append(value) + + return correct_order + + +#Create common information doc from the json val and store as an utterance +def reformat_comment(json_val, section_header_value): + #Top level/ required 
information + id_val = json_val['id'] + root_val = json_val['conversation_id'] + reply_to = json_val['replyTo_id'] + text = json_val['text'] + timestamp = json_val['timestamp'] #not required but placed in the top level + + #Access the infrmoation necessary for the speaker class + speaker = json_val['user_text'] + speaker_id_val = json_val['user_id'] + + #Construct the Speaker value of type Speaker Class + speaker_value = Speaker(id = speaker, meta = {'speaker_id': speaker_id_val}) + # speaker_value = {'id':speaker; 'speaker_id':speaker_id} + + #Values for the meta dictionary + is_section_header = section_header_value + indentation = json_val['indentation'] + toxicity = json_val['toxicity'] + sever_toxicity = json_val['sever_toxicity'] + ancestor_id = json_val['ancestor_id'] + rev_id = json_val['rev_id'] + + #Used to identify order + parent_id = json_val['parent_id'] + type_val = json_val['type'] + + #Build the meta dict + meta_dict = {'is_section_header':is_section_header, 'indentation':indentation, 'toxicity':toxicity, 'sever_toxicity':sever_toxicity, 'ancestor_id':ancestor_id, 'rev_id':rev_id, 'parent_id':parent_id, + 'original': None, 'modification': [], 'deletion': [], 'restoration':[]} + + #Construct the utterance value of type Utterance Class + utterance_value = Utterance(id = id_val,speaker= speaker_value, conversation_id = root_val, reply_to = reply_to,timestamp = timestamp, text= text, meta = meta_dict) + order_identification = {'id':id_val, 'parent_id':parent_id, 'type':type_val, 'timestamp': timestamp} + + return (utterance_value, order_identification) + + +if __name__ == '__main__': + main() From 5d60eb94f59835e023d70444196acb5b2220e32b Mon Sep 17 00:00:00 2001 From: ex36 <96800594+Torchee@users.noreply.github.com> Date: Mon, 22 Dec 2025 02:04:07 -0500 Subject: [PATCH 2/2] added black formatting --- datasets/wikiconv-corpus/merge_parallel.py | 40 +- datasets/wikiconv-corpus/raw_data.py | 95 ++-- datasets/wikiconv-corpus/to_jsonlist_lang.py | 314 ++++++----- ...kiconv_conversion_with_merging_04_28_20.py | 511 +++++++++++------- 4 files changed, 541 insertions(+), 419 deletions(-) diff --git a/datasets/wikiconv-corpus/merge_parallel.py b/datasets/wikiconv-corpus/merge_parallel.py index 6c59e6e64..2e6cba9a4 100644 --- a/datasets/wikiconv-corpus/merge_parallel.py +++ b/datasets/wikiconv-corpus/merge_parallel.py @@ -3,19 +3,21 @@ """ import sys -sys.path.insert(0, '/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit') + +sys.path.insert(0, "/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit") from convokit import Corpus import os import shutil from concurrent.futures import ProcessPoolExecutor, as_completed from functools import partial + def main(): data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/" # intermediate directory where the split Convokit files are kept data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/" # directory to output the merged Convokit files delete_intermediate_files = True # set to True to delete intermediate files after merging max_workers = 12 # number of threads/years to run in parallel - + print("Starting merge process...") print(f"Reading from: {data_directory_intermediate}") print(f"Writing to: {data_directory_output}") @@ -24,9 +26,9 @@ def main(): os.makedirs(data_directory_output, exist_ok=True) merge_files(data_directory_output, data_directory_intermediate, max_workers) - + print("\nMerge completed successfully!") - + if delete_intermediate_files: 
print(f"Deleting intermediate files from {data_directory_intermediate}") shutil.rmtree(data_directory_intermediate) @@ -36,16 +38,16 @@ def main(): def merge_files(final_directory, input_directory, max_workers): # build full list input_subdirectory_paths = [x[0] for x in os.walk(input_directory)] - + # organize files by year files_by_year = {} for year_x in range(2006, 2021): year_str = str(year_x) files_by_year[year_str] = [path for path in input_subdirectory_paths if year_str in path] - + # years in parallel process_year_func = partial(process_single_year, final_directory=final_directory) - + with ProcessPoolExecutor(max_workers=max_workers) as executor: future_to_year = {} for year_x in range(2007, 2019): @@ -53,7 +55,7 @@ def merge_files(final_directory, input_directory, max_workers): if len(files_by_year[year_str]) > 0: future = executor.submit(process_year_func, year_str, files_by_year[year_str]) future_to_year[future] = year_str - + # process results as they complete for future in as_completed(future_to_year): year = future_to_year[future] @@ -68,27 +70,27 @@ def process_single_year(year, paths_lst, final_directory): """process a single year""" if len(paths_lst) == 0: return f"Skipped - no files" - + print(f"\n[Year {year}] Processing {len(paths_lst)} corpus file(s)") - + if len(paths_lst) == 1: print(f"[Year {year}] Loading single corpus") corpus_1 = Corpus(filename=paths_lst[0]) - output_path = final_directory + 'wikiconv_corpus_merged_' + year + output_path = final_directory + "wikiconv_corpus_merged_" + year corpus_1.dump(output_path) return f"Saved single corpus" - + else: print(f"[Year {year}] Merging {len(paths_lst)} corpus files") - + # load all corpora corpora = [] for idx, path in enumerate(paths_lst, start=1): print(f"[Year {year}] Loading corpus {idx}/{len(paths_lst)}") corpora.append(Corpus(filename=path)) - + print(f"[Year {year}] Starting merge of {len(corpora)} corpora") - + # merge in a balanced binary tree pattern for increased efficiency round_num = 1 while len(corpora) > 1: @@ -106,12 +108,12 @@ def process_single_year(year, paths_lst, final_directory): round_num += 1 merged_corpus = corpora[0] - - output_path = final_directory + 'wikiconv_corpus_merged_' + str(year) + + output_path = final_directory + "wikiconv_corpus_merged_" + str(year) print(f"[Year {year}] Saving merged corpus") merged_corpus.dump(output_path) return f"Saved merged corpus ({len(paths_lst)} files merged)" -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/datasets/wikiconv-corpus/raw_data.py b/datasets/wikiconv-corpus/raw_data.py index 878ce7cdd..94273be39 100644 --- a/datasets/wikiconv-corpus/raw_data.py +++ b/datasets/wikiconv-corpus/raw_data.py @@ -14,7 +14,8 @@ # Global lock for thread-safe printing and counter for matches print_lock = Lock() -matches_found = {'count': 0, 'files': []} +matches_found = {"count": 0, "files": []} + def get_file_list(figshare_article_id): """Fetch the list of ALL files from Figshare API (handles pagination).""" @@ -22,84 +23,84 @@ def get_file_list(figshare_article_id): all_files = [] page = 1 page_size = 100 # Max allowed by Figshare API - + try: while True: - params = { - 'page': page, - 'page_size': page_size - } + params = {"page": page, "page_size": page_size} response = requests.get(base_url, params=params) response.raise_for_status() files = response.json() - + if not files: break - + all_files.extend(files) print(f" Fetched page {page}: {len(files)} files (total so far: 
{len(all_files)})") - + if len(files) < page_size: # Last page break - + page += 1 - + return all_files except requests.exceptions.RequestException as e: print(f"Error fetching file list: {e}") sys.exit(1) + def download_and_check_file(file_info, search_string, download_dir, idx, total): """ Download a file, check for search string while streaming, and handle accordingly. Returns (found, file_name) tuple. """ - file_name = file_info['name'] - file_url = file_info['download_url'] + file_name = file_info["name"] + file_url = file_info["download_url"] file_path = os.path.join(download_dir, file_name) - + with print_lock: - print(f"[{idx}/{total}] Downloading: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)...") - + print( + f"[{idx}/{total}] Downloading: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)..." + ) + try: # Download file with streaming response = requests.get(file_url, stream=True, timeout=60) response.raise_for_status() - + # Search while downloading (more efficient for large files) found = False chunk_size = 8192 - buffer = b'' - search_bytes = search_string.encode('utf-8') - - with open(file_path, 'wb') as f: + buffer = b"" + search_bytes = search_string.encode("utf-8") + + with open(file_path, "wb") as f: for chunk in response.iter_content(chunk_size=chunk_size): f.write(chunk) - + # Search in overlapping buffer to catch strings across chunk boundaries buffer += chunk if search_bytes in buffer: found = True # Continue downloading but we know we found it - + # Keep last part of buffer for overlap check if len(buffer) > len(search_bytes) * 2: - buffer = buffer[-(len(search_bytes) * 2):] - + buffer = buffer[-(len(search_bytes) * 2) :] + if found: with print_lock: print(f" ✓ FOUND '{search_string}' in {file_name}!") print(f" File saved at: {file_path}") - matches_found['count'] += 1 - matches_found['files'].append(file_name) + matches_found["count"] += 1 + matches_found["files"].append(file_name) return (True, file_name) else: with print_lock: print(f" String not found in {file_name}. Deleting...") os.remove(file_path) return (False, None) - + except requests.exceptions.RequestException as e: with print_lock: print(f" Error downloading {file_name}: {e}") @@ -113,12 +114,13 @@ def download_and_check_file(file_info, search_string, download_dir, idx, total): os.remove(file_path) return (False, None) + def main(): FIGSHARE_ARTICLE_ID = "7376003" # english dataset, change for other languages SEARCH_STRING = "2052702.7345.7345" DOWNLOAD_DIR = "./wikiconv_downloads" MAX_WORKERS = 10 # Adjust based on your server's bandwidth and CPU - + print("=" * 60) print("WikiConv File Finder (Parallel - Keep All Matches)") print("=" * 60) @@ -126,30 +128,30 @@ def main(): print(f"Download directory: {DOWNLOAD_DIR}") print(f"Parallel workers: {MAX_WORKERS}") print() - + # Create download directory Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True) - + # Get file list print("Fetching file list from Figshare...") files = get_file_list(FIGSHARE_ARTICLE_ID) - + if not files: print("No files found!") sys.exit(1) - + print(f"Found {len(files)} files.") print() - + start_time = time.time() - + # Process files in parallel START_INDEX = 1 # 1-based index, meaning skip first 88 if START_INDEX > len(files): print(f"Start index ({START_INDEX}) is beyond available files ({len(files)}). 
Exiting.") sys.exit(1) - files_to_process = files[START_INDEX - 1:] # slice from the 89th file onward + files_to_process = files[START_INDEX - 1 :] # slice from the 89th file onward total_files = len(files_to_process) print(f"Processing files {START_INDEX}–{len(files)} ({total_files} total)...\n") @@ -162,29 +164,31 @@ def main(): file_info, SEARCH_STRING, DOWNLOAD_DIR, - idx + START_INDEX - 1, - len(files) + idx + START_INDEX - 1, + len(files), ): file_info for idx, file_info in enumerate(files_to_process, 1) } - + # process completed tasks for future in as_completed(future_to_file): found, file_name = future.result() completed += 1 - + if completed % 50 == 0: with print_lock: - print(f"\n--- Progress: {completed}/{len(files)} files processed, {matches_found['count']} matches found ---\n") - + print( + f"\n--- Progress: {completed}/{len(files)} files processed, {matches_found['count']} matches found ---\n" + ) + elapsed = time.time() - start_time print() print("=" * 60) print(f"COMPLETED: Processed all {len(files)} files.") print(f"Matches found: {matches_found['count']}") - if matches_found['files']: + if matches_found["files"]: print(f"\nFiles containing '{SEARCH_STRING}':") - for match_file in matches_found['files']: + for match_file in matches_found["files"]: print(f" - {match_file}") else: print(f"\nSearch string '{SEARCH_STRING}' was NOT found in any file.") @@ -192,5 +196,6 @@ def main(): print(f"Average: {elapsed/len(files):.2f} seconds per file") print("=" * 60) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/datasets/wikiconv-corpus/to_jsonlist_lang.py b/datasets/wikiconv-corpus/to_jsonlist_lang.py index 9514e5fe6..3d5879c44 100644 --- a/datasets/wikiconv-corpus/to_jsonlist_lang.py +++ b/datasets/wikiconv-corpus/to_jsonlist_lang.py @@ -16,12 +16,8 @@ import time print_lock = Lock() -stats = { - 'downloaded': 0, - 'converted': 0, - 'failed': 0, - 'total_conversations': 0 -} +stats = {"downloaded": 0, "converted": 0, "failed": 0, "total_conversations": 0} + def get_file_list(figshare_article_id): """Fetch list of all files from Figshare API""" @@ -29,242 +25,239 @@ def get_file_list(figshare_article_id): all_files = [] page = 1 page_size = 100 - + try: while True: - params = {'page': page, 'page_size': page_size} + params = {"page": page, "page_size": page_size} response = requests.get(base_url, params=params) response.raise_for_status() files = response.json() - + if not files: break - + all_files.extend(files) print(f" Fetched page {page}: {len(files)} files (total: {len(all_files)})") - + if len(files) < page_size: break - + page += 1 - + return all_files except requests.exceptions.RequestException as e: print(f"Error fetching file list: {e}") sys.exit(1) + def parse_timestamp(timestamp_str): """Convert timestamp string to unix timestamp""" - formats = [ - "%Y-%m-%dT%H:%M:%SZ", - "%Y-%m-%d %H:%M:%S UTC", - "%Y-%m-%d %H:%M:%S" - ] - + formats = ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d %H:%M:%S UTC", "%Y-%m-%d %H:%M:%S"] + timestamp_str = timestamp_str.replace(" UTC", "") - + for fmt in formats: try: dt = datetime.strptime(timestamp_str, fmt) return dt.timestamp() except: continue - + return 0.0 + def extract_page_type_and_title(page_title): """Extract page type and clean title from the full page title""" namespace_mappings = { - 'User talk:': 'user_talk', - 'User:': 'user', - 'Talk:': 'talk', - 'Wikipedia talk:': 'wikipedia_talk', - 'Wikipedia:': 'wikipedia', - 'File talk:': 'file_talk', - 'File:': 'file', - 'Template talk:': 'template_talk', - 
'Template:': 'template', - 'Help talk:': 'help_talk', - 'Help:': 'help', - 'Category talk:': 'category_talk', - 'Category:': 'category', - 'Project talk:': 'project_talk', - 'Project:': 'project', - 'Συζήτηση χρήστη:': 'user_talk', - 'Συζήτηση:': 'talk', - 'Χρήστης:': 'user', + "User talk:": "user_talk", + "User:": "user", + "Talk:": "talk", + "Wikipedia talk:": "wikipedia_talk", + "Wikipedia:": "wikipedia", + "File talk:": "file_talk", + "File:": "file", + "Template talk:": "template_talk", + "Template:": "template", + "Help talk:": "help_talk", + "Help:": "help", + "Category talk:": "category_talk", + "Category:": "category", + "Project talk:": "project_talk", + "Project:": "project", + "Συζήτηση χρήστη:": "user_talk", + "Συζήτηση:": "talk", + "Χρήστης:": "user", } - + for prefix in sorted(namespace_mappings.keys(), key=len, reverse=True): if page_title.startswith(prefix): - clean_title = page_title[len(prefix):] + clean_title = page_title[len(prefix) :] page_type = namespace_mappings[prefix] return page_type, clean_title, page_title - return 'article', page_title, page_title + return "article", page_title, page_title + def process_comment(comment_data): """Process a single comment from plain text format to expected format""" - timestamp = parse_timestamp(comment_data.get('timestamp', '')) + timestamp = parse_timestamp(comment_data.get("timestamp", "")) page_type, clean_title, raw_title = extract_page_type_and_title( - comment_data.get('page_title', '') + comment_data.get("page_title", "") ) - - toxicity = float(comment_data.get('toxicity', 0.0)) - sever_toxicity = float(comment_data.get('sever_toxicity', 0.0)) - + + toxicity = float(comment_data.get("toxicity", 0.0)) + sever_toxicity = float(comment_data.get("sever_toxicity", 0.0)) + processed_comment = { - 'conversation_id': comment_data.get('conversation_id'), - 'id': comment_data.get('id'), - 'indentation': str(comment_data.get('indentation', 0)), - 'type': comment_data.get('type', 'CREATION').upper(), - 'page_id': str(comment_data.get('page_id', '')), - 'page_title': raw_title, - 'parent_id': comment_data.get('parent_id'), - 'ancestor_id': comment_data.get('ancestor_id'), - 'replyTo_id': comment_data.get('replyTo_id'), - 'rev_id': str(comment_data.get('rev_id', '')), - 'user_id': str(comment_data.get('user_id', '')), - 'user_text': comment_data.get('user_text', ''), - 'toxicity': toxicity, - 'sever_toxicity': sever_toxicity, - 'raw_text': comment_data.get('content', ''), - 'text': comment_data.get('cleaned_content', ''), - 'timestamp': timestamp, - 'is_unchanged': comment_data.get('isUnchanged', False), - 'wiki_links': [] + "conversation_id": comment_data.get("conversation_id"), + "id": comment_data.get("id"), + "indentation": str(comment_data.get("indentation", 0)), + "type": comment_data.get("type", "CREATION").upper(), + "page_id": str(comment_data.get("page_id", "")), + "page_title": raw_title, + "parent_id": comment_data.get("parent_id"), + "ancestor_id": comment_data.get("ancestor_id"), + "replyTo_id": comment_data.get("replyTo_id"), + "rev_id": str(comment_data.get("rev_id", "")), + "user_id": str(comment_data.get("user_id", "")), + "user_text": comment_data.get("user_text", ""), + "toxicity": toxicity, + "sever_toxicity": sever_toxicity, + "raw_text": comment_data.get("content", ""), + "text": comment_data.get("cleaned_content", ""), + "timestamp": timestamp, + "is_unchanged": comment_data.get("isUnchanged", False), + "wiki_links": [], } - + return processed_comment, page_type, clean_title, raw_title + def 
convert_to_jsonlist(input_path, output_path): """Convert a raw file to jsonlist format""" - conversations = defaultdict(lambda: { - 'comments': [], - 'authors': set(), - 'page_info': {} - }) - - with open(input_path, 'r', encoding='utf-8') as f: + conversations = defaultdict(lambda: {"comments": [], "authors": set(), "page_info": {}}) + + with open(input_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue - + try: comment_data = json.loads(line) processed_comment, page_type, clean_title, raw_title = process_comment(comment_data) - - conv_id = comment_data.get('conversation_id') + + conv_id = comment_data.get("conversation_id") if not conv_id: continue - - conversations[conv_id]['comments'].append(processed_comment) - - if not conversations[conv_id]['page_info']: - conversations[conv_id]['page_info'] = { - 'page_id': str(comment_data.get('page_id', '')), - 'page_type': page_type, - 'page_title': clean_title, - 'raw_page_title': raw_title + + conversations[conv_id]["comments"].append(processed_comment) + + if not conversations[conv_id]["page_info"]: + conversations[conv_id]["page_info"] = { + "page_id": str(comment_data.get("page_id", "")), + "page_type": page_type, + "page_title": clean_title, + "raw_page_title": raw_title, } - - if 'authors' in comment_data: - for author_str in comment_data.get('authors', []): - parts = author_str.split(':', 1) + + if "authors" in comment_data: + for author_str in comment_data.get("authors", []): + parts = author_str.split(":", 1) if len(parts) == 2: user_id, user_text = parts - conversations[conv_id]['authors'].add((user_id, user_text)) + conversations[conv_id]["authors"].add((user_id, user_text)) else: - user_id = str(comment_data.get('user_id', '')) - user_text = comment_data.get('user_text', '') + user_id = str(comment_data.get("user_id", "")) + user_text = comment_data.get("user_text", "") if user_id and user_text: - conversations[conv_id]['authors'].add((user_id, user_text)) - + conversations[conv_id]["authors"].add((user_id, user_text)) + except (json.JSONDecodeError, Exception): continue - + # sort and write convos - with open(output_path, 'w', encoding='utf-8') as f: + with open(output_path, "w", encoding="utf-8") as f: for conv_id, conv_data in sorted(conversations.items()): - if not conv_data['comments']: + if not conv_data["comments"]: continue - - creation_comments = [c for c in conv_data['comments'] if c['type'] == 'CREATION'] - other_comments = [c for c in conv_data['comments'] if c['type'] != 'CREATION'] - creation_comments.sort(key=lambda x: (x['timestamp'], x['id'])) - other_comments.sort(key=lambda x: (x['timestamp'], x['id'])) + + creation_comments = [c for c in conv_data["comments"] if c["type"] == "CREATION"] + other_comments = [c for c in conv_data["comments"] if c["type"] != "CREATION"] + creation_comments.sort(key=lambda x: (x["timestamp"], x["id"])) + other_comments.sort(key=lambda x: (x["timestamp"], x["id"])) sorted_comments = creation_comments + other_comments - + authors = [ - {'user_text': user_text, 'user_id': user_id} - for user_id, user_text in sorted(conv_data['authors']) + {"user_text": user_text, "user_id": user_id} + for user_id, user_text in sorted(conv_data["authors"]) ] - + conversation = { - 'conversation_id': conv_id, - 'page_id': conv_data['page_info']['page_id'], - 'raw_page_title': conv_data['page_info']['raw_page_title'], - 'page_type': conv_data['page_info']['page_type'], - 'page_title': conv_data['page_info']['page_title'], - 'section_title': None, - 'comments': 
sorted_comments, - 'authors': authors + "conversation_id": conv_id, + "page_id": conv_data["page_info"]["page_id"], + "raw_page_title": conv_data["page_info"]["raw_page_title"], + "page_type": conv_data["page_info"]["page_type"], + "page_title": conv_data["page_info"]["page_title"], + "section_title": None, + "comments": sorted_comments, + "authors": authors, } - + json.dump(conversation, f, ensure_ascii=False) - f.write('\n') - + f.write("\n") + return len(conversations) + def download_and_convert_file(file_info, raw_dir, output_dir, idx, total): """Downloads a file, converts it to JSONLIST, and then deletes the raw file.""" - file_name = file_info['name'] - file_url = file_info['download_url'] + file_name = file_info["name"] + file_url = file_info["download_url"] raw_path = os.path.join(raw_dir, file_name) - + # output filename base_name = os.path.splitext(file_name)[0] output_filename = f"{base_name}.jsonlist" output_path = os.path.join(output_dir, output_filename) - + with print_lock: print(f"[{idx}/{total}] Processing: {file_name} ({file_info['size'] / (1024*1024):.2f} MB)") - + try: # download response = requests.get(file_url, stream=True, timeout=120) response.raise_for_status() - - with open(raw_path, 'wb') as f: + + with open(raw_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - + with print_lock: - stats['downloaded'] += 1 - + stats["downloaded"] += 1 + conv_count = convert_to_jsonlist(raw_path, output_path) - + # delete raw file after successful conversion os.remove(raw_path) - + with print_lock: - stats['converted'] += 1 - stats['total_conversations'] += conv_count + stats["converted"] += 1 + stats["total_conversations"] += conv_count print(f" ✓ Converted {file_name} → {output_filename} ({conv_count} conversations)") - + return (True, conv_count) - + except requests.exceptions.RequestException as e: with print_lock: - stats['failed'] += 1 + stats["failed"] += 1 print(f" ✗ Download error for {file_name}: {e}") if os.path.exists(raw_path): os.remove(raw_path) return (False, 0) except Exception as e: with print_lock: - stats['failed'] += 1 + stats["failed"] += 1 print(f" ✗ Processing error for {file_name}: {e}") if os.path.exists(raw_path): os.remove(raw_path) @@ -272,12 +265,13 @@ def download_and_convert_file(file_info, raw_dir, output_dir, idx, total): os.remove(output_path) return (False, 0) + def main(): FIGSHARE_ARTICLE_ID = "7376003" # English dataset id, change for other datasets RAW_DIR = "./raw_data/English" OUTPUT_DIR = "./output/English" MAX_WORKERS = 10 # adjust as needed - + print("=" * 70) print("WikiConv Download & Convert Pipeline") print("=" * 70) @@ -285,58 +279,55 @@ def main(): print(f"Output JSONLIST directory: {OUTPUT_DIR}") print(f"Parallel workers: {MAX_WORKERS}") print() - + # create directories Path(RAW_DIR).mkdir(parents=True, exist_ok=True) Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True) - + print("Fetching file list from Figshare...") files = get_file_list(FIGSHARE_ARTICLE_ID) - + if not files: print("No files found!") sys.exit(1) - + print(f"Found {len(files)} files to process.") print() - + start_time = time.time() - + # process files in parallel completed = 0 with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_file = { executor.submit( - download_and_convert_file, - file_info, - RAW_DIR, - OUTPUT_DIR, - idx, - len(files) + download_and_convert_file, file_info, RAW_DIR, OUTPUT_DIR, idx, len(files) ): file_info for idx, file_info in enumerate(files, 1) } - + for future in 
as_completed(future_to_file): success, conv_count = future.result() completed += 1 - + # checks every 25 files if completed % 25 == 0: with print_lock: - print(f"\n--- Progress: {completed}/{len(files)} files | " - f"Downloaded: {stats['downloaded']} | " - f"Converted: {stats['converted']} | " - f"Failed: {stats['failed']} | " - f"Conversations: {stats['total_conversations']:,} ---\n") - + print( + f"\n--- Progress: {completed}/{len(files)} files | " + f"Downloaded: {stats['downloaded']} | " + f"Converted: {stats['converted']} | " + f"Failed: {stats['failed']} | " + f"Conversations: {stats['total_conversations']:,} ---\n" + ) + elapsed = time.time() - start_time - + try: os.rmdir(RAW_DIR) except: pass - + print() print("=" * 70) print(f"PIPELINE COMPLETED") @@ -350,5 +341,6 @@ def main(): print(f"\nOutput location: {OUTPUT_DIR}") print("=" * 70) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py b/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py index 8460d5717..66f8ad960 100644 --- a/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py +++ b/datasets/wikiconv-corpus/wikiconv_conversion_with_merging_04_28_20.py @@ -2,62 +2,76 @@ import sys from matplotlib import text -sys.path.insert(0, '/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit') + +sys.path.insert(0, "/home/jonathan/research/Cornell-Conversational-Analysis-Toolkit") from convokit import Corpus, Speaker, Utterance, Conversation import os from datetime import datetime, timedelta import copy import shutil + def main(): - #PARAMETERS FOR USER TO SET - data_directory_input = "/kitchen/wikiconv-convokit-processing/output/English" #directory where the files to transform into the Convokit format are kept - data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/"#intermediate directory where the split Convokit files are also kept - data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/" #directory to output the merged Convokit files - delete_intermediate_files = True #set to True for data_directory_intermediate to be immediately deleted; otherwise intermediate directory is also stored - utterances_before_output = 5*(10 ** 5) #number of utterances before output is generated to limit memory consumption, initalized to half a million + # PARAMETERS FOR USER TO SET + data_directory_input = "/kitchen/wikiconv-convokit-processing/output/English" # directory where the files to transform into the Convokit format are kept + data_directory_intermediate = "/kitchen/wikiconv-convokit-processing/store_test_merging/" # intermediate directory where the split Convokit files are also kept + data_directory_output = "/kitchen/wikiconv-convokit-processing/final/English/" # directory to output the merged Convokit files + delete_intermediate_files = True # set to True for data_directory_intermediate to be immediately deleted; otherwise intermediate directory is also stored + utterances_before_output = 5 * ( + 10**5 + ) # number of utterances before output is generated to limit memory consumption, initalized to half a million - #Setting up transformation script varialbes + # Setting up transformation script varialbes complete_utterance_list = [] master_conversation_level_dict = {} input_files_read = set() count_of_data_splits = 0 - #Read all files from the data input + # Read all files from the data input for input_filename in 
os.listdir(data_directory_input): - if ((input_filename.endswith(".jsonlist") and (not(input_filename in input_files_read)))): + if input_filename.endswith(".jsonlist") and (not (input_filename in input_files_read)): input_files_read.add(input_filename) final_input_path = os.path.join(data_directory_input, input_filename) - print (str(final_input_path)) #print the current file as an indicator of progress + print(str(final_input_path)) # print the current file as an indicator of progress - individual_utterance_list, conversation_individual_dict = create_utterances_from_individual_file(final_input_path) + ( + individual_utterance_list, + conversation_individual_dict, + ) = create_utterances_from_individual_file(final_input_path) master_conversation_level_dict.update(conversation_individual_dict) - - #Error checking - see check_for_dictionary_vals method + + # Error checking - see check_for_dictionary_vals method # for value in individual_utterance_list: # check_for_dictionary_vals(value) - #to reduce memory consumption, reset the utterance list if it gets too long + # to reduce memory consumption, reset the utterance list if it gets too long complete_utterance_list.extend(individual_utterance_list) - if (len(complete_utterance_list)> utterances_before_output): + if len(complete_utterance_list) > utterances_before_output: dictionary_of_year_lists = separate_by_year(complete_utterance_list) for key, value in dictionary_of_year_lists.items(): - set_meta_dump_corpus(value, master_conversation_level_dict, data_directory_intermediate, - 'conv_corpus_year_'+str(key)+'_data_split_'+str(count_of_data_splits)) + set_meta_dump_corpus( + value, + master_conversation_level_dict, + data_directory_intermediate, + "conv_corpus_year_" + str(key) + "_data_split_" + str(count_of_data_splits), + ) complete_utterance_list = [] master_conversation_level_dict = {} - count_of_data_splits +=1 - + count_of_data_splits += 1 - #Once all the data has been converted to a list of utterances, break up the data by the year + # Once all the data has been converted to a list of utterances, break up the data by the year dictionary_of_year_lists = separate_by_year(complete_utterance_list) for key, value in dictionary_of_year_lists.items(): - set_meta_dump_corpus(value, master_conversation_level_dict, data_directory_intermediate, 'conv_corpus_year_'+str(key)+'_data_split_'+str(count_of_data_splits)) - - - #Now merge files into the final master and remove intermediate files if neccessary + set_meta_dump_corpus( + value, + master_conversation_level_dict, + data_directory_intermediate, + "conv_corpus_year_" + str(key) + "_data_split_" + str(count_of_data_splits), + ) + + # Now merge files into the final master and remove intermediate files if neccessary merge_files(data_directory_output, data_directory_intermediate) - if (delete_intermediate_files): + if delete_intermediate_files: shutil.rmtree(data_directory_intermediate) @@ -65,7 +79,7 @@ def main(): # list_of_keys = dict_val.keys(): -# Un-comment for error checking, ensure that every value 3 in the utterance's metadata is a dictionary; +# Un-comment for error checking, ensure that every value 3 in the utterance's metadata is a dictionary; # def check_for_dictionary_vals(utterance): # modification_list = utterance.meta['modification'] # deletion_list = utterance.meta['deletion'] @@ -76,17 +90,17 @@ def main(): # # print(value) # if (not(type(value) is dict)): # print (type(value)) -# if (len(deletion_list)>0): +# if (len(deletion_list)>0): # for value in deletion_list: # if 
(not(type(value) is dict)): # print (type(value)) -# if (len(restoration_list)>0): +# if (len(restoration_list)>0): # for value in restoration_list: # if (not(type(value) is dict)): # print (type(value)) -#function for files to merge +# function for files to merge def merge_files(final_directory, input_directory): input_subdirectory_paths = [x[0] for x in os.walk(input_directory)] for year_x in range(1990, 2021): @@ -103,72 +117,80 @@ def search_list_for_year(lst_elements, year): matching_elements.append(file) return matching_elements + def master_corpora(final_directory, paths_lst, year): # print ('YEAR: ' + str(year) + ' list is ' + str(paths_lst)) - if (len(paths_lst)==0): + if len(paths_lst) == 0: pass - elif (len(paths_lst)== 1): - corpus_1 = Corpus(filename =paths_lst[0]) - corpus_1.dump(final_directory + 'wikiconv_corpus_merged_'+year) + elif len(paths_lst) == 1: + corpus_1 = Corpus(filename=paths_lst[0]) + corpus_1.dump(final_directory + "wikiconv_corpus_merged_" + year) else: - corpus_1 = Corpus(filename =paths_lst[0]) - corpus_2 = Corpus(filename = paths_lst[1]) + corpus_1 = Corpus(filename=paths_lst[0]) + corpus_2 = Corpus(filename=paths_lst[1]) # merged_corpus = corpus_1.merge(corpus_2) merged_corpus = Corpus.merge(corpus_1, corpus_2) - if (len(paths_lst) >2): + if len(paths_lst) > 2: for val in paths_lst[2:]: - merged_corpus = merged_corpus.merge(merged_corpus, Corpus(filename = val)) - merged_corpus.dump(final_directory + 'wikiconv_corpus_merged_'+str(year)) + merged_corpus = merged_corpus.merge(merged_corpus, Corpus(filename=val)) + merged_corpus.dump(final_directory + "wikiconv_corpus_merged_" + str(year)) + def is_empty_utterance(utterance): """check if an utterance is empty (both text and original are empty/None)""" # Check if text is empty or None - text_is_empty = not utterance.text or utterance.text.strip() == "" or utterance.text.strip() == '-' - + text_is_empty = ( + not utterance.text or utterance.text.strip() == "" or utterance.text.strip() == "-" + ) + # Check if original is empty or None # original = utterance.get('original') # original_is_empty = True - + # if original is not None: # # If original exists, check if its text is empty # original_text = getattr(original, 'text', None) # if original_text: # original_is_empty = False - + # Filter out only if BOTH are empty - if text_is_empty and utterance.meta['original'] is None: + if text_is_empty and utterance.meta["original"] is None: # print("breaK") print(utterance.text) - print(utterance.meta['original']) - return text_is_empty and utterance.meta['original'] is None + print(utterance.meta["original"]) + return text_is_empty and utterance.meta["original"] is None -def set_meta_dump_corpus(complete_utterance_list,master_conversation_level_dict, data_directory_output, corpus_name): +def set_meta_dump_corpus( + complete_utterance_list, master_conversation_level_dict, data_directory_output, corpus_name +): filtered_utterance_list = [u for u in complete_utterance_list if not is_empty_utterance(u)] - - print(f"Filtered {len(complete_utterance_list) - len(filtered_utterance_list)} empty utterances") + + print( + f"Filtered {len(complete_utterance_list) - len(filtered_utterance_list)} empty utterances" + ) print(f"Remaining utterances: {len(filtered_utterance_list)}") - + if len(filtered_utterance_list) == 0: print(f"Warning: No utterances remaining for corpus {corpus_name}") return - - conversation_corpus = Corpus(utterances = filtered_utterance_list) - #Set the conversation level meta data in the corpus + + 
conversation_corpus = Corpus(utterances=filtered_utterance_list) + # Set the conversation level meta data in the corpus for conversation in conversation_corpus.iter_conversations(): conversation.meta = master_conversation_level_dict[conversation.id] conversation_corpus.dump(data_directory_output + corpus_name) - # conversation_corpus = Corpus(utterances = complete_utterance_list) # #Set the conversation level meta data in the corpus # for conversation in conversation_corpus.iter_conversations(): # conversation.meta = master_conversation_level_dict[conversation.id] # conversation_corpus.dump(data_directory_output + corpus_name) -#Separate utterances into lists by year + +# Separate utterances into lists by year def separate_by_year(individual_utterance_list): different_year_timestamps = set() dictionary_of_lists = {} @@ -176,116 +198,140 @@ def separate_by_year(individual_utterance_list): for utterance in individual_utterance_list: timestamp_value = utterance.timestamp datetime_value = datetime.fromtimestamp(timestamp_value) - year_value = datetime_value.year - if (year_value in dictionary_of_lists): + year_value = datetime_value.year + if year_value in dictionary_of_lists: dictionary_of_lists[year_value].append(utterance) else: dictionary_of_lists[year_value] = [utterance] return dictionary_of_lists + def create_utterances_from_individual_file(name_of_file_to_convert): - list_of_utterances= [] + list_of_utterances = [] conversation_level_info = {} with open(name_of_file_to_convert, "r") as f: for line in f: dict_of_utterances = {} list_of_order_identification_values = [] json_val = json.loads(line) - - #conversational level data - conversation_id = json_val['conversation_id'] - - #Create the conversation meta values - page_id = json_val['page_id'] - page_title = json_val['page_title'] - page_type = json_val['page_type'] - conversation_meta_dict = {'page_id':page_id, 'page_title': page_title, 'page_type': page_type} - conversation_level_info[conversation_id] = conversation_meta_dict + # conversational level data + conversation_id = json_val["conversation_id"] + + # Create the conversation meta values + page_id = json_val["page_id"] + page_title = json_val["page_title"] + page_type = json_val["page_type"] + conversation_meta_dict = { + "page_id": page_id, + "page_title": page_title, + "page_type": page_type, + } + conversation_level_info[conversation_id] = conversation_meta_dict - #Reformat each set of conversations in the final form - comments = json_val['comments'] - if (len(comments) > 0): + # Reformat each set of conversations in the final form + comments = json_val["comments"] + if len(comments) > 0: for comment_number in range(len(comments)): comment_val = comments[comment_number] - if (comment_number == 0): + if comment_number == 0: section_header_value = True else: section_header_value = False - utterance_val, order_identification_val = reformat_comment(comment_val, section_header_value) + utterance_val, order_identification_val = reformat_comment( + comment_val, section_header_value + ) dict_of_utterances[utterance_val.id] = utterance_val list_of_order_identification_values.append(order_identification_val) - correct_order_of_comments = correct_comment_order(list_of_order_identification_values) - + correct_order_of_comments = correct_comment_order( + list_of_order_identification_values + ) final_utterance_list = final_dict(dict_of_utterances, correct_order_of_comments) - flat_utterance_list = [utterance_sub for sublist in final_utterance_list for utterance_sub in sublist] + 
flat_utterance_list = [ + utterance_sub for sublist in final_utterance_list for utterance_sub in sublist + ] list_of_utterances.extend(flat_utterance_list) return (list_of_utterances, conversation_level_info) -#Return the correct dictionary order + +# Return the correct dictionary order def final_dict(dict_of_utterances, correct_order_identification_values): final_list = [] - if (len(correct_order_identification_values) == 1): + if len(correct_order_identification_values) == 1: first_comment = correct_order_identification_values[0][0] - final_list.append([dict_of_utterances[first_comment['id']]]) + final_list.append([dict_of_utterances[first_comment["id"]]]) return final_list for list_of_comments in correct_order_identification_values: - #print (list_of_comments) - if (len(list_of_comments) == 1): + # print (list_of_comments) + if len(list_of_comments) == 1: first_comment = list_of_comments[0] - final_list.append([dict_of_utterances[first_comment['id']]]) + final_list.append([dict_of_utterances[first_comment["id"]]]) else: - #In all cases the original comment is the first comment in the thread (the original comment's modification/deletion/restoration will always be empty) + # In all cases the original comment is the first comment in the thread (the original comment's modification/deletion/restoration will always be empty) original_comment_order_id = list_of_comments[0] - original_utterance = dict_of_utterances[original_comment_order_id['id']] - #original_utterance.meta['original'] = original_utterance + original_utterance = dict_of_utterances[original_comment_order_id["id"]] + # original_utterance.meta['original'] = original_utterance final_list.append([original_utterance]) - #Now fill out the modification/deletion/restoration objects and add to the final list if its an addition object + # Now fill out the modification/deletion/restoration objects and add to the final list if its an addition object for x in range(1, (len(list_of_comments))): current_comment_order_id = list_of_comments[x] - current_comment_order_id_type = current_comment_order_id['type'].lower() - #print (current_comment_order_id_type) - if (current_comment_order_id_type == 'addition'): - utterance_to_append = dict_of_utterances[current_comment_order_id['id']] + current_comment_order_id_type = current_comment_order_id["type"].lower() + # print (current_comment_order_id_type) + if current_comment_order_id_type == "addition": + utterance_to_append = dict_of_utterances[current_comment_order_id["id"]] final_list.append([utterance_to_append]) for finalized_utterance_list in final_list: for utterance in finalized_utterance_list: - #print ((current_comment_order_id['parent_id'])) + # print ((current_comment_order_id['parent_id'])) # print('hiiiiiii') - if (current_comment_order_id['parent_id'] == utterance.id): + if current_comment_order_id["parent_id"] == utterance.id: original_utterance_value = copy.deepcopy(utterance) - comment_to_append = dict_of_utterances[current_comment_order_id['id']] + comment_to_append = dict_of_utterances[current_comment_order_id["id"]] utterance.meta[current_comment_order_id_type].append(comment_to_append) - if (current_comment_order_id_type == 'deletion'): + if current_comment_order_id_type == "deletion": rewrite_utterance_deletion(utterance, comment_to_append) else: rewrite_utterance_data(utterance, comment_to_append) - if (utterance.meta['original'] is None): + if utterance.meta["original"] is None: # print("hm") - utterance.meta['original'] = utterance_to_dict(original_utterance_value) - 
check_action_lists(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type) - convert_utterance_values_to_dict(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type) + utterance.meta["original"] = utterance_to_dict( + original_utterance_value + ) + check_action_lists( + utterance, + current_comment_order_id, + dict_of_utterances, + current_comment_order_id_type, + ) + convert_utterance_values_to_dict( + utterance, + current_comment_order_id, + dict_of_utterances, + current_comment_order_id_type, + ) # return utterance_list return final_list -#If an utterance is deleted, make the final utterance text empty and change the id, speaker, reply-to and timestamp -def rewrite_utterance_deletion (utterance, comment_to_append): + +# If an utterance is deleted, make the final utterance text empty and change the id, speaker, reply-to and timestamp +def rewrite_utterance_deletion(utterance, comment_to_append): utterance.id = comment_to_append.id utterance.speaker = comment_to_append.speaker utterance.reply_to = comment_to_append.reply_to utterance.timestamp = comment_to_append.timestamp utterance.text = " " -#If an utterance is modified, make the final utteranxe text equal to the modified text and change the id, speaker, reply-to and timestamp + +# If an utterance is modified, make the final utteranxe text equal to the modified text and change the id, speaker, reply-to and timestamp def rewrite_utterance_data(utterance, comment_to_append): utterance.id = comment_to_append.id utterance.speaker = comment_to_append.speaker @@ -296,169 +342,246 @@ def rewrite_utterance_data(utterance, comment_to_append): def create_utterance_list(list_of_comments): final_list_of_utterances = [] - for individual_comment_list in list_of_comments: + for individual_comment_list in list_of_comments: ind_com = individual_comment_list[0] - #Construct the speaker value of type Speaker Class - speaker_dict = ind_com['speaker_info'] - speaker_value = Speaker(id = speaker_dict['speaker'], meta = {'speaker_id': speaker_dict['speaker_id']}) - - #Construct the utterance value of type Utterance Class - utterance_value = Utterance(ind_com['id'], speaker_value, ind_com['root'], ind_com['reply-to'], ind_com['timestamp'], ind_com['text'], None, meta = ind_com['meta']) + # Construct the speaker value of type Speaker Class + speaker_dict = ind_com["speaker_info"] + speaker_value = Speaker( + id=speaker_dict["speaker"], meta={"speaker_id": speaker_dict["speaker_id"]} + ) + + # Construct the utterance value of type Utterance Class + utterance_value = Utterance( + ind_com["id"], + speaker_value, + ind_com["root"], + ind_com["reply-to"], + ind_com["timestamp"], + ind_com["text"], + None, + meta=ind_com["meta"], + ) final_list_of_utterances.append(utterance_value) return final_list_of_utterances -#Check within the lists -def check_action_lists(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type): - modification_list = utterance.meta['modification'] - deletion_list = utterance.meta['deletion'] - restoration_list = utterance.meta['restoration'] - comment_to_append = dict_of_utterances[current_comment_order_id['id']] - if (len(modification_list)>0): +# Check within the lists +def check_action_lists( + utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type +): + modification_list = utterance.meta["modification"] + deletion_list = utterance.meta["deletion"] + restoration_list = utterance.meta["restoration"] + 
comment_to_append = dict_of_utterances[current_comment_order_id["id"]] + if len(modification_list) > 0: for utterance_val in modification_list: - check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append) - if (len(deletion_list)>0): + check_id_add( + utterance, + utterance_val, + current_comment_order_id, + current_comment_order_id_type, + comment_to_append, + ) + if len(deletion_list) > 0: for utterance_val in deletion_list: - check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append) - - if (len(restoration_list)>0): + check_id_add( + utterance, + utterance_val, + current_comment_order_id, + current_comment_order_id_type, + comment_to_append, + ) + + if len(restoration_list) > 0: for utterance_val in restoration_list: - check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append) + check_id_add( + utterance, + utterance_val, + current_comment_order_id, + current_comment_order_id_type, + comment_to_append, + ) -# Convert the utterance's data into a dictionary +# Convert the utterance's data into a dictionary def utterance_to_dict(utterance_val): dict_rep = {} - if (type(utterance_val) == Utterance): - dict_rep['id'] = utterance_val.id - dict_rep['speaker'] = {'id':utterance_val.speaker.id, 'speaker_id':utterance_val.speaker.meta['speaker_id']} + if type(utterance_val) == Utterance: + dict_rep["id"] = utterance_val.id + dict_rep["speaker"] = { + "id": utterance_val.speaker.id, + "speaker_id": utterance_val.speaker.meta["speaker_id"], + } # Speaker(id = speaker, meta = {'speaker_id': speaker_id_val}) - dict_rep['root'] = utterance_val.conversation_id - dict_rep['reply_to'] = utterance_val.reply_to - dict_rep['timestamp'] = utterance_val.timestamp - dict_rep['text'] = utterance_val.text - dict_rep['meta_dict'] = utterance_val.meta + dict_rep["root"] = utterance_val.conversation_id + dict_rep["reply_to"] = utterance_val.reply_to + dict_rep["timestamp"] = utterance_val.timestamp + dict_rep["text"] = utterance_val.text + dict_rep["meta_dict"] = utterance_val.meta # print (str((utterance_val.meta))) return dict_rep else: return None -def convert_utterance_values_to_dict(utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type): - modification_list = utterance.meta['modification'] - deletion_list = utterance.meta['deletion'] - restoration_list = utterance.meta['restoration'] + +def convert_utterance_values_to_dict( + utterance, current_comment_order_id, dict_of_utterances, current_comment_order_id_type +): + modification_list = utterance.meta["modification"] + deletion_list = utterance.meta["deletion"] + restoration_list = utterance.meta["restoration"] new_modification_list = [] new_deletion_list = [] new_restoration_list = [] - if (len(modification_list)>0): + if len(modification_list) > 0: for utterance_val in modification_list: returned_val = utterance_to_dict(utterance_val) - if (returned_val is not None): + if returned_val is not None: new_modification_list.append(returned_val) - if (len(deletion_list)>0): + if len(deletion_list) > 0: for utterance_val in deletion_list: returned_val = utterance_to_dict(utterance_val) - if (returned_val is not None): + if returned_val is not None: new_deletion_list.append(returned_val) - if (len(restoration_list)>0): + if len(restoration_list) > 0: for utterance_val in restoration_list: returned_val = utterance_to_dict(utterance_val) - if (returned_val is not None): 
+ if returned_val is not None: new_restoration_list.append(returned_val) - utterance.meta['modification'] = new_modification_list - utterance.meta['deletion'] = new_deletion_list - utterance.meta['restoration'] = new_restoration_list + utterance.meta["modification"] = new_modification_list + utterance.meta["deletion"] = new_deletion_list + utterance.meta["restoration"] = new_restoration_list -#Since top level data is stored as an utterance and metadata as a dictionary, uniform method to get the id + +# Since top level data is stored as an utterance and metadata as a dictionary, uniform method to get the id def get_id_from_utt_or_dict(utterance_val): - #find the utterance value - if (isinstance(utterance_val, dict)): - id_val_utterance = utterance_val.get('id') + # find the utterance value + if isinstance(utterance_val, dict): + id_val_utterance = utterance_val.get("id") else: id_val_utterance = utterance_val.id return id_val_utterance -#Add the comment to the action list -def check_id_add(utterance, utterance_val, current_comment_order_id, current_comment_order_id_type, comment_to_append): - if (current_comment_order_id['parent_id'] == get_id_from_utt_or_dict(utterance_val) and check_not_in_list(utterance.meta[current_comment_order_id_type], comment_to_append)): +# Add the comment to the action list +def check_id_add( + utterance, + utterance_val, + current_comment_order_id, + current_comment_order_id_type, + comment_to_append, +): + + if current_comment_order_id["parent_id"] == get_id_from_utt_or_dict( + utterance_val + ) and check_not_in_list(utterance.meta[current_comment_order_id_type], comment_to_append): utterance.meta[current_comment_order_id_type].append(comment_to_append) - if (current_comment_order_id_type == 'deletion'): + if current_comment_order_id_type == "deletion": rewrite_utterance_deletion(utterance, comment_to_append) else: rewrite_utterance_data(utterance, comment_to_append) -#Make sure that this comment ahs not already been added + +# Make sure that this comment ahs not already been added def check_not_in_list(list_of_values, utterance): utterance_id = utterance.id timestamp_val = utterance.timestamp for value in list_of_values: - if (get_id_from_utt_or_dict(value) == utterance_id and value.timestamp == timestamp_val): + if get_id_from_utt_or_dict(value) == utterance_id and value.timestamp == timestamp_val: return False return True - -#Find the correct order of comments to display + + +# Find the correct order of comments to display def correct_comment_order(list_of_order_identification_values): correct_order = [] for value in list_of_order_identification_values: - current_type = value['type'].lower() - if (current_type == 'creation' or current_type == 'addition'): + current_type = value["type"].lower() + if current_type == "creation" or current_type == "addition": correct_order.append([value]) - elif (current_type == 'modification' or current_type == 'deletion' or current_type == 'restoration'): - id_changed = value['parent_id'] + elif ( + current_type == "modification" + or current_type == "deletion" + or current_type == "restoration" + ): + id_changed = value["parent_id"] for each_ordered_list in correct_order: - if (len(each_ordered_list) > 0): + if len(each_ordered_list) > 0: for each_comment in each_ordered_list: - if (each_comment['id'] == id_changed): + if each_comment["id"] == id_changed: each_ordered_list.append(value) return correct_order -#Create common information doc from the json val and store as an utterance +# Create common information doc from the 
json val and store as an utterance def reformat_comment(json_val, section_header_value): - #Top level/ required information - id_val = json_val['id'] - root_val = json_val['conversation_id'] - reply_to = json_val['replyTo_id'] - text = json_val['text'] - timestamp = json_val['timestamp'] #not required but placed in the top level - - #Access the infrmoation necessary for the speaker class - speaker = json_val['user_text'] - speaker_id_val = json_val['user_id'] - - #Construct the Speaker value of type Speaker Class - speaker_value = Speaker(id = speaker, meta = {'speaker_id': speaker_id_val}) + # Top level/ required information + id_val = json_val["id"] + root_val = json_val["conversation_id"] + reply_to = json_val["replyTo_id"] + text = json_val["text"] + timestamp = json_val["timestamp"] # not required but placed in the top level + + # Access the infrmoation necessary for the speaker class + speaker = json_val["user_text"] + speaker_id_val = json_val["user_id"] + + # Construct the Speaker value of type Speaker Class + speaker_value = Speaker(id=speaker, meta={"speaker_id": speaker_id_val}) # speaker_value = {'id':speaker; 'speaker_id':speaker_id} - #Values for the meta dictionary + # Values for the meta dictionary is_section_header = section_header_value - indentation = json_val['indentation'] - toxicity = json_val['toxicity'] - sever_toxicity = json_val['sever_toxicity'] - ancestor_id = json_val['ancestor_id'] - rev_id = json_val['rev_id'] - - #Used to identify order - parent_id = json_val['parent_id'] - type_val = json_val['type'] - - #Build the meta dict - meta_dict = {'is_section_header':is_section_header, 'indentation':indentation, 'toxicity':toxicity, 'sever_toxicity':sever_toxicity, 'ancestor_id':ancestor_id, 'rev_id':rev_id, 'parent_id':parent_id, - 'original': None, 'modification': [], 'deletion': [], 'restoration':[]} - - #Construct the utterance value of type Utterance Class - utterance_value = Utterance(id = id_val,speaker= speaker_value, conversation_id = root_val, reply_to = reply_to,timestamp = timestamp, text= text, meta = meta_dict) - order_identification = {'id':id_val, 'parent_id':parent_id, 'type':type_val, 'timestamp': timestamp} + indentation = json_val["indentation"] + toxicity = json_val["toxicity"] + sever_toxicity = json_val["sever_toxicity"] + ancestor_id = json_val["ancestor_id"] + rev_id = json_val["rev_id"] + + # Used to identify order + parent_id = json_val["parent_id"] + type_val = json_val["type"] + + # Build the meta dict + meta_dict = { + "is_section_header": is_section_header, + "indentation": indentation, + "toxicity": toxicity, + "sever_toxicity": sever_toxicity, + "ancestor_id": ancestor_id, + "rev_id": rev_id, + "parent_id": parent_id, + "original": None, + "modification": [], + "deletion": [], + "restoration": [], + } + + # Construct the utterance value of type Utterance Class + utterance_value = Utterance( + id=id_val, + speaker=speaker_value, + conversation_id=root_val, + reply_to=reply_to, + timestamp=timestamp, + text=text, + meta=meta_dict, + ) + order_identification = { + "id": id_val, + "parent_id": parent_id, + "type": type_val, + "timestamp": timestamp, + } return (utterance_value, order_identification) -if __name__ == '__main__': +if __name__ == "__main__": main()
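
Illustrative sketch (assumptions: convokit is installed and importable; every path, ID, timestamp, and text value below is a hypothetical placeholder). It relies only on the convokit calls already exercised in the code above (Speaker, Utterance, Corpus, dump) and shows the same filter-then-dump flow that set_meta_dump_corpus applies: an utterance whose text is empty or trivial and whose 'original' metadata is None is dropped before the corpus is built.

from convokit import Corpus, Speaker, Utterance

speaker = Speaker(id="ExampleUser", meta={"speaker_id": "12345"})

# One substantive utterance and one trivial utterance with no 'original' record;
# only the second matches the filter criterion and is dropped.
utterances = [
    Utterance(
        id="utt-1",
        speaker=speaker,
        conversation_id="conv-1",
        reply_to=None,
        timestamp=1167609600.0,  # 2007-01-01 UTC
        text="Example talk-page comment.",
        meta={"original": None, "modification": [], "deletion": [], "restoration": []},
    ),
    Utterance(
        id="utt-2",
        speaker=speaker,
        conversation_id="conv-1",
        reply_to="utt-1",
        timestamp=1167609700.0,
        text="-",  # trivial text with no modification history -> filtered out
        meta={"original": None, "modification": [], "deletion": [], "restoration": []},
    ),
]

def keep(utt):
    """Keep an utterance unless its text is empty/trivial AND it has no 'original' record."""
    text_is_empty = not utt.text or utt.text.strip() in ("", "-")
    return not (text_is_empty and utt.meta["original"] is None)

filtered = [u for u in utterances if keep(u)]
corpus = Corpus(utterances=filtered)
corpus.dump("/tmp/wikiconv_filter_demo")  # hypothetical output location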
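
A second minimal sketch, mirroring the Corpus.merge usage in master_corpora: two dumped per-year splits are reloaded and combined into a single corpus. The split and output paths are hypothetical placeholders.

from convokit import Corpus

split_a = Corpus(filename="/tmp/intermediate/conv_corpus_year_2008_data_split_0")
split_b = Corpus(filename="/tmp/intermediate/conv_corpus_year_2008_data_split_1")

merged = Corpus.merge(split_a, split_b)  # same static-style merge call used above
merged.dump("/tmp/final/wikiconv_corpus_merged_2008")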