diff --git a/pythainlp/augment/lm/__init__.py b/pythainlp/augment/lm/__init__.py
index b0e19d2c7..f0265b136 100644
--- a/pythainlp/augment/lm/__init__.py
+++ b/pythainlp/augment/lm/__init__.py
@@ -2,13 +2,15 @@
 # SPDX-FileCopyrightText: Copyright 2016-2023 PyThaiNLP Project
 # SPDX-License-Identifier: Apache-2.0
 """
-LM
+Language Models
 """
 __all__ = [
     "FastTextAug",
     "Thai2transformersAug",
+    "ThaiTextAugmenter",
 ]
 
 from pythainlp.augment.lm.fasttext import FastTextAug
+from pythainlp.augment.lm.phayathaibert import ThaiTextAugmenter
 from pythainlp.augment.lm.wangchanberta import Thai2transformersAug
diff --git a/pythainlp/augment/lm/phayathaibert.py b/pythainlp/augment/lm/phayathaibert.py
new file mode 100644
index 000000000..47b43c219
--- /dev/null
+++ b/pythainlp/augment/lm/phayathaibert.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+# SPDX-FileCopyrightText: Copyright 2016-2023 PyThaiNLP Project
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import List
+import random
+import re
+
+from pythainlp.phayathaibert.core import ThaiTextProcessor
+
+
+_MODEL_NAME = "clicknext/phayathaibert"
+
+
+class ThaiTextAugmenter:
+    def __init__(self) -> None:
+        from transformers import (AutoTokenizer,
+                                  AutoModelForMaskedLM,
+                                  pipeline,
+                                  )
+        self.tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
+        self.model_for_masked_lm = AutoModelForMaskedLM.from_pretrained(_MODEL_NAME)
+        self.model = pipeline("fill-mask",
+                              tokenizer=self.tokenizer,
+                              model=self.model_for_masked_lm,
+                              )
+        self.processor = ThaiTextProcessor()
+
+    def generate(self,
+                 sample_text: str,
+                 word_rank: int,
+                 max_length: int = 3,
+                 sample: bool = False,
+                 ) -> str:
+        sample_txt = sample_text
+        final_text = ""
+
+        for _ in range(max_length):
+            input_text = self.processor.preprocess(sample_txt)
+            if sample:
+                random_word_idx = random.randint(0, 4)
+                output = self.model(input_text)[random_word_idx]["sequence"]
+            else:
+                output = self.model(input_text)[word_rank]["sequence"]
+            sample_txt = output + "<mask>"
+            final_text = sample_txt
+
+        gen_txt = re.sub("<mask>", "", final_text)
+
+        return gen_txt
+
+    def augment(self,
+                text: str,
+                num_augs: int = 3,
+                sample: bool = False,
+                ) -> List[str]:
+        """
+        Text augmentation from PhayaThaiBERT
+
+        :param str text: Thai text
+        :param int num_augs: number of augmented texts to return (at most 5)
+        :param bool sample: whether to sample the masked-token filler from the \
+            top predictions; set to True if more word diversity is needed
+
+        :return: list of augmented texts
+        :rtype: List[str]
+
+        :Example:
+        ::
+
+            from pythainlp.augment.lm import ThaiTextAugmenter
+
+            aug = ThaiTextAugmenter()
+            aug.augment("ช้างมีทั้งหมด 50 ตัว บน", num_augs=5)
+
+            # output = ['ช้างมีทั้งหมด 50 ตัว บนโลกใบนี้ครับ.',
+                'ช้างมีทั้งหมด 50 ตัว บนพื้นดินครับ...',
+                'ช้างมีทั้งหมด 50 ตัว บนท้องฟ้าครับ...',
+                'ช้างมีทั้งหมด 50 ตัว บนดวงจันทร์.‼',
+                'ช้างมีทั้งหมด 50 ตัว บนเขาค่ะ😁']
+        """
+        MAX_NUM_AUGS = 5
+        augment_list = []
+
+        if "<mask>" not in text:
+            text = text + "<mask>"
+
+        if num_augs <= MAX_NUM_AUGS:
+            for rank in range(num_augs):
+                gen_text = self.generate(text, rank, sample=sample)
+                processed_text = re.sub("<_>", " ", self.processor.preprocess(gen_text))
+                augment_list.append(processed_text)
+
+            return augment_list
+
+        raise ValueError(
+            f"num_augs={num_augs} exceeds the maximum number of augmentations: {MAX_NUM_AUGS}"
+        )
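Usage sketch for the new augmenter (assumes the clicknext/phayathaibert
weights download from the Hugging Face Hub on first use; the generated
sentences depend on the model, so outputs are illustrative only)::

    from pythainlp.augment.lm import ThaiTextAugmenter

    aug = ThaiTextAugmenter()

    # Up to 5 augmentations; a trailing <mask> token is appended
    # automatically when the input text has none.
    print(aug.augment("ช้างมีทั้งหมด 50 ตัว บน", num_augs=3))

    # sample=True fills each mask with a random pick from the top-5
    # predictions, trading rank order for word diversity.
    print(aug.augment("ช้างมีทั้งหมด 50 ตัว บน", num_augs=3, sample=True))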
diff --git a/pythainlp/phayathaibert/__init__.py b/pythainlp/phayathaibert/__init__.py
new file mode 100644
index 000000000..bf0f847bf
--- /dev/null
+++ b/pythainlp/phayathaibert/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+# SPDX-FileCopyrightText: Copyright 2016-2023 PyThaiNLP Project
+# SPDX-License-Identifier: Apache-2.0
+__all__ = [
+    "NamedEntityTagger",
+    "PartOfSpeechTagger",
+    "ThaiTextAugmenter",
+    "ThaiTextProcessor",
+    "segment",
+]
+
+from pythainlp.phayathaibert.core import (
+    NamedEntityTagger,
+    PartOfSpeechTagger,
+    ThaiTextAugmenter,
+    ThaiTextProcessor,
+    segment,
+)
diff --git a/pythainlp/phayathaibert/core.py b/pythainlp/phayathaibert/core.py
new file mode 100644
index 000000000..579d24ddc
--- /dev/null
+++ b/pythainlp/phayathaibert/core.py
@@ -0,0 +1,387 @@
+# -*- coding: utf-8 -*-
+# SPDX-FileCopyrightText: Copyright 2016-2023 PyThaiNLP Project
+# SPDX-License-Identifier: Apache-2.0
+from typing import Callable, Collection, List, Tuple, Union
+import random
+import re
+import warnings
+
+from pythainlp.tokenize import word_tokenize
+from transformers import (
+    CamembertTokenizer,
+)
+
+
+_model_name = "clicknext/phayathaibert"
+_tokenizer = CamembertTokenizer.from_pretrained(_model_name)
+
+
+class ThaiTextProcessor:
+    def __init__(self):
+        self._TK_UNK, self._TK_REP, self._TK_WREP, self._TK_URL, self._TK_END = \
+            "<unk> <rep> <wrep> <url> </s>".split()
+        self.SPACE_SPECIAL_TOKEN = "<_>"
+
+    def replace_url(self, text: str) -> str:
+        """
+        Replace each URL in `text` with TK_URL (https://stackoverflow.com/a/6041965)
+        :param str text: text in which to replace URLs
+        :return: text with URLs replaced by the special token
+        :rtype: str
+        :Example:
+            >>> replace_url("go to https://github.com")
+            go to <url>
+        """
+        URL_PATTERN = \
+            r"(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?"
+        return re.sub(URL_PATTERN, self._TK_URL, text)
+
+    def rm_brackets(text: str) -> str:
+        """
+        Remove all empty brackets and artifacts within brackets from `text`.
+        :param str text: text from which to remove useless brackets
+        :return: text with useless brackets removed
+        :rtype: str
+        :Example:
+            >>> rm_brackets("hey() whats[;] up{*&} man(hey)")
+            hey whats up man(hey)
+        """
+        # remove empty brackets
+        new_line = re.sub(r"\(\)", "", text)
+        new_line = re.sub(r"\{\}", "", new_line)
+        new_line = re.sub(r"\[\]", "", new_line)
+        # brackets that contain only punctuation
+        new_line = re.sub(r"\([^a-zA-Z0-9ก-๙]+\)", "", new_line)
+        new_line = re.sub(r"\{[^a-zA-Z0-9ก-๙]+\}", "", new_line)
+        new_line = re.sub(r"\[[^a-zA-Z0-9ก-๙]+\]", "", new_line)
+        # artifacts after (
+        new_line = re.sub(r"(?<=\()[^a-zA-Z0-9ก-๙]+(?=[a-zA-Z0-9ก-๙])", "", new_line)
+        new_line = re.sub(r"(?<=\{)[^a-zA-Z0-9ก-๙]+(?=[a-zA-Z0-9ก-๙])", "", new_line)
+        new_line = re.sub(r"(?<=\[)[^a-zA-Z0-9ก-๙]+(?=[a-zA-Z0-9ก-๙])", "", new_line)
+        # artifacts before )
+        new_line = re.sub(r"(?<=[a-zA-Z0-9ก-๙])[^a-zA-Z0-9ก-๙]+(?=\))", "", new_line)
+        new_line = re.sub(r"(?<=[a-zA-Z0-9ก-๙])[^a-zA-Z0-9ก-๙]+(?=\})", "", new_line)
+        new_line = re.sub(r"(?<=[a-zA-Z0-9ก-๙])[^a-zA-Z0-9ก-๙]+(?=\])", "", new_line)
+        return new_line
+
+    def replace_newlines(text: str) -> str:
+        """
+        Replace newlines in `text` with spaces.
+        :param str text: text in which to replace newlines with spaces
+        :return: text with newlines replaced by spaces
+        :rtype: str
+        :Example:
+            >>> replace_newlines("hey whats\n\nup")
+            hey whats up
+        """
+
+        return re.sub(r"[\n]", " ", text.strip())
+
+    def rm_useless_spaces(text: str) -> str:
+        """
+        Collapse runs of multiple spaces in `text` into one. (code from `fastai`)
+        :param str text: text in which to collapse repeated spaces
+        :return: text with all runs of spaces reduced to a single space
+        :rtype: str
+        :Example:
+            >>> rm_useless_spaces("oh   no")
+            oh no
+        """
+        return re.sub(" {2,}", " ", text)
+
+    def replace_spaces(text: str, space_token: str = "<_>") -> str:
+        """
+        Replace spaces in `text` with `space_token`.
+        :param str text: text in which to replace spaces
+        :return: text with all spaces replaced by `space_token`
+        :rtype: str
+        :Example:
+            >>> replace_spaces("oh no")
+            oh<_>no
+        """
+        return re.sub(" ", space_token, text)
+
+    def replace_rep_after(text: str) -> str:
+        """
+        Collapse character-level repetitions in `text`
+        :param str text: input text with repeated characters
+        :return: text with runs of four or more identical characters reduced to one
+        :rtype: str
+        :Example:
+            >>> text = "กาาาาาาา"
+            >>> replace_rep_after(text)
+            'กา'
+        """
+
+        def _replace_rep(m):
+            c, cc = m.groups()
+            return f"{c}"
+
+        re_rep = re.compile(r"(\S)(\1{3,})")
+        return re_rep.sub(_replace_rep, text)
+
+    def replace_wrep_post(self, toks: Collection[str]) -> Collection[str]:
+        """
+        Collapse repetitive words after tokenization;
+        fastai's `replace_wrep` does not work well with Thai.
+        :param Collection[str] toks: list of tokens
+        :return: list of tokens with consecutive duplicate words reduced to one
+        :rtype: Collection[str]
+        :Example:
+            >>> toks = ["กา", "น้ำ", "น้ำ", "น้ำ", "น้ำ"]
+            >>> replace_wrep_post(toks)
+            ['กา', 'น้ำ']
+        """
+        previous_word = None
+        rep_count = 0
+        res = []
+        for current_word in toks + [self._TK_END]:
+            if current_word == previous_word:
+                rep_count += 1
+            elif (current_word != previous_word) and (rep_count > 0):
+                res += [previous_word]
+                rep_count = 0
+            else:
+                res.append(previous_word)
+            previous_word = current_word
+        return res[1:]
+
+    def remove_space(toks: Collection[str]) -> Collection[str]:
+        """
+        Filter out space tokens, e.g. for bag-of-words models.
+        :param Collection[str] toks: list of tokens
+        :return: list of tokens with space tokens (" ") filtered out
+        :rtype: Collection[str]
+        :Example:
+            >>> toks = ['ฉัน', 'เดิน', ' ', 'กลับ', 'บ้าน']
+            >>> remove_space(toks)
+            ['ฉัน', 'เดิน', 'กลับ', 'บ้าน']
+        """
+        res = []
+        for t in toks:
+            t = t.strip()
+            if t:
+                res.append(t)
+        return res
+
+    # combine the rules above into one normalization pass
+    def preprocess(
+        self,
+        text: str,
+        pre_rules: Collection[Callable] = [
+            rm_brackets,
+            replace_newlines,
+            rm_useless_spaces,
+            replace_spaces,
+            replace_rep_after,
+        ],
+        tok_func: Callable = word_tokenize,
+    ) -> str:
+        text = text.lower()
+        for rule in pre_rules:
+            text = rule(text)
+        toks = tok_func(text)
+        return "".join(toks)
+
+
+class ThaiTextAugmenter:
+    def __init__(self) -> None:
+        from transformers import (AutoTokenizer,
+                                  AutoModelForMaskedLM,
+                                  pipeline,
+                                  )
+        self.tokenizer = AutoTokenizer.from_pretrained(_model_name)
+        self.model_for_masked_lm = AutoModelForMaskedLM.from_pretrained(_model_name)
+        self.model = pipeline("fill-mask",
+                              tokenizer=self.tokenizer,
+                              model=self.model_for_masked_lm,
+                              )
+        self.processor = ThaiTextProcessor()
+
+    def generate(self,
+                 sample_text: str,
+                 word_rank: int,
+                 max_length: int = 3,
+                 sample: bool = False,
+                 ) -> str:
+        sample_txt = sample_text
+        final_text = ""
+        for _ in range(max_length):
+            input_text = self.processor.preprocess(sample_txt)
+            if sample:
+                random_word_idx = random.randint(0, 4)
+                output = self.model(input_text)[random_word_idx]["sequence"]
+            else:
+                output = self.model(input_text)[word_rank]["sequence"]
+            sample_txt = output + "<mask>"
+            final_text = sample_txt
+
+        gen_txt = re.sub("<mask>", "", final_text)
+        return gen_txt
+
+    def augment(self,
+                text: str,
+                num_augs: int = 3,
+                sample: bool = False,
+                ) -> List[str]:
+        """
+        Text augmentation from PhayaThaiBERT
+
+        :param str text: Thai text
+        :param int num_augs: number of augmented texts to return (at most 5)
+        :param bool sample: whether to sample the masked-token filler from the \
+            top predictions; set to True if more word diversity is needed
+
+        :return: list of augmented texts
+        :rtype: List[str]
+
+        :Example:
+        ::
+
+            from pythainlp.augment.lm import ThaiTextAugmenter
+
+            aug = ThaiTextAugmenter()
+            aug.augment("ช้างมีทั้งหมด 50 ตัว บน", num_augs=5)
+
+            # output = ['ช้างมีทั้งหมด 50 ตัว บนโลกใบนี้ครับ.',
+                'ช้างมีทั้งหมด 50 ตัว บนพื้นดินครับ...',
+                'ช้างมีทั้งหมด 50 ตัว บนท้องฟ้าครับ...',
+                'ช้างมีทั้งหมด 50 ตัว บนดวงจันทร์.‼',
+                'ช้างมีทั้งหมด 50 ตัว บนเขาค่ะ😁']
+        """
+        MAX_NUM_AUGS = 5
+        augment_list = []
+        if "<mask>" not in text:
+            text = text + "<mask>"
+        if num_augs <= MAX_NUM_AUGS:
+            for rank in range(num_augs):
+                gen_text = self.generate(text,
+                                         rank,
+                                         sample=sample,
+                                         )
+                processed_text = re.sub("<_>", " ", self.processor.preprocess(gen_text))
+                augment_list.append(processed_text)
+
+            return augment_list
+
+        raise ValueError(
+            f"num_augs={num_augs} exceeds the maximum number of augmentations: {MAX_NUM_AUGS}"
+        )
+
+
+class PartOfSpeechTagger:
+    def __init__(self, model: str = "lunarlist/pos_thai_phayathai") -> None:
+        # Load model directly
+        from transformers import (AutoTokenizer,
+                                  AutoModelForTokenClassification,
+                                  )
+        self.tokenizer = AutoTokenizer.from_pretrained(model)
+        self.model = AutoModelForTokenClassification.from_pretrained(model)
+
+    def get_tag(self,
+                sentence: str,
+                strategy: str = "simple",
+                ) -> List[List[Tuple[str, str]]]:
+        """
+        Marks sentences with part-of-speech (POS) tags.
+
+        :param str sentence: a Thai sentence to tag
+        :return: a list of lists of tuples (word, POS tag)
+        :rtype: list[list[tuple[str, str]]]
+
+        :Example:
+
+        Labels POS for the given sentence::
+
+            from pythainlp.phayathaibert.core import PartOfSpeechTagger
+
+            tagger = PartOfSpeechTagger()
+            tagger.get_tag("แมวทำอะไรตอนห้าโมงเช้า")
+            # output:
+            # [[('แมว', 'NOUN'), ('ทําอะไร', 'VERB'), ('ตอนห้าโมงเช้า', 'NOUN')]]
+        """
+        from transformers import TokenClassificationPipeline
+        pipeline = TokenClassificationPipeline(model=self.model,
+                                               tokenizer=self.tokenizer,
+                                               aggregation_strategy=strategy,
+                                               )
+        outputs = pipeline(sentence)
+        word_tags = [[(tag["word"], tag["entity_group"]) for tag in outputs]]
+        return word_tags
+
+
+class NamedEntityTagger:
+    def __init__(self, model: str = "Pavarissy/phayathaibert-thainer") -> None:
+        from transformers import (AutoTokenizer,
+                                  AutoModelForTokenClassification,
+                                  )
+        self.tokenizer = AutoTokenizer.from_pretrained(model)
+        self.model = AutoModelForTokenClassification.from_pretrained(model)
+
+    def get_ner(self,
+                text: str,
+                tag: bool = False,
+                pos: bool = False,
+                strategy: str = "simple",
+                ) -> Union[List[Tuple[str, str]], List[Tuple[str, str, str]], str]:
+        """
+        This function tags named entities in text.
+
+        :param str text: text in Thai to be tagged
+        :param bool tag: return the output as an HTML-like tagged string
+        :param bool pos: include part-of-speech tags in the output \
+            (not supported by this model; a warning is issued and POS tags \
+            are not included)
+        :return: a list of tuples associated with tokenized words and NER tags,
+            or an HTML-like tagged string if the parameter `tag` is
+            specified as `True`
+        :rtype: Union[List[Tuple[str, str]], List[Tuple[str, str, str]], str]
+        :Example:
+
+        >>> from pythainlp.phayathaibert.core import NamedEntityTagger
+        >>>
+        >>> tagger = NamedEntityTagger()
+        >>> tagger.get_ner("ทดสอบนายปวริศ เรืองจุติโพธิ์พานจากประเทศไทย")
+        [('นายปวริศ เรืองจุติโพธิ์พานจากประเทศไทย', 'PERSON'),
+        ('จาก', 'LOCATION'),
+        ('ประเทศไทย', 'LOCATION')]
+        >>> tagger.get_ner("ทดสอบนายปวริศ เรืองจุติโพธิ์พานจากประเทศไทย", tag=True)
+        'ทดสอบ<PERSON>นายปวริศ เรืองจุติโพธิ์พาน</PERSON>\
+            จาก<LOCATION>ประเทศไทย</LOCATION>'
+        """
+        from transformers import TokenClassificationPipeline
+        if pos:
+            warnings.warn(
+                "This model does not support POS tagging; "
+                "POS tags will not be included in the output."
+            )
+        sample_output = []
+        tag_text_list = []
+        current_pos = 0
+        pipeline = TokenClassificationPipeline(model=self.model,
+                                               tokenizer=self.tokenizer,
+                                               aggregation_strategy=strategy,
+                                               )
+        outputs = pipeline(text)
+        for token in outputs:
+            ner_tag = token["entity_group"]
+            begin_pos, end_pos = token["start"], token["end"]
+            if current_pos == 0:
+                text_tag = text[:begin_pos] + f"<{ner_tag}>" \
+                    + text[begin_pos:end_pos] + f"</{ner_tag}>"
+            else:
+                text_tag = text[current_pos:begin_pos] + f"<{ner_tag}>" \
+                    + text[begin_pos:end_pos] + f"</{ner_tag}>"
+            tag_text_list.append(text_tag)
+            sample_output.append((token["word"], token["entity_group"]))
+            current_pos = end_pos
+        if tag:
+            return "".join(tag_text_list)
+        else:
+            return sample_output
+
+
+def segment(sentence: str) -> List[str]:
+    """
+    Subword tokenization with the PhayaThaiBERT tokenizer, a SentencePiece \
+    model from the WangchanBERTa model with vocabulary expansion.
+
+    :param str sentence: text to be tokenized
+    :return: list of subwords
+    :rtype: list[str]
+    """
+    if not sentence or not isinstance(sentence, str):
+        return []
+
+    return _tokenizer.tokenize(sentence)
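Usage sketch for the new module (assumes the model weights are available
locally or can be downloaded; subword boundaries and entity spans depend
on the models, so the results are illustrative)::

    from pythainlp.phayathaibert import (
        NamedEntityTagger,
        ThaiTextProcessor,
        segment,
    )

    # Normalization pipeline: lowercase, strip bracket artifacts, collapse
    # newlines/extra spaces and character repetitions, mark spaces as <_>.
    processor = ThaiTextProcessor()
    clean = processor.preprocess("Hello   world (...) ไปกันเถอะ")

    # SentencePiece subword segmentation with the shared tokenizer.
    subwords = segment("ทดสอบการตัดคำ")

    # Named entities as (word, entity_group) tuples, or as an HTML-like
    # tagged string with tag=True.
    tagger = NamedEntityTagger()
    entities = tagger.get_ner("นายปวริศเดินทางจากประเทศไทย")
    tagged = tagger.get_ner("นายปวริศเดินทางจากประเทศไทย", tag=True)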
diff --git a/pythainlp/tag/named_entity.py b/pythainlp/tag/named_entity.py
index 109e0faa1..88b290f29 100644
--- a/pythainlp/tag/named_entity.py
+++ b/pythainlp/tag/named_entity.py
@@ -46,6 +46,10 @@ def load_engine(self, engine: str, corpus: str) -> None:
             from pythainlp.wangchanberta import ThaiNameTagger
 
             self.engine = ThaiNameTagger(dataset_name=corpus)
+        elif engine == "phayathaibert" and corpus == "thainer-v2":
+            from pythainlp.phayathaibert.core import NamedEntityTagger
+
+            self.engine = NamedEntityTagger()
         else:
             raise ValueError(
                 "NER class not support {0} engine or {1} corpus.".format(
@@ -53,9 +57,11 @@ def load_engine(self, engine: str, corpus: str) -> None:
                 )
             )
 
-    def tag(
-        self, text, pos=False, tag=False
-    ) -> Union[List[Tuple[str, str]], List[Tuple[str, str, str]], str]:
+    def tag(self,
+            text,
+            pos=False,
+            tag=False
+            ) -> Union[List[Tuple[str, str]], List[Tuple[str, str, str]], str]:
         """
         This function tags named entities in text in IOB format.
diff --git a/pythainlp/tag/pos_tag.py b/pythainlp/tag/pos_tag.py
index 7d345198f..a195f8b69 100644
--- a/pythainlp/tag/pos_tag.py
+++ b/pythainlp/tag/pos_tag.py
@@ -179,6 +179,8 @@ def pos_tag_transformers(
     :param str engine:
         * *bert* - BERT: Bidirectional Encoder Representations from Transformers (default)
         * *wangchanberta* - fine-tuned version of airesearch/wangchanberta-base-att-spm-uncased on pud corpus (support PUD corpus only)
+        * *phayathaibert* - fine-tuned version of clicknext/phayathaibert \
+            on blackboard corpus (support blackboard corpus only)
         * *mdeberta* - mDeBERTa: Multilingual Decoding-enhanced BERT with disentangled attention (support PUD corpus only)
     :param str corpus: the corpus that is used to create the language model for tagger
         * *blackboard* - `blackboard treebank (support bert engine only) `_
@@ -216,6 +218,7 @@ def pos_tag_transformers(
 
     _blackboard_support_engine = {
         "bert": "lunarlist/pos_thai",
+        "phayathaibert": "lunarlist/pos_thai_phayathai",
     }
 
     _pud_support_engine = {
@@ -238,9 +241,10 @@ def pos_tag_transformers(
             )
         )
 
-    pipeline = TokenClassificationPipeline(
-        model=model, tokenizer=tokenizer, aggregation_strategy="simple"
-    )
+    pipeline = TokenClassificationPipeline(model=model,
+                                           tokenizer=tokenizer,
+                                           aggregation_strategy="simple",
+                                           )
     outputs = pipeline(sentence)
     word_tags = [[(tag["word"], tag["entity_group"]) for tag in outputs]]
diff --git a/pythainlp/tokenize/core.py b/pythainlp/tokenize/core.py
index 71fd3eeb7..250c5fb8a 100644
--- a/pythainlp/tokenize/core.py
+++ b/pythainlp/tokenize/core.py
@@ -613,6 +613,8 @@ def subword_tokenize(
         from pythainlp.tokenize.tltk import syllable_tokenize as segment
     elif engine == "han_solo":
         from pythainlp.tokenize.han_solo import segment
+    elif engine == "phayathai":
+        from pythainlp.phayathaibert import segment
     else:
         raise ValueError(
             f"""Tokenizer \"{engine}\" not found.
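Usage sketch for the two public entry points wired up above (engine names
follow this diff: "phayathai" for subword tokenization, "phayathaibert" for
POS tagging; the tags produced depend on the models)::

    from pythainlp.tokenize import subword_tokenize
    from pythainlp.tag import pos_tag_transformers

    # SentencePiece subwords through the new engine registration.
    subwords = subword_tokenize("ทดสอบการตัดคำ", engine="phayathai")

    # POS tags from the PhayaThaiBERT model fine-tuned on blackboard.
    tags = pos_tag_transformers("แมวทำอะไรตอนห้าโมงเช้า",
                                engine="phayathaibert",
                                corpus="blackboard")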
diff --git a/tests/test_augment.py b/tests/test_augment.py
index 82cb2e244..48714e041 100644
--- a/tests/test_augment.py
+++ b/tests/test_augment.py
@@ -7,6 +7,7 @@
 from pythainlp.augment import WordNetAug
 from pythainlp.augment.wordnet import postype2wordnet
 # from pythainlp.augment.lm import Thai2transformersAug
+# from pythainlp.augment.lm.phayathaibert import ThaiTextAugmenter
 from pythainlp.augment.word2vec.bpemb_wv import BPEmbAug
 from pythainlp.augment.word2vec import (
     LTW2VAug
@@ -45,3 +46,7 @@ def test_LTW2VAug(self):
     # def test_Thai2transformersAug(self):
     #     _aug = Thai2transformersAug()
     #     self.assertIsNotNone(_aug.augment(self.text2, num_replace_tokens=1))
+
+    # def test_ThaiTextAugmenter(self):
+    #     _aug = ThaiTextAugmenter()
+    #     self.assertIsNotNone(_aug.augment(self.text2, num_augs=3))
diff --git a/tests/test_tokenize.py b/tests/test_tokenize.py
index 431bfc0e0..84a200c74 100644
--- a/tests/test_tokenize.py
+++ b/tests/test_tokenize.py
@@ -427,6 +427,18 @@ def test_subword_tokenize(self):
             "า" in subword_tokenize("สวัสดีดาวอังคาร", engine="tltk")
         )
         self.assertIsInstance(subword_tokenize("โควิด19", engine="tltk"), list)
+
+        self.assertEqual(subword_tokenize(None, engine="phayathai"), [])
+        self.assertEqual(subword_tokenize("", engine="phayathai"), [])
+        self.assertIsInstance(
+            subword_tokenize("สวัสดิีดาวอังคาร", engine="phayathai"), list
+        )
+        self.assertFalse(
+            "า" in subword_tokenize("สวัสดีดาวอังคาร", engine="phayathai")
+        )
+        self.assertIsInstance(
+            subword_tokenize("โควิด19", engine="phayathai"), list
+        )
         with self.assertRaises(ValueError):
             subword_tokenize("นกแก้ว", engine="XX")  # engine does not exist
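Finally, a sketch of the high-level NER entry point enabled by the
named_entity.py change (assumes the Pavarissy/phayathaibert-thainer weights
are available; entity spans depend on the model)::

    from pythainlp.tag import NER

    # The new engine/corpus pair routes to phayathaibert's NamedEntityTagger.
    ner = NER(engine="phayathaibert", corpus="thainer-v2")
    print(ner.tag("ทดสอบนายปวริศ เรืองจุติโพธิ์พานจากประเทศไทย"))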