Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,6 @@ select = [
]
ignore = [
"ANN401", # Dynamically typed expressions (typing.Any) are disallowed
"ASYNC210", # TODO Async functions should not call blocking HTTP methods
"ASYNC230", # TODO Async functions should not open files with blocking methods like `open`
"ASYNC240", # TODO Async functions should not use os.path methods, use trio.Path or anyio.path
"ASYNC250", # TODO Blocking call to input() in async context
"COM812", # Trailing comma missing
"D1", # Missing docstring in ...
"D200", # One-line docstring should fit on one line
Expand Down
4 changes: 2 additions & 2 deletions src/kleinanzeigen_bot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ async def publish_ad(self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], p
LOG.warning("# Payment form detected! Please proceed with payment.")
LOG.warning("############################################")
await self.web_scroll_page_down()
input(_("Press a key to continue..."))
await ainput(_("Press a key to continue..."))
except TimeoutError:
pass

Expand Down Expand Up @@ -1108,7 +1108,7 @@ async def __set_shipping(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrate
# in some categories we need to go another dialog back
try:
await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]',
timeout=short_timeout)
timeout = short_timeout)
except TimeoutError:
await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]')

Expand Down
113 changes: 83 additions & 30 deletions src/kleinanzeigen_bot/extract.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import asyncio
from gettext import gettext as _

import json, mimetypes, os, re, shutil # isort: skip
import json, mimetypes, re, shutil # isort: skip
import urllib.error as urllib_error
import urllib.request as urllib_request
from datetime import datetime
from pathlib import Path
from typing import Any, Final

from kleinanzeigen_bot.model.ad_model import ContactPartial
Expand All @@ -25,6 +28,28 @@
BREADCRUMB_RE = re.compile(r"/c(\d+)")


def _path_exists(path:Path | str) -> bool:
"""Helper for Path.exists() that can be mocked in tests."""
return Path(path).exists()


def _path_is_dir(path:Path | str) -> bool:
"""Helper for Path.is_dir() that can be mocked in tests."""
return Path(path).is_dir()


async def _exists(path:Path | str) -> bool:
    """Async existence check: runs :func:`_path_exists` in the default executor
    so the event loop is never blocked by filesystem access."""
    loop = asyncio.get_running_loop()
    found = await loop.run_in_executor(None, _path_exists, path)
    LOG.debug("Path exists check: %s -> %s", path, found)
    return found


async def _isdir(path:Path | str) -> bool:
    """Async directory check: runs :func:`_path_is_dir` in the default executor
    so the event loop is never blocked by filesystem access."""
    loop = asyncio.get_running_loop()
    found = await loop.run_in_executor(None, _path_is_dir, path)
    LOG.debug("Path is_dir check: %s -> %s", path, found)
    return found


class AdExtractor(WebScrapingMixin):
"""
Wrapper class for ad extraction that uses an active bot´s browser session to extract specific elements from an ad page.
Expand All @@ -44,23 +69,39 @@ async def download_ad(self, ad_id:int) -> None:
"""

# create sub-directory for ad(s) to download (if necessary):
relative_directory = "downloaded-ads"
# make sure configured base directory exists
if not os.path.exists(relative_directory) or not os.path.isdir(relative_directory):
os.mkdir(relative_directory)
LOG.info("Created ads directory at ./%s.", relative_directory)
relative_directory = Path("downloaded-ads")
# make sure configured base directory exists (using exist_ok=True to avoid TOCTOU race)
await asyncio.get_running_loop().run_in_executor(None, lambda: relative_directory.mkdir(exist_ok = True)) # noqa: ASYNC240
LOG.info("Ensured ads directory exists at ./%s.", relative_directory)

# Extract ad info and determine final directory path
ad_cfg, final_dir = await self._extract_ad_page_info_with_directory_handling(
relative_directory, ad_id
)

