diff --git a/pyproject.toml b/pyproject.toml index 9a78ccfb..83025be7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -226,10 +226,6 @@ select = [ ] ignore = [ "ANN401", # Dynamically typed expressions (typing.Any) are disallowed - "ASYNC210", # TODO Async functions should not call blocking HTTP methods - "ASYNC230", # TODO Async functions should not open files with blocking methods like `open` - "ASYNC240", # TODO Async functions should not use os.path methods, use trio.Path or anyio.path - "ASYNC250", # TODO Blocking call to input() in async context "COM812", # Trailing comma missing "D1", # Missing docstring in ... "D200", # One-line docstring should fit on one line diff --git a/src/kleinanzeigen_bot/__init__.py b/src/kleinanzeigen_bot/__init__.py index 3275afe9..63452de6 100644 --- a/src/kleinanzeigen_bot/__init__.py +++ b/src/kleinanzeigen_bot/__init__.py @@ -937,7 +937,7 @@ async def publish_ad(self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], p LOG.warning("# Payment form detected! Please proceed with payment.") LOG.warning("############################################") await self.web_scroll_page_down() - input(_("Press a key to continue...")) + await ainput(_("Press a key to continue...")) except TimeoutError: pass @@ -1108,7 +1108,7 @@ async def __set_shipping(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrate # in some categories we need to go another dialog back try: await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', - timeout=short_timeout) + timeout = short_timeout) except TimeoutError: await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]') diff --git a/src/kleinanzeigen_bot/extract.py b/src/kleinanzeigen_bot/extract.py index da617709..88280c98 100644 --- a/src/kleinanzeigen_bot/extract.py +++ b/src/kleinanzeigen_bot/extract.py @@ -1,11 +1,14 @@ # SPDX-FileCopyrightText: © Sebastian Thomschke and contributors # SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +import asyncio from gettext import gettext as _ -import json, mimetypes, os, re, shutil # isort: skip +import json, mimetypes, re, shutil # isort: skip +import urllib.error as urllib_error import urllib.request as urllib_request from datetime import datetime +from pathlib import Path from typing import Any, Final from kleinanzeigen_bot.model.ad_model import ContactPartial @@ -25,6 +28,28 @@ BREADCRUMB_RE = re.compile(r"/c(\d+)") +def _path_exists(path:Path | str) -> bool: + """Helper for Path.exists() that can be mocked in tests.""" + return Path(path).exists() + + +def _path_is_dir(path:Path | str) -> bool: + """Helper for Path.is_dir() that can be mocked in tests.""" + return Path(path).is_dir() + + +async def _exists(path:Path | str) -> bool: + result = await asyncio.get_running_loop().run_in_executor(None, _path_exists, path) + LOG.debug("Path exists check: %s -> %s", path, result) + return result + + +async def _isdir(path:Path | str) -> bool: + result = await asyncio.get_running_loop().run_in_executor(None, _path_is_dir, path) + LOG.debug("Path is_dir check: %s -> %s", path, result) + return result + + class AdExtractor(WebScrapingMixin): """ Wrapper class for ad extraction that uses an active bot´s browser session to extract specific elements from an ad page. @@ -44,23 +69,39 @@ async def download_ad(self, ad_id:int) -> None: """ # create sub-directory for ad(s) to download (if necessary): - relative_directory = "downloaded-ads" - # make sure configured base directory exists - if not os.path.exists(relative_directory) or not os.path.isdir(relative_directory): - os.mkdir(relative_directory) - LOG.info("Created ads directory at ./%s.", relative_directory) + relative_directory = Path("downloaded-ads") + # make sure configured base directory exists (using exist_ok=True to avoid TOCTOU race) + await asyncio.get_running_loop().run_in_executor(None, lambda: relative_directory.mkdir(exist_ok = True)) # noqa: ASYNC240 + LOG.info("Ensured ads directory exists at ./%s.", relative_directory) # Extract ad info and determine final directory path ad_cfg, final_dir = await self._extract_ad_page_info_with_directory_handling( relative_directory, ad_id ) - # Save the ad configuration file - ad_file_path = final_dir + "/" + f"ad_{ad_id}.yaml" - dicts.save_dict( - ad_file_path, - ad_cfg.model_dump(), - header = "# yaml-language-server: $schema=https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot/refs/heads/main/schemas/ad.schema.json") + # Save the ad configuration file (offload to executor to avoid blocking the event loop) + ad_file_path = str(Path(final_dir) / f"ad_{ad_id}.yaml") + header_string = "# yaml-language-server: $schema=https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot/refs/heads/main/schemas/ad.schema.json" + await asyncio.get_running_loop().run_in_executor( + None, + lambda: dicts.save_dict(ad_file_path, ad_cfg.model_dump(), header = header_string) + ) + + @staticmethod + def _download_and_save_image_sync(url:str, directory:str, filename_prefix:str, img_nr:int) -> str | None: + try: + with urllib_request.urlopen(url) as response: # noqa: S310 Audit URL open for permitted schemes. + content_type = response.info().get_content_type() + file_ending = mimetypes.guess_extension(content_type) or "" + # Use pathlib.Path for OS-agnostic path handling + img_path = Path(directory) / f"{filename_prefix}{img_nr}{file_ending}" + with open(img_path, "wb") as f: + shutil.copyfileobj(response, f) + return str(img_path) + except (urllib_error.URLError, urllib_error.HTTPError, OSError, shutil.Error) as e: + # Narrow exception handling to expected network/filesystem errors + LOG.warning("Failed to download image %s: %s", url, e) + return None async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[str]: """ @@ -85,19 +126,26 @@ async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[ img_nr = 1 dl_counter = 0 + loop = asyncio.get_running_loop() + for img_element in images: current_img_url = img_element.attrs["src"] # URL of the image if current_img_url is None: continue - with urllib_request.urlopen(str(current_img_url)) as response: # noqa: S310 Audit URL open for permitted schemes. - content_type = response.info().get_content_type() - file_ending = mimetypes.guess_extension(content_type) - img_path = f"{directory}/{img_fn_prefix}{img_nr}{file_ending}" - with open(img_path, "wb") as f: - shutil.copyfileobj(response, f) + img_path = await loop.run_in_executor( + None, + self._download_and_save_image_sync, + str(current_img_url), + directory, + img_fn_prefix, + img_nr + ) + + if img_path: dl_counter += 1 - img_paths.append(img_path.rsplit("/", maxsplit = 1)[-1]) + # Use pathlib.Path for OS-agnostic path handling + img_paths.append(Path(img_path).name) img_nr += 1 LOG.info("Downloaded %s.", i18n.pluralize("image", dl_counter)) @@ -354,8 +402,8 @@ async def _extract_ad_page_info(self, directory:str, ad_id:int) -> AdPartial: return ad_cfg async def _extract_ad_page_info_with_directory_handling( - self, relative_directory:str, ad_id:int - ) -> tuple[AdPartial, str]: + self, relative_directory:Path, ad_id:int + ) -> tuple[AdPartial, Path]: """ Extracts ad information and handles directory creation/renaming. @@ -373,32 +421,37 @@ async def _extract_ad_page_info_with_directory_handling( # Determine the final directory path sanitized_title = misc.sanitize_folder_name(title, self.config.download.folder_name_max_length) - final_dir = os.path.join(relative_directory, f"ad_{ad_id}_{sanitized_title}") - temp_dir = os.path.join(relative_directory, f"ad_{ad_id}") + final_dir = relative_directory / f"ad_{ad_id}_{sanitized_title}" + temp_dir = relative_directory / f"ad_{ad_id}" + + loop = asyncio.get_running_loop() # Handle existing directories - if os.path.exists(final_dir): + if await _exists(final_dir): # If the folder with title already exists, delete it LOG.info("Deleting current folder of ad %s...", ad_id) - shutil.rmtree(final_dir) + LOG.debug("Removing directory tree: %s", final_dir) + await loop.run_in_executor(None, shutil.rmtree, str(final_dir)) - if os.path.exists(temp_dir): + if await _exists(temp_dir): if self.config.download.rename_existing_folders: # Rename the old folder to the new name with title LOG.info("Renaming folder from %s to %s for ad %s...", - os.path.basename(temp_dir), os.path.basename(final_dir), ad_id) - os.rename(temp_dir, final_dir) + temp_dir.name, final_dir.name, ad_id) + LOG.debug("Renaming: %s -> %s", temp_dir, final_dir) + await loop.run_in_executor(None, temp_dir.rename, final_dir) else: # Use the existing folder without renaming final_dir = temp_dir LOG.info("Using existing folder for ad %s at %s.", ad_id, final_dir) else: # Create new directory with title - os.mkdir(final_dir) + LOG.debug("Creating new directory: %s", final_dir) + await loop.run_in_executor(None, final_dir.mkdir) LOG.info("New directory for ad created at %s.", final_dir) # Now extract complete ad info (including images) to the final directory - ad_cfg = await self._extract_ad_page_info(final_dir, ad_id) + ad_cfg = await self._extract_ad_page_info(str(final_dir), ad_id) return ad_cfg, final_dir diff --git a/src/kleinanzeigen_bot/resources/translations.de.yaml b/src/kleinanzeigen_bot/resources/translations.de.yaml index c7cf78f2..0615c911 100644 --- a/src/kleinanzeigen_bot/resources/translations.de.yaml +++ b/src/kleinanzeigen_bot/resources/translations.de.yaml @@ -173,7 +173,10 @@ kleinanzeigen_bot/__init__.py: kleinanzeigen_bot/extract.py: ################################################# download_ad: - "Created ads directory at ./%s.": "Verzeichnis für Anzeigen erstellt unter ./%s." + "Ensured ads directory exists at ./%s.": "Verzeichnis für Anzeigen sichergestellt unter ./%s." + + _download_and_save_image_sync: + "Failed to download image %s: %s": "Fehler beim Herunterladen des Bildes %s: %s" _download_images_from_ad_page: "Found %s.": "%s gefunden." diff --git a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py index b4aaa91c..d96af01a 100644 --- a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py +++ b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py @@ -100,6 +100,41 @@ def __init__(self) -> None: self.profile_name:str | None = None +def _write_initial_prefs(prefs_file:str) -> None: + with open(prefs_file, "w", encoding = "UTF-8") as fd: + json.dump({ + "credentials_enable_service": False, + "enable_do_not_track": True, + "google": { + "services": { + "consented_to_sync": False + } + }, + "profile": { + "default_content_setting_values": { + "popups": 0, + "notifications": 2 # 1 = allow, 2 = block browser notifications + }, + "password_manager_enabled": False + }, + "signin": { + "allowed": False + }, + "translate_site_blacklist": [ + "www.kleinanzeigen.de" + ], + "devtools": { + "preferences": { + "currentDockState": '"bottom"' + } + } + }, fd) + + +async def _exists(path:str) -> bool: + return await asyncio.get_running_loop().run_in_executor(None, os.path.exists, path) + + class WebScrapingMixin: def __init__(self) -> None: @@ -174,7 +209,7 @@ async def create_browser_session(self) -> None: LOG.info("Creating Browser session...") if self.browser_config.binary_location: - ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.") + ensure(await _exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.") else: self.browser_config.binary_location = self.get_compatible_browser() LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location) @@ -289,41 +324,14 @@ async def create_browser_session(self) -> None: profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default") os.makedirs(profile_dir, exist_ok = True) prefs_file = os.path.join(profile_dir, "Preferences") - if not os.path.exists(prefs_file): + if not await _exists(prefs_file): LOG.info(" -> Setting chrome prefs [%s]...", prefs_file) - with open(prefs_file, "w", encoding = "UTF-8") as fd: - json.dump({ - "credentials_enable_service": False, - "enable_do_not_track": True, - "google": { - "services": { - "consented_to_sync": False - } - }, - "profile": { - "default_content_setting_values": { - "popups": 0, - "notifications": 2 # 1 = allow, 2 = block browser notifications - }, - "password_manager_enabled": False - }, - "signin": { - "allowed": False - }, - "translate_site_blacklist": [ - "www.kleinanzeigen.de" - ], - "devtools": { - "preferences": { - "currentDockState": '"bottom"' - } - } - }, fd) + await asyncio.get_running_loop().run_in_executor(None, _write_initial_prefs, prefs_file) # load extensions for crx_extension in self.browser_config.extensions: LOG.info(" -> Adding Browser extension: [%s]", crx_extension) - ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.") + ensure(await _exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.") cfg.add_extension(crx_extension) try: diff --git a/tests/unit/test_extract.py b/tests/unit/test_extract.py index d91da9f2..195d2fb0 100644 --- a/tests/unit/test_extract.py +++ b/tests/unit/test_extract.py @@ -1,10 +1,12 @@ # SPDX-FileCopyrightText: © Sebastian Thomschke and contributors # SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ -import json, os # isort: skip +import json # isort: skip from gettext import gettext as _ +from pathlib import Path from typing import Any, TypedDict from unittest.mock import AsyncMock, MagicMock, call, patch +from urllib.error import URLError import pytest @@ -66,6 +68,122 @@ def test_extract_ad_id_from_ad_url(self, test_extractor:AdExtractor, url:str, ex """Test extraction of ad ID from different URL formats.""" assert test_extractor.extract_ad_id_from_ad_url(url) == expected_id + def test_path_exists_helper(self, tmp_path:Path) -> None: + """Test _path_exists helper function.""" + + from kleinanzeigen_bot.extract import _path_exists # noqa: PLC0415, PLC2701 + + # Test with existing path + existing_file = tmp_path / "test.txt" + existing_file.write_text("test") + assert _path_exists(existing_file) is True + assert _path_exists(str(existing_file)) is True + + # Test with non-existing path + non_existing = tmp_path / "nonexistent.txt" + assert _path_exists(non_existing) is False + assert _path_exists(str(non_existing)) is False + + def test_path_is_dir_helper(self, tmp_path:Path) -> None: + """Test _path_is_dir helper function.""" + + from kleinanzeigen_bot.extract import _path_is_dir # noqa: PLC0415, PLC2701 + + # Test with directory + test_dir = tmp_path / "testdir" + test_dir.mkdir() + assert _path_is_dir(test_dir) is True + assert _path_is_dir(str(test_dir)) is True + + # Test with file + test_file = tmp_path / "test.txt" + test_file.write_text("test") + assert _path_is_dir(test_file) is False + assert _path_is_dir(str(test_file)) is False + + # Test with non-existing path + non_existing = tmp_path / "nonexistent" + assert _path_is_dir(non_existing) is False + assert _path_is_dir(str(non_existing)) is False + + @pytest.mark.asyncio + async def test_exists_async_helper(self, tmp_path:Path) -> None: + """Test _exists async helper function.""" + from kleinanzeigen_bot.extract import _exists # noqa: PLC0415, PLC2701 + + # Test with existing path + existing_file = tmp_path / "test.txt" + existing_file.write_text("test") + assert await _exists(existing_file) is True + assert await _exists(str(existing_file)) is True + + # Test with non-existing path + non_existing = tmp_path / "nonexistent.txt" + assert await _exists(non_existing) is False + assert await _exists(str(non_existing)) is False + + @pytest.mark.asyncio + async def test_isdir_async_helper(self, tmp_path:Path) -> None: + """Test _isdir async helper function.""" + from kleinanzeigen_bot.extract import _isdir # noqa: PLC0415, PLC2701 + + # Test with directory + test_dir = tmp_path / "testdir" + test_dir.mkdir() + assert await _isdir(test_dir) is True + assert await _isdir(str(test_dir)) is True + + # Test with file + test_file = tmp_path / "test.txt" + test_file.write_text("test") + assert await _isdir(test_file) is False + assert await _isdir(str(test_file)) is False + + # Test with non-existing path + non_existing = tmp_path / "nonexistent" + assert await _isdir(non_existing) is False + assert await _isdir(str(non_existing)) is False + + def test_download_and_save_image_sync_success(self, tmp_path:Path) -> None: + """Test _download_and_save_image_sync with successful download.""" + from unittest.mock import MagicMock, mock_open # noqa: PLC0415 + + test_dir = tmp_path / "images" + test_dir.mkdir() + + # Mock urllib response + mock_response = MagicMock() + mock_response.info().get_content_type.return_value = "image/jpeg" + mock_response.__enter__ = MagicMock(return_value = mock_response) + mock_response.__exit__ = MagicMock(return_value = False) + + with patch("kleinanzeigen_bot.extract.urllib_request.urlopen", return_value = mock_response), \ + patch("kleinanzeigen_bot.extract.open", mock_open()), \ + patch("kleinanzeigen_bot.extract.shutil.copyfileobj"): + + result = AdExtractor._download_and_save_image_sync( + "http://example.com/image.jpg", + str(test_dir), + "test_", + 1 + ) + + assert result is not None + assert result.endswith((".jpe", ".jpeg", ".jpg")) + assert "test_1" in result + + def test_download_and_save_image_sync_failure(self, tmp_path:Path) -> None: + """Test _download_and_save_image_sync with download failure.""" + with patch("kleinanzeigen_bot.extract.urllib_request.urlopen", side_effect = URLError("Network error")): + result = AdExtractor._download_and_save_image_sync( + "http://example.com/image.jpg", + str(tmp_path), + "test_", + 1 + ) + + assert result is None + class TestAdExtractorPricing: """Tests for pricing related functionality.""" @@ -852,27 +970,16 @@ def extractor(self, test_bot_config:Config) -> AdExtractor: return AdExtractor(browser_mock, config) @pytest.mark.asyncio - async def test_download_ad_existing_directory(self, extractor:AdExtractor) -> None: - """Test downloading an ad when the directory already exists.""" - with patch("os.path.exists") as mock_exists, \ - patch("os.path.isdir") as mock_isdir, \ - patch("os.makedirs") as mock_makedirs, \ - patch("os.mkdir") as mock_mkdir, \ - patch("os.rename") as mock_rename, \ - patch("shutil.rmtree") as mock_rmtree, \ + async def test_download_ad(self, extractor:AdExtractor) -> None: + """Test downloading an ad - directory creation and saving ad data.""" + with patch("pathlib.Path.mkdir"), \ patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \ patch.object(extractor, "_extract_ad_page_info_with_directory_handling", new_callable = AsyncMock) as mock_extract_with_dir: - base_dir = "downloaded-ads" - final_dir = os.path.join(base_dir, "ad_12345_Test Advertisement Title") - yaml_path = os.path.join(final_dir, "ad_12345.yaml") + # Use Path for OS-agnostic path handling + final_dir = Path("downloaded-ads") / "ad_12345_Test Advertisement Title" + yaml_path = final_dir / "ad_12345.yaml" - # Configure mocks for directory checks - existing_paths = {base_dir, final_dir} # Final directory with title exists - mock_exists.side_effect = lambda path: path in existing_paths - mock_isdir.side_effect = lambda path: path == base_dir - - # Mock the new method that handles directory creation and extraction mock_extract_with_dir.return_value = ( AdPartial.model_validate({ "title": "Test Advertisement Title", @@ -887,203 +994,218 @@ async def test_download_ad_existing_directory(self, extractor:AdExtractor) -> No "location": "Test City" } }), - final_dir + str(final_dir) ) await extractor.download_ad(12345) - # Verify the correct functions were called + # Verify observable behavior: extraction and save were called mock_extract_with_dir.assert_called_once() - # Directory handling is now done inside _extract_ad_page_info_with_directory_handling - # so we don't expect rmtree/mkdir to be called directly in download_ad - mock_rmtree.assert_not_called() # Directory handling is done internally - mock_mkdir.assert_not_called() # Directory handling is done internally - mock_makedirs.assert_not_called() # Directory already exists - mock_rename.assert_not_called() # No renaming needed - - # Get the actual call arguments + mock_save_dict.assert_called_once() + + # Verify saved to correct location with correct data actual_call = mock_save_dict.call_args - assert actual_call is not None - actual_path = actual_call[0][0].replace("/", os.path.sep) + actual_path = Path(actual_call[0][0]) assert actual_path == yaml_path assert actual_call[0][1] == mock_extract_with_dir.return_value[0].model_dump() @pytest.mark.asyncio - async def test_download_ad(self, extractor:AdExtractor) -> None: - """Test downloading an entire ad.""" - with patch("os.path.exists") as mock_exists, \ - patch("os.path.isdir") as mock_isdir, \ - patch("os.makedirs") as mock_makedirs, \ - patch("os.mkdir") as mock_mkdir, \ - patch("os.rename") as mock_rename, \ - patch("shutil.rmtree") as mock_rmtree, \ - patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \ - patch.object(extractor, "_extract_ad_page_info_with_directory_handling", new_callable = AsyncMock) as mock_extract_with_dir: + # pylint: disable=protected-access + async def test_download_images_no_images(self, extractor:AdExtractor) -> None: + """Test image download when no images are found.""" + with patch.object(extractor, "web_find", new_callable = AsyncMock, side_effect = TimeoutError): + image_paths = await extractor._download_images_from_ad_page("/some/dir", 12345) + assert len(image_paths) == 0 - base_dir = "downloaded-ads" - final_dir = os.path.join(base_dir, "ad_12345_Test Advertisement Title") - yaml_path = os.path.join(final_dir, "ad_12345.yaml") + @pytest.mark.asyncio + # pylint: disable=protected-access + async def test_download_images_with_none_url(self, extractor:AdExtractor) -> None: + """Test image download when some images have None as src attribute.""" + image_box_mock = MagicMock() - # Configure mocks for directory checks - mock_exists.return_value = False - mock_isdir.return_value = False + # Create image elements - one with valid src, one with None src + img_with_url = MagicMock() + img_with_url.attrs = {"src": "http://example.com/valid_image.jpg"} - # Mock the new method that handles directory creation and extraction - mock_extract_with_dir.return_value = ( - AdPartial.model_validate({ - "title": "Test Advertisement Title", - "description": "Test Description", - "category": "Dienstleistungen", - "price": 100, - "images": [], - "contact": { - "name": "Test User", - "street": "Test Street 123", - "zipcode": "12345", - "location": "Test City" - } - }), - final_dir - ) + img_without_url = MagicMock() + img_without_url.attrs = {"src": None} - await extractor.download_ad(12345) + with patch.object(extractor, "web_find", new_callable = AsyncMock, return_value = image_box_mock), \ + patch.object(extractor, "web_find_all", new_callable = AsyncMock, return_value = [img_with_url, img_without_url]), \ + patch.object(AdExtractor, "_download_and_save_image_sync", return_value = "/some/dir/ad_12345__img1.jpg"): - # Verify the correct functions were called - mock_extract_with_dir.assert_called_once() - # Directory handling is now done inside _extract_ad_page_info_with_directory_handling - mock_rmtree.assert_not_called() # Directory handling is done internally - mock_mkdir.assert_has_calls([call(base_dir)]) # Only base directory creation - mock_makedirs.assert_not_called() # Using mkdir instead - mock_rename.assert_not_called() # No renaming needed + image_paths = await extractor._download_images_from_ad_page("/some/dir", 12345) - # Get the actual call arguments - actual_call = mock_save_dict.call_args - assert actual_call is not None - actual_path = actual_call[0][0].replace("/", os.path.sep) - assert actual_path == yaml_path - assert actual_call[0][1] == mock_extract_with_dir.return_value[0].model_dump() + # Should only download the one valid image (skip the None) + assert len(image_paths) == 1 + assert image_paths[0] == "ad_12345__img1.jpg" @pytest.mark.asyncio - async def test_download_ad_use_existing_folder(self, extractor:AdExtractor) -> None: - """Test downloading an ad when an old folder without title exists (default behavior).""" - with patch("os.path.exists") as mock_exists, \ - patch("os.path.isdir") as mock_isdir, \ - patch("os.makedirs") as mock_makedirs, \ - patch("os.mkdir") as mock_mkdir, \ - patch("os.rename") as mock_rename, \ - patch("shutil.rmtree") as mock_rmtree, \ - patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \ - patch.object(extractor, "_extract_ad_page_info_with_directory_handling", new_callable = AsyncMock) as mock_extract_with_dir: + # pylint: disable=protected-access + async def test_extract_ad_page_info_with_directory_handling_final_dir_exists( + self, extractor:AdExtractor, tmp_path:Path + ) -> None: + """Test directory handling when final_dir already exists - it should be deleted.""" + base_dir = tmp_path / "downloaded-ads" + base_dir.mkdir() - base_dir = "downloaded-ads" - temp_dir = os.path.join(base_dir, "ad_12345") - yaml_path = os.path.join(temp_dir, "ad_12345.yaml") + # Create the final directory that should be deleted + final_dir = base_dir / "ad_12345_Test Title" + final_dir.mkdir() + old_file = final_dir / "old_file.txt" + old_file.write_text("old content") - # Configure mocks for directory checks - # Base directory exists, temp directory exists - existing_paths = {base_dir, temp_dir} - mock_exists.side_effect = lambda path: path in existing_paths - mock_isdir.side_effect = lambda path: path == base_dir + # Mock the page + page_mock = MagicMock() + page_mock.url = "https://www.kleinanzeigen.de/s-anzeige/test/12345" + extractor.page = page_mock - # Mock the new method that handles directory creation and extraction - mock_extract_with_dir.return_value = ( - AdPartial.model_validate({ - "title": "Test Advertisement Title", - "description": "Test Description", - "category": "Dienstleistungen", - "price": 100, - "images": [], - "contact": { - "name": "Test User", - "street": "Test Street 123", - "zipcode": "12345", - "location": "Test City" + with patch.object(extractor, "web_text", new_callable = AsyncMock, side_effect = [ + "Test Title", # Title extraction + "Test Title", # Second title call for full extraction + "Description text", # Description + "03.02.2025" # Creation date + ]), \ + patch.object(extractor, "web_execute", new_callable = AsyncMock, return_value = { + "universalAnalyticsOpts": { + "dimensions": { + "dimension92": "", + "dimension108": "" + } } - }), - temp_dir # Use existing temp directory + }), \ + patch.object(extractor, "_extract_category_from_ad_page", new_callable = AsyncMock, return_value = "160"), \ + patch.object(extractor, "_extract_special_attributes_from_ad_page", new_callable = AsyncMock, return_value = {}), \ + patch.object(extractor, "_extract_pricing_info_from_ad_page", new_callable = AsyncMock, return_value = (None, "NOT_APPLICABLE")), \ + patch.object(extractor, "_extract_shipping_info_from_ad_page", new_callable = AsyncMock, return_value = ("NOT_APPLICABLE", None, None)), \ + patch.object(extractor, "_extract_sell_directly_from_ad_page", new_callable = AsyncMock, return_value = False), \ + patch.object(extractor, "_download_images_from_ad_page", new_callable = AsyncMock, return_value = []), \ + patch.object(extractor, "_extract_contact_from_ad_page", new_callable = AsyncMock, return_value = ContactPartial( + name = "Test", zipcode = "12345", location = "Berlin" + )): + + ad_cfg, result_dir = await extractor._extract_ad_page_info_with_directory_handling( + base_dir, 12345 ) - await extractor.download_ad(12345) - - # Verify the correct functions were called - mock_extract_with_dir.assert_called_once() - mock_rmtree.assert_not_called() # No directory to remove - mock_mkdir.assert_not_called() # Base directory already exists - mock_makedirs.assert_not_called() # Using mkdir instead - mock_rename.assert_not_called() # No renaming (default behavior) - - # Get the actual call arguments - actual_call = mock_save_dict.call_args - assert actual_call is not None - actual_path = actual_call[0][0].replace("/", os.path.sep) - assert actual_path == yaml_path - assert actual_call[0][1] == mock_extract_with_dir.return_value[0].model_dump() + # Verify the old directory was deleted and recreated + assert result_dir == final_dir + assert result_dir.exists() + assert not old_file.exists() # Old file should be gone + assert ad_cfg.title == "Test Title" @pytest.mark.asyncio - async def test_download_ad_rename_existing_folder_when_enabled(self, extractor:AdExtractor) -> None: - """Test downloading an ad when an old folder without title exists and renaming is enabled.""" - # Enable renaming in config - extractor.config.download.rename_existing_folders = True + # pylint: disable=protected-access + async def test_extract_ad_page_info_with_directory_handling_rename_enabled( + self, extractor:AdExtractor, tmp_path:Path + ) -> None: + """Test directory handling when temp_dir exists and rename_existing_folders is True.""" + base_dir = tmp_path / "downloaded-ads" + base_dir.mkdir() - with patch("os.path.exists") as mock_exists, \ - patch("os.path.isdir") as mock_isdir, \ - patch("os.makedirs") as mock_makedirs, \ - patch("os.mkdir") as mock_mkdir, \ - patch("os.rename") as mock_rename, \ - patch("shutil.rmtree") as mock_rmtree, \ - patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \ - patch.object(extractor, "_extract_ad_page_info_with_directory_handling", new_callable = AsyncMock) as mock_extract_with_dir: + # Create the temp directory (without title) + temp_dir = base_dir / "ad_12345" + temp_dir.mkdir() + existing_file = temp_dir / "existing_image.jpg" + existing_file.write_text("existing image data") - base_dir = "downloaded-ads" - temp_dir = os.path.join(base_dir, "ad_12345") - final_dir = os.path.join(base_dir, "ad_12345_Test Advertisement Title") - yaml_path = os.path.join(final_dir, "ad_12345.yaml") + # Enable rename_existing_folders in config + extractor.config.download.rename_existing_folders = True - # Configure mocks for directory checks - # Base directory exists, temp directory exists, final directory doesn't exist - existing_paths = {base_dir, temp_dir} - mock_exists.side_effect = lambda path: path in existing_paths - mock_isdir.side_effect = lambda path: path == base_dir + # Mock the page + page_mock = MagicMock() + page_mock.url = "https://www.kleinanzeigen.de/s-anzeige/test/12345" + extractor.page = page_mock - # Mock the new method that handles directory creation and extraction - mock_extract_with_dir.return_value = ( - AdPartial.model_validate({ - "title": "Test Advertisement Title", - "description": "Test Description", - "category": "Dienstleistungen", - "price": 100, - "images": [], - "contact": { - "name": "Test User", - "street": "Test Street 123", - "zipcode": "12345", - "location": "Test City" + with patch.object(extractor, "web_text", new_callable = AsyncMock, side_effect = [ + "Test Title", # Title extraction + "Test Title", # Second title call for full extraction + "Description text", # Description + "03.02.2025" # Creation date + ]), \ + patch.object(extractor, "web_execute", new_callable = AsyncMock, return_value = { + "universalAnalyticsOpts": { + "dimensions": { + "dimension92": "", + "dimension108": "" + } } - }), - final_dir + }), \ + patch.object(extractor, "_extract_category_from_ad_page", new_callable = AsyncMock, return_value = "160"), \ + patch.object(extractor, "_extract_special_attributes_from_ad_page", new_callable = AsyncMock, return_value = {}), \ + patch.object(extractor, "_extract_pricing_info_from_ad_page", new_callable = AsyncMock, return_value = (None, "NOT_APPLICABLE")), \ + patch.object(extractor, "_extract_shipping_info_from_ad_page", new_callable = AsyncMock, return_value = ("NOT_APPLICABLE", None, None)), \ + patch.object(extractor, "_extract_sell_directly_from_ad_page", new_callable = AsyncMock, return_value = False), \ + patch.object(extractor, "_download_images_from_ad_page", new_callable = AsyncMock, return_value = []), \ + patch.object(extractor, "_extract_contact_from_ad_page", new_callable = AsyncMock, return_value = ContactPartial( + name = "Test", zipcode = "12345", location = "Berlin" + )): + + ad_cfg, result_dir = await extractor._extract_ad_page_info_with_directory_handling( + base_dir, 12345 ) - await extractor.download_ad(12345) - - # Verify the correct functions were called - mock_extract_with_dir.assert_called_once() # Extract to final directory - # Directory handling (including renaming) is now done inside _extract_ad_page_info_with_directory_handling - mock_rmtree.assert_not_called() # Directory handling is done internally - mock_mkdir.assert_not_called() # Directory handling is done internally - mock_makedirs.assert_not_called() # Using mkdir instead - mock_rename.assert_not_called() # Directory handling is done internally - - # Get the actual call arguments - actual_call = mock_save_dict.call_args - assert actual_call is not None - actual_path = actual_call[0][0].replace("/", os.path.sep) - assert actual_path == yaml_path - assert actual_call[0][1] == mock_extract_with_dir.return_value[0].model_dump() + # Verify the directory was renamed from temp_dir to final_dir + final_dir = base_dir / "ad_12345_Test Title" + assert result_dir == final_dir + assert result_dir.exists() + assert not temp_dir.exists() # Old temp dir should be gone + assert (result_dir / "existing_image.jpg").exists() # File should be preserved + assert ad_cfg.title == "Test Title" @pytest.mark.asyncio # pylint: disable=protected-access - async def test_download_images_no_images(self, extractor:AdExtractor) -> None: - """Test image download when no images are found.""" - with patch.object(extractor, "web_find", new_callable = AsyncMock, side_effect = TimeoutError): - image_paths = await extractor._download_images_from_ad_page("/some/dir", 12345) - assert len(image_paths) == 0 + async def test_extract_ad_page_info_with_directory_handling_use_existing( + self, extractor:AdExtractor, tmp_path:Path + ) -> None: + """Test directory handling when temp_dir exists and rename_existing_folders is False (default).""" + base_dir = tmp_path / "downloaded-ads" + base_dir.mkdir() + + # Create the temp directory (without title) + temp_dir = base_dir / "ad_12345" + temp_dir.mkdir() + existing_file = temp_dir / "existing_image.jpg" + existing_file.write_text("existing image data") + + # Ensure rename_existing_folders is False (default) + extractor.config.download.rename_existing_folders = False + + # Mock the page + page_mock = MagicMock() + page_mock.url = "https://www.kleinanzeigen.de/s-anzeige/test/12345" + extractor.page = page_mock + + with patch.object(extractor, "web_text", new_callable = AsyncMock, side_effect = [ + "Test Title", # Title extraction + "Test Title", # Second title call for full extraction + "Description text", # Description + "03.02.2025" # Creation date + ]), \ + patch.object(extractor, "web_execute", new_callable = AsyncMock, return_value = { + "universalAnalyticsOpts": { + "dimensions": { + "dimension92": "", + "dimension108": "" + } + } + }), \ + patch.object(extractor, "_extract_category_from_ad_page", new_callable = AsyncMock, return_value = "160"), \ + patch.object(extractor, "_extract_special_attributes_from_ad_page", new_callable = AsyncMock, return_value = {}), \ + patch.object(extractor, "_extract_pricing_info_from_ad_page", new_callable = AsyncMock, return_value = (None, "NOT_APPLICABLE")), \ + patch.object(extractor, "_extract_shipping_info_from_ad_page", new_callable = AsyncMock, return_value = ("NOT_APPLICABLE", None, None)), \ + patch.object(extractor, "_extract_sell_directly_from_ad_page", new_callable = AsyncMock, return_value = False), \ + patch.object(extractor, "_download_images_from_ad_page", new_callable = AsyncMock, return_value = []), \ + patch.object(extractor, "_extract_contact_from_ad_page", new_callable = AsyncMock, return_value = ContactPartial( + name = "Test", zipcode = "12345", location = "Berlin" + )): + + ad_cfg, result_dir = await extractor._extract_ad_page_info_with_directory_handling( + base_dir, 12345 + ) + + # Verify the existing temp_dir was used (not renamed) + assert result_dir == temp_dir + assert result_dir.exists() + assert (result_dir / "existing_image.jpg").exists() # File should be preserved + assert ad_cfg.title == "Test Title" diff --git a/tests/unit/test_init.py b/tests/unit/test_init.py index f97bba8d..66572801 100644 --- a/tests/unit/test_init.py +++ b/tests/unit/test_init.py @@ -641,12 +641,11 @@ async def test_run_unknown_command(self, test_bot:KleinanzeigenBot) -> None: async def test_verify_command(self, test_bot:KleinanzeigenBot, tmp_path:Any) -> None: """Test verify command with minimal config.""" config_path = Path(tmp_path) / "config.yaml" - with open(config_path, "w", encoding = "utf-8") as f: - f.write(""" + config_path.write_text(""" login: username: test password: test -""") +""", encoding = "utf-8") test_bot.config_file_path = str(config_path) await test_bot.run(["script.py", "verify"]) assert test_bot.config.login.username == "test" diff --git a/tests/unit/test_web_scraping_mixin.py b/tests/unit/test_web_scraping_mixin.py index 0b51e183..041169d2 100644 --- a/tests/unit/test_web_scraping_mixin.py +++ b/tests/unit/test_web_scraping_mixin.py @@ -7,6 +7,7 @@ All rights reserved. """ +import asyncio import json import logging import os @@ -95,6 +96,30 @@ def web_scraper(mock_browser:AsyncMock, mock_page:TrulyAwaitableMockPage) -> Web return scraper +def test_write_initial_prefs(tmp_path:Path) -> None: + """Test _write_initial_prefs helper function.""" + from kleinanzeigen_bot.utils.web_scraping_mixin import _write_initial_prefs # noqa: PLC0415, PLC2701 + + prefs_file = tmp_path / "Preferences" + _write_initial_prefs(str(prefs_file)) + + # Verify file was created + assert prefs_file.exists() + + # Verify content is valid JSON with expected structure + with open(prefs_file, encoding = "UTF-8") as f: + prefs = json.load(f) + + assert prefs["credentials_enable_service"] is False + assert prefs["enable_do_not_track"] is True + assert prefs["google"]["services"]["consented_to_sync"] is False + assert prefs["profile"]["password_manager_enabled"] is False + assert prefs["profile"]["default_content_setting_values"]["notifications"] == 2 + assert prefs["signin"]["allowed"] is False + assert "www.kleinanzeigen.de" in prefs["translate_site_blacklist"] + assert prefs["devtools"]["preferences"]["currentDockState"] == '"bottom"' + + class TestWebScrapingErrorHandling: """Test error handling scenarios in WebScrapingMixin.""" @@ -762,8 +787,7 @@ def mock_exists(path:str) -> bool: prefs_file = profile_dir / "Preferences" # Test with existing preferences file - with open(prefs_file, "w", encoding = "UTF-8") as f: - json.dump({"existing": "prefs"}, f) + prefs_file.write_text(json.dumps({"existing": "prefs"}), encoding = "UTF-8") scraper = WebScrapingMixin() scraper.browser_config.user_data_dir = str(tmp_path) @@ -771,22 +795,20 @@ def mock_exists(path:str) -> bool: await scraper.create_browser_session() # Verify preferences file was not overwritten - with open(prefs_file, "r", encoding = "UTF-8") as f: - prefs = json.load(f) - assert prefs["existing"] == "prefs" + prefs = json.loads(prefs_file.read_text(encoding = "UTF-8")) + assert prefs["existing"] == "prefs" # Test with missing preferences file prefs_file.unlink() await scraper.create_browser_session() # Verify new preferences file was created with correct settings - with open(prefs_file, "r", encoding = "UTF-8") as f: - prefs = json.load(f) - assert prefs["credentials_enable_service"] is False - assert prefs["enable_do_not_track"] is True - assert prefs["profile"]["password_manager_enabled"] is False - assert prefs["signin"]["allowed"] is False - assert "www.kleinanzeigen.de" in prefs["translate_site_blacklist"] + prefs = json.loads(prefs_file.read_text(encoding = "UTF-8")) + assert prefs["credentials_enable_service"] is False + assert prefs["enable_do_not_track"] is True + assert prefs["profile"]["password_manager_enabled"] is False + assert prefs["signin"]["allowed"] is False + assert "www.kleinanzeigen.de" in prefs["translate_site_blacklist"] @pytest.mark.asyncio async def test_browser_arguments_configuration(self, tmp_path:Path, monkeypatch:pytest.MonkeyPatch) -> None: @@ -894,8 +916,8 @@ def add_extension(self, ext:str) -> None: config = _nodriver_start_mock().call_args[0][0] assert len(config._extensions) == 2 for ext_path in config._extensions: - assert os.path.exists(ext_path) - assert os.path.isdir(ext_path) + assert await asyncio.get_running_loop().run_in_executor(None, Path(ext_path).exists) + assert await asyncio.get_running_loop().run_in_executor(None, Path(ext_path).is_dir) # Test with non-existent extension scraper.browser_config.extensions = ["non_existent.crx"] @@ -976,8 +998,7 @@ def add_extension(self, ext:str) -> None: scraper.browser_config.user_data_dir = str(tmp_path) scraper.browser_config.profile_name = "Default" await scraper.create_browser_session() - with open(state_file, "w", encoding = "utf-8") as f: - f.write('{"foo": "bar"}') + state_file.write_text('{"foo": "bar"}', encoding = "utf-8") scraper.browser._process_pid = 12345 scraper.browser.stop = MagicMock() with patch("psutil.Process") as mock_proc: @@ -989,8 +1010,7 @@ def add_extension(self, ext:str) -> None: scraper2.browser_config.user_data_dir = str(tmp_path) scraper2.browser_config.profile_name = "Default" await scraper2.create_browser_session() - with open(state_file, "r", encoding = "utf-8") as f: - data = f.read() + data = state_file.read_text(encoding = "utf-8") assert data == '{"foo": "bar"}' scraper2.browser._process_pid = 12346 scraper2.browser.stop = MagicMock()