+import io
+import logging
+import os
+
+import pydantic
+import requests
+from welearn_database.data.models import WeLearnDocument
+from welearn_database.modules.text_cleaning import clean_text
+
+from welearn_datastack import constants
+from welearn_datastack.constants import AUTHORIZED_LICENSES, HEADERS
+from welearn_datastack.data.db_wrapper import WrapperRetrieveDocument
+from welearn_datastack.data.details_dataclass.author import AuthorDetails
+from welearn_datastack.data.source_models.fao_open_knowledge import Bundle, Item
+from welearn_datastack.exceptions import (
+    NoContent,
+    NoDescriptionFoundError,
+    PDFFileSizeExceedLimit,
+    UnauthorizedLicense,
+)
+from welearn_datastack.modules.pdf_extractor import (
+    delete_accents,
+    delete_non_printable_character,
+    extract_txt_from_pdf_with_tika,
+    remove_hyphens,
+    replace_ligatures,
+)
+from welearn_datastack.plugins.interface import IPluginRESTCollector
+from welearn_datastack.utils_.http_client_utils import (
+    get_http_code_from_exception,
+    get_new_https_session,
+)
+from welearn_datastack.utils_.scraping_utils import (
+    format_cc_license,
+    remove_extra_whitespace,
+)
+
+logger = logging.getLogger(__name__)
+
+
+# Collector for the FAO Open Knowledge repository.
+class FAOOpenKnowledgeCollector(IPluginRESTCollector):
+    related_corpus = "fao-open-knowledge"
+
+    def __init__(self) -> None:
+        self.corpus_name = self.related_corpus
+        self.corpus_fix = True
+        self.pdf_size_page_limit: int = int(os.getenv("PDF_SIZE_PAGE_LIMIT", 100000))
+        self.tika_address = os.getenv("TIKA_ADDRESS", "http://localhost:9998")
+
+        self.api_base_url = "https://openknowledge.fao.org/server/api/"
+        self.application_base_url = "https://openknowledge.fao.org/"
+        self.headers = constants.HEADERS
+        self.pdf_size_file_limit: int = int(os.getenv("PDF_SIZE_FILE_LIMIT", 2000000))
+
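+    # Fetch a PDF (after a HEAD request guards its Content-Length against
+    # pdf_size_file_limit) and extract its text through a remote Tika server.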
+    def _get_pdf_content(self, url: str) -> str:
+        logger.info("Getting PDF content from %s", url)
+        client = get_new_https_session(retry_total=0)
+
+        if self.pdf_size_file_limit < 0:
+            raise ValueError(
+                f"file_size_limit must be positive: {self.pdf_size_file_limit}"
+            )
+
+        if self.pdf_size_file_limit:
+            resp_head = client.head(
+                url, headers=HEADERS, allow_redirects=True, timeout=30
+            )
+            try:
+                # int(None) raises TypeError when the header is absent,
+                # so catch it alongside ValueError.
+                content_length = int(resp_head.headers.get("content-length"))
+                logger.info(f"PDF size is {content_length}")
+            except (TypeError, ValueError):
+                raise ValueError(f"Cannot retrieve the size of this PDF: {url}")
+
+            if content_length > self.pdf_size_file_limit:
+                raise PDFFileSizeExceedLimit(
+                    f"File size is {content_length} and limit is {self.pdf_size_file_limit}"
+                )
+
+        response = client.get(url, headers=HEADERS, timeout=300)
+        response.raise_for_status()
+
+        with io.BytesIO(response.content) as pdf_file:
+            pdf_content = extract_txt_from_pdf_with_tika(
+                pdf_content=pdf_file, tika_base_url=self.tika_address
+            )
+
+        # Delete non-printable characters
+        pdf_content = [
+            [delete_non_printable_character(word) for word in page]
+            for page in pdf_content
+        ]
+
+        pages = []
+        for content in pdf_content:
+            page_text = " ".join(content)
+            page_text = replace_ligatures(page_text)
+            page_text = remove_hyphens(page_text)
+            page_text = delete_accents(page_text)
+
+            pages.append(page_text)
+        ret = remove_extra_whitespace(" ".join(pages))
+
+        return ret
+
+    def _clean_txt_content(self, content: str) -> str:
+        return clean_text(content)
+
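+    # License metadata arrives as DSpace-style entries ({"value": ...}); spaces
+    # are replaced with dashes before normalizing through format_cc_license.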
+    @staticmethod
+    def _extract_licence(fao_item: Item) -> str:
+        md_item = fao_item.metadata.get("dc.rights.license", None)
+
+        if not md_item:
+            raise UnauthorizedLicense("No license found.")
+
+        try:
+            messy_licence = md_item[0]["value"]
+        except (KeyError, IndexError, TypeError):
+            raise UnauthorizedLicense("No license found.")
+
+        return format_cc_license(messy_licence.replace(" ", "-"))
+
+    @staticmethod
+    def _extract_authors(fao_item: Item) -> list[AuthorDetails]:
+        # NOTE: assumes authors are exposed under the standard DSpace
+        # "dc.contributor.author" metadata key, each entry carrying a "value".
+        ret: list[AuthorDetails] = []
+        for contributor in fao_item.metadata.get("dc.contributor.author", []):
+            ret.append(AuthorDetails(name=contributor["value"], misc=""))
+        return ret
+
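+    # Reject any license that is not in the AUTHORIZED_LICENSES whitelist.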
+    @staticmethod
+    def _check_licence_authorization(_license: str) -> None:
+        if _license not in AUTHORIZED_LICENSES:
+            raise UnauthorizedLicense(f"License '{_license}' is not authorized.")
+
+    def get_metadata_json(self, document: WeLearnDocument) -> Item:
+        session = get_new_https_session()
+        item_url = f"{self.api_base_url}core/items/{document.external_id}"
+        resp = session.get(url=item_url, headers=self.headers)
+        resp.raise_for_status()
+
+        item_json = Item.model_validate(resp.json())
+        return item_json
+
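+    # An item's files are grouped in bundles; the DSpace API embeds the list
+    # under "_embedded.bundles".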
+    def get_bundle_json(self, document: WeLearnDocument) -> list[Bundle]:
+        session = get_new_https_session()
+        bundle_url = f"{self.api_base_url}core/items/{document.external_id}/bundles"
+        resp = session.get(url=bundle_url, headers=self.headers)
+        resp.raise_for_status()
+
+        bundle_json = [
+            Bundle.model_validate(b) for b in resp.json()["_embedded"]["bundles"]
+        ]
+        return bundle_json
+
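+    # The "ORIGINAL" bundle is where DSpace keeps the files deposited with an
+    # item; the returned uuid is used below to build the bitstream content URL.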
+    @staticmethod
+    def extract_bitstream_id(bundles: list[Bundle]) -> str | None:
+        for bundle in bundles:
+            if bundle.name == "ORIGINAL":
+                return bundle.uuid
+        return None
+
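+    # Pipeline per document: fetch item metadata, enforce the license
+    # whitelist, locate the PDF bitstream, extract and clean its text, then
+    # fill full_content, description and title on the WeLearnDocument.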
+    def run(self, documents: list[WeLearnDocument]) -> list[WrapperRetrieveDocument]:
+        ret: list[WrapperRetrieveDocument] = []
+        for document in documents:
+            try:
+                fao_ok_metadata = self.get_metadata_json(document)
+                self._check_licence_authorization(
+                    self._extract_licence(fao_ok_metadata)
+                )
+                bundle_json = self.get_bundle_json(document)
+                pdf_id = self.extract_bitstream_id(bundle_json)
+                if not pdf_id:
+                    raise NoContent("No PDF bitstream found.")
+                pdf_url = f"{self.api_base_url}core/bitstreams/{pdf_id}/content"
+                pdf_content = self._get_pdf_content(pdf_url)
+                if not pdf_content or pdf_content.isspace():
+                    raise NoContent("No content extracted from PDF.")
+                document.full_content = self._clean_txt_content(pdf_content)
+                try:
+                    description = fao_ok_metadata.metadata.get(
+                        "dc.description.abstract"
+                    )[0].get("value", "")
+                except (AttributeError, IndexError, KeyError, TypeError):
+                    raise NoDescriptionFoundError("No description found.")
+                if not description or description.isspace():
+                    raise NoDescriptionFoundError("No description found.")
+                document.description = clean_text(description)
+                document.title = fao_ok_metadata.name
+                # Append the successfully enriched document (assumes error_info
+                # is optional on WrapperRetrieveDocument).
+                ret.append(WrapperRetrieveDocument(document=document))
+
+            except UnauthorizedLicense as e:
+                logger.warning(
+                    f"Document {document.url} skipped due to unauthorized license: {e}"
+                )
+                ret.append(
+                    WrapperRetrieveDocument(
+                        document=document,
+                        error_info=f"From FAO Open Knowledge Collector, unauthorized license: {e}",
+                    )
+                )
+                continue
+            except pydantic.ValidationError as e:
+                logger.error(
+                    f"Document {document.url} skipped due to validation error: {e}"
+                )
+                ret.append(
+                    WrapperRetrieveDocument(
+                        document=document,
+                        error_info=f"From FAO Open Knowledge Collector, validation error: {e}",
+                    )
+                )
+                continue
+            except requests.HTTPError as e:
+                http_code = get_http_code_from_exception(e)
+                logger.error(
+                    f"Document {document.url} skipped due to HTTP error {http_code}: {e}"
+                )
+                ret.append(
+                    WrapperRetrieveDocument(
+                        document=document,
+                        error_info=f"From FAO Open Knowledge Collector, HTTP error {http_code}: {e}",
+                    )
+                )
+                continue
+            except (NoContent, NoDescriptionFoundError, PDFFileSizeExceedLimit) as e:
+                # These are raised inside the try block above; without a handler
+                # they would abort the whole batch instead of skipping one document.
+                logger.warning(f"Document {document.url} skipped: {e}")
+                ret.append(
+                    WrapperRetrieveDocument(
+                        document=document,
+                        error_info=f"From FAO Open Knowledge Collector: {e}",
+                    )
+                )
+                continue
+        return ret
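+
+# Minimal usage sketch (hypothetical document; assumes external_id holds a
+# FAO Open Knowledge item uuid and that WrapperRetrieveDocument exposes the
+# error_info it is built with):
+#
+#     collector = FAOOpenKnowledgeCollector()
+#     for wrapped in collector.run([document]):
+#         if wrapped.error_info:
+#             logger.warning(wrapped.error_info)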