# Save the ad configuration file
ad_file_path = final_dir + "/" + f"ad_{ad_id}.yaml"
dicts.save_dict(
ad_file_path,
ad_cfg.model_dump(),
header = "# yaml-language-server: $schema=https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot/refs/heads/main/schemas/ad.schema.json")
# Save the ad configuration file (offload to executor to avoid blocking the event loop)
ad_file_path = str(Path(final_dir) / f"ad_{ad_id}.yaml")
header_string = "# yaml-language-server: $schema=https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot/refs/heads/main/schemas/ad.schema.json"
await asyncio.get_running_loop().run_in_executor(
None,
lambda: dicts.save_dict(ad_file_path, ad_cfg.model_dump(), header = header_string)
)

@staticmethod
def _download_and_save_image_sync(url:str, directory:str, filename_prefix:str, img_nr:int) -> str | None:
    """Download a single image from *url* and store it in *directory*.

    The file name is ``{filename_prefix}{img_nr}`` plus an extension guessed
    from the response's content type. Returns the saved path as a string, or
    ``None`` when the download or write fails (failures are logged, not raised).
    """
    try:
        with urllib_request.urlopen(url) as response:  # noqa: S310 Audit URL open for permitted schemes.
            mime = response.info().get_content_type()
            suffix = mimetypes.guess_extension(mime) or ""
            # pathlib keeps the path handling OS-agnostic
            target = Path(directory) / f"{filename_prefix}{img_nr}{suffix}"
            with target.open("wb") as out:
                shutil.copyfileobj(response, out)
            return str(target)
    except (urllib_error.URLError, urllib_error.HTTPError, OSError, shutil.Error) as ex:
        # only expected network/filesystem errors are swallowed here
        LOG.warning("Failed to download image %s: %s", url, ex)
        return None

async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[str]:
"""
Expand All @@ -85,19 +126,26 @@ async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[
img_nr = 1
dl_counter = 0

loop = asyncio.get_running_loop()

for img_element in images:
current_img_url = img_element.attrs["src"] # URL of the image
if current_img_url is None:
continue

with urllib_request.urlopen(str(current_img_url)) as response: # noqa: S310 Audit URL open for permitted schemes.
content_type = response.info().get_content_type()
file_ending = mimetypes.guess_extension(content_type)
img_path = f"{directory}/{img_fn_prefix}{img_nr}{file_ending}"
with open(img_path, "wb") as f:
shutil.copyfileobj(response, f)
img_path = await loop.run_in_executor(
None,
self._download_and_save_image_sync,
str(current_img_url),
directory,
img_fn_prefix,
img_nr
)

if img_path:
dl_counter += 1
img_paths.append(img_path.rsplit("/", maxsplit = 1)[-1])
# Use pathlib.Path for OS-agnostic path handling
img_paths.append(Path(img_path).name)

img_nr += 1
LOG.info("Downloaded %s.", i18n.pluralize("image", dl_counter))
Expand Down Expand Up @@ -354,8 +402,8 @@ async def _extract_ad_page_info(self, directory:str, ad_id:int) -> AdPartial:
return ad_cfg

async def _extract_ad_page_info_with_directory_handling(
self, relative_directory:str, ad_id:int
) -> tuple[AdPartial, str]:
self, relative_directory:Path, ad_id:int
) -> tuple[AdPartial, Path]:
"""
Extracts ad information and handles directory creation/renaming.

