diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 40ba833..8a86c29 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -21,7 +21,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest]
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
         requirements-file: ["requirements.txt"]
     runs-on: ${{ matrix.os }}
     steps:
diff --git a/bin/opusfilter b/bin/opusfilter
index 5bb58b1..f1f9501 100755
--- a/bin/opusfilter
+++ b/bin/opusfilter
@@ -10,24 +10,27 @@ from opusfilter.util import yaml
 logging.basicConfig(level=logging.INFO)
 logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
 
-parser = argparse.ArgumentParser(prog='opusfilter',
-                                 description='Filter OPUS bitexts')
-
-parser.add_argument('config', metavar='CONFIG', help='YAML configuration file')
-parser.add_argument('--overwrite', '-o', help='overwrite existing output files', action='store_true')
-parser.add_argument('--last', type=int, default=None, help='Last step to run')
-parser.add_argument('--single', type=int, default=None, help='Run only the nth step')
-parser.add_argument('--n-jobs', type=int, default=None,
-                    help='Number of parallel jobs when running score, filter and preprocess.')
-
-args = parser.parse_args()
-
-configuration = yaml.load(open(args.config))
-if args.n_jobs is not None:
-    configuration['common']['default_n_jobs'] = args.n_jobs
-
-of = OpusFilter(configuration)
-if args.single is None:
-    of.execute_steps(overwrite=args.overwrite, last=args.last)
-else:
-    of.execute_step(args.single, overwrite=args.overwrite)
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(prog='opusfilter',
+                                     description='Filter OPUS bitexts')
+
+    parser.add_argument('config', metavar='CONFIG', help='YAML configuration file')
+    parser.add_argument('--overwrite', '-o', help='overwrite existing output files', action='store_true')
+    parser.add_argument('--last', type=int, default=None, help='Last step to run')
+    parser.add_argument('--single', type=int, default=None, help='Run only the nth step')
+    parser.add_argument('--n-jobs', type=int, default=None,
+                        help='Number of parallel jobs when running score, filter and preprocess.')
+
+    args = parser.parse_args()
+
+    configuration = yaml.load(open(args.config))
+    if args.n_jobs is not None:
+        configuration['common']['default_n_jobs'] = args.n_jobs
+
+    of = OpusFilter(configuration)
+    if args.single is None:
+        of.execute_steps(overwrite=args.overwrite, last=args.last)
+    else:
+        of.execute_step(args.single, overwrite=args.overwrite)
diff --git a/bin/opusfilter-autogen b/bin/opusfilter-autogen
index a478f99..4178615 100644
--- a/bin/opusfilter-autogen
+++ b/bin/opusfilter-autogen
@@ -18,74 +18,76 @@ except OSError:
 
 logger = logging.getLogger(__name__)
 
-logging.basicConfig(level=logging.INFO)
-logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
+if __name__ == '__main__':
 
-parser = argparse.ArgumentParser(
-    prog='opusfilter-autogen',
-    description='Generate initial configuration based on parallel text data')
+    logging.basicConfig(level=logging.INFO)
+    logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
 
-parser.add_argument('--files', required=True, nargs='+', metavar='TEXTFILE', help='parallel text input file(s)')
-parser.add_argument('--langs', nargs='+', metavar='LANGCODE',
-                    help='Language codes corresponding to the input files. If omitted, LanguageIDFilters will not be used.')
-parser.add_argument('--scripts', nargs='+', metavar='SCRIPT', help=(
-    'Alphabetic scripts (e.g. Latin) corresponding to the input files. '
-    'If omitted, CharacterScoreFilter will not be used.'))
-parser.add_argument('--method', choices=['defaults', 'percentiles', 'clustering'], default='clustering',
-                    help='Method for selecting filter thresholds (default: %(default)s)')
-parser.add_argument('--sample-size', default=100000, type=int, metavar='INT',
-                    help='Max number of sentence pairs used for data-based methods (default %(default)s)')
-parser.add_argument('--noisy-percentile', default=0.001, type=float, metavar='FLOAT',
-                    help='Proportion of the data considered to be noisy; only for percentiles method (default %(default)s)')
-parser.add_argument('--clusters', '-k', default=2, type=int, metavar='INT',
-                    help=('Number of clusters for the clustering method; try increasing if too much data is clustered '
-                          'as noisy (default %(default)s)'))
-parser.add_argument('--work-dir', default='work',
-                    help='Location of the source and target files for the generated configuration (default %(default)s)')
-parser.add_argument('--inter-dir', help='Save intermediate files in this directory (use a temporary directory if not given)')
-parser.add_argument('--plot', metavar='PATH', default=None, type=str,
-                    help=('Create histograms of feature data distributions and a scatter plot of the clustering; '
-                          'give path to plot the PDF files to, or "-" for interactive plots; only for the clustering method'))
-parser.add_argument('--list-defaults', action='store_true', help='List default filters of the method to the output and quit')
-parser.add_argument('--add-filter', nargs=2, action='append', default=[], metavar=('CLASS', 'JSON'),
-                    help=('Instead of using default filters, add a filter of CLASS with JSON parameters object '
-                          '("{}" for default parameters). The class name may be followed by a dot and a unique '
-                          'filter identifier in order to allow multiple filters of the same class. Example: '
-                          '--add-filter LanguageIDFilter.cld2 \'{"id_method": "cld2"}\''))
-parser.add_argument('--overwrite', action='store_true',
-                    help='Overwrite existing intermediate files')
-parser.add_argument('-o', '--output', type=argparse.FileType('w'),
-                    default='-', metavar='CONFIGFILE', help='Output configuration file (default %(default)s)')
-args = parser.parse_args()
+    parser = argparse.ArgumentParser(
+        prog='opusfilter-autogen',
+        description='Generate initial configuration based on parallel text data')
 
-filters = [(name, json.loads(jsonstr)) for name, jsonstr in args.add_filter] if args.add_filter else None
+    parser.add_argument('--files', required=True, nargs='+', metavar='TEXTFILE', help='parallel text input file(s)')
+    parser.add_argument('--langs', nargs='+', metavar='LANGCODE',
+                        help='Language codes corresponding to the input files. If omitted, LanguageIDFilters will not be used.')
+    parser.add_argument('--scripts', nargs='+', metavar='SCRIPT', help=(
+        'Alphabetic scripts (e.g. Latin) corresponding to the input files. '
+        'If omitted, CharacterScoreFilter will not be used.'))
+    parser.add_argument('--method', choices=['defaults', 'percentiles', 'clustering'], default='clustering',
+                        help='Method for selecting filter thresholds (default: %(default)s)')
+    parser.add_argument('--sample-size', default=100000, type=int, metavar='INT',
+                        help='Max number of sentence pairs used for data-based methods (default %(default)s)')
+    parser.add_argument('--noisy-percentile', default=0.001, type=float, metavar='FLOAT',
+                        help='Proportion of the data considered to be noisy; only for percentiles method (default %(default)s)')
+    parser.add_argument('--clusters', '-k', default=2, type=int, metavar='INT',
+                        help=('Number of clusters for the clustering method; try increasing if too much data is clustered '
+                              'as noisy (default %(default)s)'))
+    parser.add_argument('--work-dir', default='work',
+                        help='Location of the source and target files for the generated configuration (default %(default)s)')
+    parser.add_argument('--inter-dir', help='Save intermediate files in this directory (use a temporary directory if not given)')
+    parser.add_argument('--plot', metavar='PATH', default=None, type=str,
+                        help=('Create histograms of feature data distributions and a scatter plot of the clustering; '
+                              'give path to plot the PDF files to, or "-" for interactive plots; only for the clustering method'))
+    parser.add_argument('--list-defaults', action='store_true', help='List default filters of the method to the output and quit')
+    parser.add_argument('--add-filter', nargs=2, action='append', default=[], metavar=('CLASS', 'JSON'),
+                        help=('Instead of using default filters, add a filter of CLASS with JSON parameters object '
+                              '("{}" for default parameters). The class name may be followed by a dot and a unique '
+                              'filter identifier in order to allow multiple filters of the same class. Example: '
+                              '--add-filter LanguageIDFilter.cld2 \'{"id_method": "cld2"}\''))
+    parser.add_argument('--overwrite', action='store_true',
+                        help='Overwrite existing intermediate files')
+    parser.add_argument('-o', '--output', type=argparse.FileType('w'),
+                        default='-', metavar='CONFIGFILE', help='Output configuration file (default %(default)s)')
+    args = parser.parse_args()
 
-if args.method == 'clustering':
-    filtergen = ClusterFilters(
-        files=args.files, langs=args.langs, scripts=args.scripts, filters=filters,
-        sample_size=args.sample_size, k=args.clusters, inter_dir=args.inter_dir, overwrite=args.overwrite)
-elif args.method == 'percentiles':
-    filtergen = PercentileFilters(
-        files=args.files, langs=args.langs, scripts=args.scripts, filters=filters,
-        excluded_percentile=args.noisy_percentile, sample_size=args.sample_size,
-        inter_dir=args.inter_dir, overwrite=args.overwrite)
-else:
-    filtergen = DefaultParameterFilters(langs=args.langs, scripts=args.scripts, filters=filters)
+    filters = [(name, json.loads(jsonstr)) for name, jsonstr in args.add_filter] if args.add_filter else None
 
-if args.list_defaults:
-    yaml.dump(filtergen.DEFAULT_FILTERS, args.output)
-    sys.exit(0)
+    if args.method == 'clustering':
+        filtergen = ClusterFilters(
+            files=args.files, langs=args.langs, scripts=args.scripts, filters=filters,
+            sample_size=args.sample_size, k=args.clusters, inter_dir=args.inter_dir, overwrite=args.overwrite)
+    elif args.method == 'percentiles':
+        filtergen = PercentileFilters(
+            files=args.files, langs=args.langs, scripts=args.scripts, filters=filters,
+            excluded_percentile=args.noisy_percentile, sample_size=args.sample_size,
+            inter_dir=args.inter_dir, overwrite=args.overwrite)
+    else:
+        filtergen = DefaultParameterFilters(langs=args.langs, scripts=args.scripts, filters=filters)
 
-filters = filtergen.set_filter_thresholds()
+    if args.list_defaults:
+        yaml.dump(filtergen.DEFAULT_FILTERS, args.output)
+        sys.exit(0)
 
-if args.method == 'clustering' and args.plot is not None:
-    if args.plot == '-':
-        filtergen.scoredata.plot(plt)
-        plt.show()
-    else:
-        filtergen.scoredata.plot(plt, path=args.plot)
+    filters = filtergen.set_filter_thresholds()
+
+    if args.method == 'clustering' and args.plot is not None:
+        if args.plot == '-':
+            filtergen.scoredata.plot(plt)
+            plt.show()
+        else:
+            filtergen.scoredata.plot(plt, path=args.plot)
 
-generator = ConfigurationGenerator(
-    files=[os.path.abspath(f) for f in args.files], langs=args.langs, workdir=args.work_dir)
-generator.add_filter(filtergen.filters)
-yaml.dump(generator.get_config(), args.output)
+    generator = ConfigurationGenerator(
+        files=[os.path.abspath(f) for f in args.files], langs=args.langs, workdir=args.work_dir)
+    generator.add_filter(filtergen.filters)
+    yaml.dump(generator.get_config(), args.output)
diff --git a/bin/opusfilter-cmd b/bin/opusfilter-cmd
index 21c26c7..a9464cb 100644
--- a/bin/opusfilter-cmd
+++ b/bin/opusfilter-cmd
@@ -38,49 +38,51 @@ def update_parameters(parameters, name, values):
         parameters[name].extend(values)
 
 
-# Use to prevent warning from missing directory
-tmpconfig = {'common': {'output_directory': '/tmp'}}
-
-logging.basicConfig(level=logging.INFO)
-logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
-
-parser = argparse.ArgumentParser(
-    prog='opusfilter-cmd', description='Run single opusfilter function', allow_abbrev=False)
-
-parser.add_argument('function', choices=OpusFilter(tmpconfig).step_functions, help='OpusFilter function')
-parser.add_argument('--overwrite', '-o', help='overwrite existing output files', action='store_true')
-parser.add_argument('--outputdir', '-d', default='.', help='output directory')
-parser.add_argument('--parameters', type=str, default=None, help='load parameters as a JSON object (e.g. \'{"inputs": ["all.gz"], "outputs": ["filtered.gz"]}\')')
-
-args, remaining = parser.parse_known_args()
-
-if args.parameters is None:
-    parameters = {}
-else:
-    parameters = json.loads(args.parameters)
-
-temp = copy.copy(remaining)
-name = None
-values = []
-while temp:
-    new = temp.pop(0)
-    if new.startswith('--'):
-        if name is not None:
-            update_parameters(parameters, name, values)
-        name = new[2:].replace('-', '_')
-        values = []
-        continue
-    if name is None:
-        raise ValueError("Could not parse remaining arguments: %s" % remaining)
-    values.append(json_value(new))
-update_parameters(parameters, name, values)
-
-configuration = {
-    'common': {'output_directory': args.outputdir},
-    'steps': [{'type': args.function, 'parameters': parameters}]
-}
-
-logger.info("Created configuration:\n\n%s", yaml_dumps(configuration))
-
-of = OpusFilter(configuration)
-of.execute_steps(overwrite=args.overwrite)
+if __name__ == '__main__':
+
+    # Use to prevent warning from missing directory
+    tmpconfig = {'common': {'output_directory': '/tmp'}}
+
+    logging.basicConfig(level=logging.INFO)
+    logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
+
+    parser = argparse.ArgumentParser(
+        prog='opusfilter-cmd', description='Run single opusfilter function', allow_abbrev=False)
+
+    parser.add_argument('function', choices=OpusFilter(tmpconfig).step_functions, help='OpusFilter function')
+    parser.add_argument('--overwrite', '-o', help='overwrite existing output files', action='store_true')
+    parser.add_argument('--outputdir', '-d', default='.', help='output directory')
+    parser.add_argument('--parameters', type=str, default=None, help='load parameters as a JSON object (e.g. \'{"inputs": ["all.gz"], "outputs": ["filtered.gz"]}\')')
+
+    args, remaining = parser.parse_known_args()
+
+    if args.parameters is None:
+        parameters = {}
+    else:
+        parameters = json.loads(args.parameters)
+
+    temp = copy.copy(remaining)
+    name = None
+    values = []
+    while temp:
+        new = temp.pop(0)
+        if new.startswith('--'):
+            if name is not None:
+                update_parameters(parameters, name, values)
+            name = new[2:].replace('-', '_')
+            values = []
+            continue
+        if name is None:
+            raise ValueError("Could not parse remaining arguments: %s" % remaining)
+        values.append(json_value(new))
+    update_parameters(parameters, name, values)
+
+    configuration = {
+        'common': {'output_directory': args.outputdir},
+        'steps': [{'type': args.function, 'parameters': parameters}]
+    }
+
+    logger.info("Created configuration:\n\n%s", yaml_dumps(configuration))
+
+    of = OpusFilter(configuration)
+    of.execute_steps(overwrite=args.overwrite)
diff --git a/bin/opusfilter-diagram b/bin/opusfilter-diagram
index 4611743..1ad2255 100644
--- a/bin/opusfilter-diagram
+++ b/bin/opusfilter-diagram
@@ -45,64 +45,66 @@ def get_other_params(step):
         if to_remove in params:
             del params[to_remove]
     return params
-
-
-logging.basicConfig(level=logging.INFO)
-
-parser = argparse.ArgumentParser(prog='opusfilter-test',
-                                 description='Draw a diagram from OpusFilter configuration')
-
-parser.add_argument('yaml', metavar='FILE', help='YAML configuration file')
-parser.add_argument('output', metavar='FILE', help='output file (rendered if does not end with .dot)')
-parser.add_argument('--rankdir', default='LR', choices=['TB', 'LR'], help='graph direction (default %(default)s)')
-parser.add_argument('--exclude-params', action='store_true', help='do not write step parameters')
-
-args = parser.parse_args()
-
-config = yaml.load(open(args.yaml))
-
-graph = Digraph(comment=os.path.basename(args.yaml))
-graph.attr(rankdir=args.rankdir)
-graph.attr('node', shape='box')
-
-node_outputs = {}
-used_outputs = set()
-sources = {}
-targets = {}
-for idx, step in enumerate(config['steps']):
-    name = str(idx)
-    label = step['type']
-    params = get_other_params(step)
-    if params and not args.exclude_params:
-        paramstr = yaml_dumps(params)
-        label += '\n\n' + paramstr.replace('\n', '\l')
-    graph.node(name, label=label)
-    for fname in get_inputs(step):
-        if fname in node_outputs:
-            graph.edge(node_outputs[fname], name, label=os.path.basename(fname))
-            used_outputs.add(fname)
-        elif fname in sources:
-            graph.edge(sources[fname], name)
-        else:
-            source_name = 'source_{}'.format(len(sources))
-            sources[fname] = source_name
-            #graph.node(source_name, label=os.path.dirname(fname), shape='plaintext')
-            graph.node(source_name, shape='point')
-            graph.edge(source_name, name, label=os.path.basename(fname))
-    for fname in get_outputs(step):
-        node_outputs[fname] = name
-for fname in node_outputs:
-    if fname not in used_outputs:
-        target_name = 'target_{}'.format(len(targets))
-        targets[fname] = target_name
-        graph.node(target_name, shape='point')
-        graph.edge(node_outputs[fname], target_name, label=os.path.basename(fname))
-
-if args.output.endswith('.dot'):
-    with open(args.output, 'w') as fobj:
-        fobj.write(graph.source)
-    logger.info("Wrote %s", args.output)
-else:
-    base, ext = os.path.splitext(args.output)
-    out = graph.render(filename=base, format=ext.lstrip('.'), cleanup=True, view=False)
-    logger.info("Wrote %s", out)
+
+
+if __name__ == '__main__':
+
+    logging.basicConfig(level=logging.INFO)
+
+    parser = argparse.ArgumentParser(prog='opusfilter-diagram',
+                                     description='Draw a diagram from OpusFilter configuration')
+
+    parser.add_argument('yaml', metavar='FILE', help='YAML configuration file')
+    parser.add_argument('output', metavar='FILE', help='output file (rendered if does not end with .dot)')
+    parser.add_argument('--rankdir', default='LR', choices=['TB', 'LR'], help='graph direction (default %(default)s)')
+    parser.add_argument('--exclude-params', action='store_true', help='do not write step parameters')
+
+    args = parser.parse_args()
+
+    config = yaml.load(open(args.yaml))
+
+    graph = Digraph(comment=os.path.basename(args.yaml))
+    graph.attr(rankdir=args.rankdir)
+    graph.attr('node', shape='box')
+
+    node_outputs = {}
+    used_outputs = set()
+    sources = {}
+    targets = {}
+    for idx, step in enumerate(config['steps']):
+        name = str(idx)
+        label = step['type']
+        params = get_other_params(step)
+        if params and not args.exclude_params:
+            paramstr = yaml_dumps(params)
+            label += '\n\n' + paramstr.replace('\n', '\\l')
+        graph.node(name, label=label)
+        for fname in get_inputs(step):
+            if fname in node_outputs:
+                graph.edge(node_outputs[fname], name, label=os.path.basename(fname))
+                used_outputs.add(fname)
+            elif fname in sources:
+                graph.edge(sources[fname], name)
+            else:
+                source_name = 'source_{}'.format(len(sources))
+                sources[fname] = source_name
+                #graph.node(source_name, label=os.path.dirname(fname), shape='plaintext')
+                graph.node(source_name, shape='point')
+                graph.edge(source_name, name, label=os.path.basename(fname))
+        for fname in get_outputs(step):
+            node_outputs[fname] = name
+    for fname in node_outputs:
+        if fname not in used_outputs:
+            target_name = 'target_{}'.format(len(targets))
+            targets[fname] = target_name
+            graph.node(target_name, shape='point')
+            graph.edge(node_outputs[fname], target_name, label=os.path.basename(fname))
+
+    if args.output.endswith('.dot'):
+        with open(args.output, 'w') as fobj:
+            fobj.write(graph.source)
+        logger.info("Wrote %s", args.output)
+    else:
+        base, ext = os.path.splitext(args.output)
+        out = graph.render(filename=base, format=ext.lstrip('.'), cleanup=True, view=False)
+        logger.info("Wrote %s", out)
diff --git a/bin/opusfilter-duplicates b/bin/opusfilter-duplicates
index 26843a8..311810d 100644
--- a/bin/opusfilter-duplicates
+++ b/bin/opusfilter-duplicates
@@ -12,74 +12,76 @@ from opusfilter.util import file_open
 
 logger = logging.getLogger(__name__)
 
-logging.basicConfig(level=logging.INFO)
-logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
+if __name__ == '__main__':
 
-parser = argparse.ArgumentParser(prog='opusfilter-duplicates',
-                                 description='Find duplicates from parallel text data using hashes and print statistics')
+    logging.basicConfig(level=logging.INFO)
+    logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
 
-parser.add_argument('files', nargs='+', metavar='FILE', help='parallel text input file(s)')
-parser.add_argument('--overlap', '-o', nargs='+', metavar='FILE', default=None,
-                    help='calculate overlap with second set of input files')
-parser.add_argument('--hash', type=str, default='xxh64',
-                    help=('hash function from xxhash library, empty string '
-                          'for no hashing (default "xxh64")'))
-parser.add_argument('--letters-only', action='store_true', default=False,
-                    help='remove all non-letters from intput strings before hashing')
-parser.add_argument('--letter-words-only', action='store_true', default=False,
-                    help='remove words with non-letter characters before hashing')
-parser.add_argument('--lowercase', action='store_true', default=False,
-                    help='lowercase input strings before hashing')
-parser.add_argument('--tokenizers', type=str, metavar='JSON', default=None,
-                    help=('load tokenizer specifications from a JSON list (e.g. \'[["moses", "en"], ["jieba", "zh"]]\'); '
-                          'use with --letter-words-only'))
+    parser = argparse.ArgumentParser(prog='opusfilter-duplicates',
+                                     description='Find duplicates from parallel text data using hashes and print statistics')
 
-args = parser.parse_args()
+    parser.add_argument('files', nargs='+', metavar='FILE', help='parallel text input file(s)')
+    parser.add_argument('--overlap', '-o', nargs='+', metavar='FILE', default=None,
+                        help='calculate overlap with second set of input files')
+    parser.add_argument('--hash', type=str, default='xxh64',
+                        help=('hash function from xxhash library, empty string '
+                              'for no hashing (default "xxh64")'))
+    parser.add_argument('--letters-only', action='store_true', default=False,
+                        help='remove all non-letters from input strings before hashing')
+    parser.add_argument('--letter-words-only', action='store_true', default=False,
+                        help='remove words with non-letter characters before hashing')
+    parser.add_argument('--lowercase', action='store_true', default=False,
+                        help='lowercase input strings before hashing')
+    parser.add_argument('--tokenizers', type=str, metavar='JSON', default=None,
+                        help=('load tokenizer specifications from a JSON list (e.g. \'[["moses", "en"], ["jieba", "zh"]]\'); '
+                              'use with --letter-words-only'))
 
-if args.overlap and len(args.overlap) != len(args.files):
-    raise ValueError("The number of the main and overlap input files should match")
+    args = parser.parse_args()
 
-tokenizers = json.loads(args.tokenizers) if args.tokenizers else None
+    if args.overlap and len(args.overlap) != len(args.files):
+        raise ValueError("The number of the main and overlap input files should match")
 
-hasher = SegmentHasher(
-    compare='all',
-    method=args.hash,
-    letters_only=args.letters_only,
-    letter_words_only=args.letter_words_only,
-    lowercase=args.lowercase,
-    tokenizers=tokenizers
-)
+    tokenizers = json.loads(args.tokenizers) if args.tokenizers else None
 
-total = 0
-counter = collections.Counter()
-infs = [file_open(infile) for infile in args.files]
-for lines in tqdm(zip(*infs)):
-    total += 1
-    key = hasher.apply(lines)
-    counter[key] += 1
+    hasher = SegmentHasher(
+        compare='all',
+        method=args.hash,
+        letters_only=args.letters_only,
+        letter_words_only=args.letter_words_only,
+        lowercase=args.lowercase,
+        tokenizers=tokenizers
+    )
 
-if args.overlap:
-    total2 = 0
-    overlap = 0
-    overlap_counter = collections.Counter()
-    infs = [file_open(infile) for infile in args.overlap]
+    total = 0
+    counter = collections.Counter()
+    infs = [file_open(infile) for infile in args.files]
     for lines in tqdm(zip(*infs)):
-        total2 += 1
+        total += 1
         key = hasher.apply(lines)
-        if key in counter:
-            overlap += 1
-            overlap_counter[key] += 1
-    print("Total segments in 1st: {}".format(total))
-    print("Total segments in 2nd: {}".format(total2))
-    print("Overlapping segments: {} ({:.1f}% in 1st, {:.1f}% in 2nd)".format(
-        overlap, 100 * overlap / total, 100 * overlap / total2))
-    print("Overlapping unique segments: {}".format(len(overlap_counter)))
-else:
-    counts_of_counts = collections.Counter(counter.values())
-    uniq = sum(counts_of_counts.values())
-    print("Total segments: {}".format(total))
-    print("Unique segments: {} ({:.1f}%)".format(uniq, 100 * uniq / total))
-    print("Segments occurring once: {}".format(counts_of_counts[1]))
-    print("Average number of duplicates: {:.1f}".format(
-        sum((k * v) for k, v in counts_of_counts.items()) / sum(counts_of_counts.values())))
-    print("Maximum number of duplicates: {}".format(max(counts_of_counts.keys())))
+        counter[key] += 1
+
+    if args.overlap:
+        total2 = 0
+        overlap = 0
+        overlap_counter = collections.Counter()
+        infs = [file_open(infile) for infile in args.overlap]
+        for lines in tqdm(zip(*infs)):
+            total2 += 1
+            key = hasher.apply(lines)
+            if key in counter:
+                overlap += 1
+                overlap_counter[key] += 1
+        print("Total segments in 1st: {}".format(total))
+        print("Total segments in 2nd: {}".format(total2))
+        print("Overlapping segments: {} ({:.1f}% in 1st, {:.1f}% in 2nd)".format(
+            overlap, 100 * overlap / total, 100 * overlap / total2))
+        print("Overlapping unique segments: {}".format(len(overlap_counter)))
+    else:
+        counts_of_counts = collections.Counter(counter.values())
+        uniq = sum(counts_of_counts.values())
+        print("Total segments: {}".format(total))
+        print("Unique segments: {} ({:.1f}%)".format(uniq, 100 * uniq / total))
+        print("Segments occurring once: {}".format(counts_of_counts[1]))
+        print("Average number of duplicates: {:.1f}".format(
+            sum((k * v) for k, v in counts_of_counts.items()) / sum(counts_of_counts.values())))
+        print("Maximum number of duplicates: {}".format(max(counts_of_counts.keys())))
diff --git a/bin/opusfilter-test b/bin/opusfilter-test
index 2667a6d..8af5eab 100644
--- a/bin/opusfilter-test
+++ b/bin/opusfilter-test
@@ -22,55 +22,57 @@ def read_lines(infs):
 logging.basicConfig(level=logging.INFO)
 logging.getLogger('mosestokenizer.tokenizer.MosesTokenizer').setLevel(logging.WARNING)
 
-parser = argparse.ArgumentParser(prog='opusfilter-test',
-                                 description='Test filters on parallel text data')
-
-parser.add_argument('files', nargs='+', metavar='FILE', help='parallel text input file(s)')
-parser.add_argument('--yaml', metavar='FILE', help='load YAML configuration file for the filters to test')
-parser.add_argument('--add', nargs=2, action='append', default=[], metavar=('CLASS', 'JSON'),
-                    help='add filter of CLASS with JSON parameters object to the test')
-parser.add_argument('--removed', metavar='FILE', help='write removed segments to JSONL file')
-parser.add_argument('--scores', metavar='FILE', help='write filter scores to JSONL file')
-
-args = parser.parse_args()
-
-config = yaml.load(open(args.yaml)) if args.yaml else []
-for name, jsonstr in args.add:
-    config.append({name: json.loads(jsonstr)})
-
-filter_pipe = FilterPipeline.from_config(config)
-infs = [file_open(infile) for infile in args.files]
-total = 0
-counter = collections.Counter()
-logger.info("Calculating total")
-for lines in read_lines(zip(*infs)):
-    total += 1
-
-if args.removed:
-    removed_fobj = file_open(args.removed, 'w')
-
-for filter_ in filter_pipe.filters:
-    for inf in infs:
-        inf.seek(0)
-    name = filter_.name if filter_.name is not None else filter_.__class__.__name__
-    logger.info("Testing {}".format(name))
-    counter[name] = 0
-    for segments in filter_.filterfalse(read_lines(zip(*infs))):
-        counter[name] += 1
-        if args.removed:
-            obj = {'filter': name, 'segments': segments}
-            removed_fobj.write(json.dumps(obj, sort_keys=True)+'\n')
-
-for key, value in counter.items():
-    print("{}: Removes {} / {} ({:.1f}%)".format(key, value, total, 100 * value / total))
-
-if args.scores:
-    logger.info("Collecting scores")
-    for inf in infs:
-        inf.seek(0)
-    with open(args.scores, 'w') as fobj:
-        for score_obj in filter_pipe.score(read_lines(zip(*infs))):
-            fobj.write(json.dumps(score_obj, sort_keys=True)+'\n')
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(prog='opusfilter-test',
+                                     description='Test filters on parallel text data')
+
+    parser.add_argument('files', nargs='+', metavar='FILE', help='parallel text input file(s)')
+    parser.add_argument('--yaml', metavar='FILE', help='load YAML configuration file for the filters to test')
+    parser.add_argument('--add', nargs=2, action='append', default=[], metavar=('CLASS', 'JSON'),
+                        help='add filter of CLASS with JSON parameters object to the test')
+    parser.add_argument('--removed', metavar='FILE', help='write removed segments to JSONL file')
+    parser.add_argument('--scores', metavar='FILE', help='write filter scores to JSONL file')
+
+    args = parser.parse_args()
+
+    config = yaml.load(open(args.yaml)) if args.yaml else []
+    for name, jsonstr in args.add:
+        config.append({name: json.loads(jsonstr)})
+
+    filter_pipe = FilterPipeline.from_config(config)
+    infs = [file_open(infile) for infile in args.files]
+    total = 0
+    counter = collections.Counter()
+    logger.info("Calculating total")
+    for lines in read_lines(zip(*infs)):
+        total += 1
+
+    if args.removed:
+        removed_fobj = file_open(args.removed, 'w')
+
+    for filter_ in filter_pipe.filters:
+        for inf in infs:
+            inf.seek(0)
+        name = filter_.name if filter_.name is not None else filter_.__class__.__name__
+        logger.info("Testing {}".format(name))
+        counter[name] = 0
+        for segments in filter_.filterfalse(read_lines(zip(*infs))):
+            counter[name] += 1
+            if args.removed:
+                obj = {'filter': name, 'segments': segments}
+                removed_fobj.write(json.dumps(obj, sort_keys=True)+'\n')
+
+    for key, value in counter.items():
+        print("{}: Removes {} / {} ({:.1f}%)".format(key, value, total, 100 * value / total))
+
+    if args.scores:
+        logger.info("Collecting scores")
+        for inf in infs:
+            inf.seek(0)
+        with open(args.scores, 'w') as fobj:
+            for score_obj in filter_pipe.score(read_lines(zip(*infs))):
+                fobj.write(json.dumps(score_obj, sort_keys=True)+'\n')
 
-for inf in infs:
-    inf.close()
+    for inf in infs:
+        inf.close()
diff --git a/opusfilter/opusfilter.py b/opusfilter/opusfilter.py
index cc696be..6004495 100644
--- a/opusfilter/opusfilter.py
+++ b/opusfilter/opusfilter.py
@@ -70,6 +70,14 @@ def dict_set(key, value, dictionary):
         dictionary = dictionary[first]
 
 
+def _run_parallel_task(func_name, obj, parameters, overwrite):
+    """Global helper function for ParallelWrapper"""
+    func = getattr(obj, func_name)
+    if hasattr(func, "__wrapped__"):
+        func = func.__wrapped__
+    func(obj, parameters, overwrite)
+
+
 class ParallelWrapper:
     """Decorator for parallelizing OpusFilter steps
 
@@ -155,6 +163,8 @@ def parallelize(self, obj, parameters, overwrite=False):
         in_chunked_files, out_chunked_files = self.split(infiles, outfiles, n_jobs)
         # run jobs in parallel
        sub_processes = []
+        # The default start method in Python >= 3.14 requires picklable process targets, so pass the function name instead
+        func_name = self.func.__name__
         for intmpfiles, outtmpfiles in zip(in_chunked_files, out_chunked_files):
             parameters_i = copy.deepcopy(parameters)
             parameters_i["inputs"] = intmpfiles
@@ -162,7 +172,10 @@
                 parameters_i["outputs"] = [os.path.relpath(path, obj.output_dir) for path in outtmpfiles]
             elif "output" in parameters:  # function `score` use `output` instead of `outputs`
                 parameters_i["output"] = os.path.relpath(outtmpfiles[0], obj.output_dir)
-            process = multiprocessing.Process(target=self.func, args=(obj, parameters_i, overwrite))
+            process = multiprocessing.Process(
+                target=_run_parallel_task,
+                args=(func_name, obj, parameters_i, overwrite)
+            )
             process.daemon = True
             process.start()
             sub_processes.append(process)
diff --git a/tests/test_opusfilter.py b/tests/test_opusfilter.py
index 32a6b22..428db05 100644
--- a/tests/test_opusfilter.py
+++ b/tests/test_opusfilter.py
@@ -1434,6 +1434,17 @@ def test_parallel_score(self):
            count_lines(os.path.join(self.tempdir, 'RF1_sents.en'))
 
+@ParallelWrapper({'inputs', 'outputs', 'limit'})
+def _mock_parallel_func(obj, parameters, overwrite=False):
+    """Test function for TestParallelWrapper"""
+    inputs = parameters['inputs']
+    outputs = parameters['outputs']
+    for input_, output in zip(inputs, outputs):
+        input_ = os.path.join(obj.output_dir, input_)
+        output = os.path.join(obj.output_dir, output)
+        shutil.copyfile(input_, output)
+
+
 class TestParallelWrapper(unittest.TestCase):
 
     def setUp(self):
         self.parameters = [
@@ -1478,15 +1489,7 @@ def test_parallelize(self):
         mock_obj.output_dir = tempfile.mkdtemp()
         mock_obj.default_n_jobs = 1
         mock_obj._check_extra_parameters = OpusFilter._check_extra_parameters
-
-        @ParallelWrapper({'inputs', 'outputs', 'limit'})
-        def func(self, parameters, overwrite=False):
-            inputs = parameters['inputs']
-            outputs = parameters['outputs']
-            for input_, output in zip(inputs, outputs):
-                input_ = os.path.join(self.output_dir, input_)
-                output = os.path.join(self.output_dir, output)
-                shutil.copyfile(input_, output)
+        mock_obj._mock_parallel_func = _mock_parallel_func
 
         for param in self.parameters:
             format = param.get('format', None)
@@ -1503,7 +1506,7 @@ def func(self, parameters, overwrite=False):
                    fin.write("{}\n".format(i))
                fin.close()
             n_jobs = param["n_jobs"]
-            func(mock_obj, {'inputs': rel_inputs, 'outputs': rel_outputs, "n_jobs": n_jobs,
+            _mock_parallel_func(mock_obj, {'inputs': rel_inputs, 'outputs': rel_outputs, "n_jobs": n_jobs,
                  "limit": param.get('limit', None)}, overwrite=True)
             for output in outputs:
                 if param.get("limit", None) is not None:
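
Note on the new `if __name__ == '__main__':` guards: starting with Python 3.14, multiprocessing on Linux defaults to the forkserver start method instead of fork. Like spawn, forkserver imports the main module afresh rather than inheriting the parent's memory, so module-level code in these scripts would run again in every worker that the parallelized steps create. Guarding the entry point keeps that re-import side-effect free. A minimal sketch of the pattern (hypothetical file and names, not taken from the patch):

    # demo.py -- a sketch, assuming a spawn or forkserver start method
    import multiprocessing

    def work(x):
        return x * x

    if __name__ == '__main__':
        # Without this guard, each worker re-imports the module, reaches the
        # Pool creation again, and Python aborts the recursive bootstrapping
        # with a RuntimeError.
        with multiprocessing.Pool(2) as pool:
            print(pool.map(work, range(4)))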
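
The `_run_parallel_task` helper addresses the related pickling constraint: with a non-fork start method, `multiprocessing.Process` serializes its `target` and `args`, and the function stored inside a `ParallelWrapper` instance cannot be pickled by reference because its qualified name now resolves to the wrapper object. Passing a module-level function plus the attribute name, and unwrapping inside the child, sidesteps that. A standalone sketch of the pattern with hypothetical names (`LoggedCall` stands in for `ParallelWrapper`):

    import functools
    import multiprocessing

    class LoggedCall:
        """Toy decorator class, standing in for how ParallelWrapper wraps a method."""
        def __init__(self, func):
            functools.update_wrapper(self, func)  # sets self.__wrapped__ = func
            self.func = func
        def __call__(self, obj, *args):
            print('calling', self.func.__name__)
            return self.func(obj, *args)

    class Worker:
        @LoggedCall
        def greet(self, who):
            print('hello,', who)

    def run_by_name(obj, name, arg):
        # Resolve the attribute inside the child process: only obj and two
        # strings cross the process boundary, and both pickle cleanly.
        func = getattr(obj, name)
        if hasattr(func, '__wrapped__'):
            func = func.__wrapped__  # skip the wrapper, call the raw function
        func(obj, arg)

    if __name__ == '__main__':
        p = multiprocessing.Process(target=run_by_name, args=(Worker(), 'greet', 'world'))
        p.start()
        p.join()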