Expand All @@ -373,32 +421,37 @@ async def _extract_ad_page_info_with_directory_handling(

# Determine the final directory path
sanitized_title = misc.sanitize_folder_name(title, self.config.download.folder_name_max_length)
final_dir = os.path.join(relative_directory, f"ad_{ad_id}_{sanitized_title}")
temp_dir = os.path.join(relative_directory, f"ad_{ad_id}")
final_dir = relative_directory / f"ad_{ad_id}_{sanitized_title}"
temp_dir = relative_directory / f"ad_{ad_id}"

loop = asyncio.get_running_loop()

# Handle existing directories
if os.path.exists(final_dir):
if await _exists(final_dir):
# If the folder with title already exists, delete it
LOG.info("Deleting current folder of ad %s...", ad_id)
shutil.rmtree(final_dir)
LOG.debug("Removing directory tree: %s", final_dir)
await loop.run_in_executor(None, shutil.rmtree, str(final_dir))

if os.path.exists(temp_dir):
if await _exists(temp_dir):
if self.config.download.rename_existing_folders:
# Rename the old folder to the new name with title
LOG.info("Renaming folder from %s to %s for ad %s...",
os.path.basename(temp_dir), os.path.basename(final_dir), ad_id)
os.rename(temp_dir, final_dir)
temp_dir.name, final_dir.name, ad_id)
LOG.debug("Renaming: %s -> %s", temp_dir, final_dir)
await loop.run_in_executor(None, temp_dir.rename, final_dir)
else:
# Use the existing folder without renaming
final_dir = temp_dir
LOG.info("Using existing folder for ad %s at %s.", ad_id, final_dir)
else:
# Create new directory with title
os.mkdir(final_dir)
LOG.debug("Creating new directory: %s", final_dir)
await loop.run_in_executor(None, final_dir.mkdir)
LOG.info("New directory for ad created at %s.", final_dir)

# Now extract complete ad info (including images) to the final directory
ad_cfg = await self._extract_ad_page_info(final_dir, ad_id)
ad_cfg = await self._extract_ad_page_info(str(final_dir), ad_id)

return ad_cfg, final_dir

Expand Down
5 changes: 4 additions & 1 deletion src/kleinanzeigen_bot/resources/translations.de.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ kleinanzeigen_bot/__init__.py:
kleinanzeigen_bot/extract.py:
#################################################
download_ad:
"Created ads directory at ./%s.": "Verzeichnis für Anzeigen erstellt unter ./%s."
"Ensured ads directory exists at ./%s.": "Verzeichnis für Anzeigen sichergestellt unter ./%s."

_download_and_save_image_sync:
"Failed to download image %s: %s": "Fehler beim Herunterladen des Bildes %s: %s"

_download_images_from_ad_page:
"Found %s.": "%s gefunden."
Expand Down
70 changes: 39 additions & 31 deletions src/kleinanzeigen_bot/utils/web_scraping_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,41 @@ def __init__(self) -> None:
self.profile_name:str | None = None


def _write_initial_prefs(prefs_file:str) -> None:
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump({
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {
"services": {
"consented_to_sync": False
}
},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2 # 1 = allow, 2 = block browser notifications
},
"password_manager_enabled": False
},
"signin": {
"allowed": False
},
"translate_site_blacklist": [
"www.kleinanzeigen.de"
],
"devtools": {
"preferences": {
"currentDockState": '"bottom"'
}
}
}, fd)


async def _exists(path:str) -> bool:
return await asyncio.get_running_loop().run_in_executor(None, os.path.exists, path)


class WebScrapingMixin:

def __init__(self) -> None:
Expand Down Expand Up @@ -174,7 +209,7 @@ async def create_browser_session(self) -> None:
LOG.info("Creating Browser session...")

if self.browser_config.binary_location:
ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
ensure(await _exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
else:
self.browser_config.binary_location = self.get_compatible_browser()
LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)
Expand Down Expand Up @@ -289,41 +324,14 @@ async def create_browser_session(self) -> None:
profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default")
os.makedirs(profile_dir, exist_ok = True)
prefs_file = os.path.join(profile_dir, "Preferences")
if not os.path.exists(prefs_file):
if not await _exists(prefs_file):
LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump({
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {
"services": {
"consented_to_sync": False
}
},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2 # 1 = allow, 2 = block browser notifications
},
"password_manager_enabled": False
},
"signin": {
"allowed": False
},
"translate_site_blacklist": [
"www.kleinanzeigen.de"
],
"devtools": {
"preferences": {
"currentDockState": '"bottom"'
}
}
}, fd)
await asyncio.get_running_loop().run_in_executor(None, _write_initial_prefs, prefs_file)

# load extensions
for crx_extension in self.browser_config.extensions:
LOG.info(" -> Adding Browser extension: [%s]", crx_extension)
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
ensure(await _exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
cfg.add_extension(crx_extension)

try:
Expand Down
Loading