Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,6 @@ select = [
]
ignore = [
"ANN401", # Dynamically typed expressions (typing.Any) are disallowed
"ASYNC210", # TODO Async functions should not call blocking HTTP methods
"ASYNC230", # TODO Async functions should not open files with blocking methods like `open`
"ASYNC240", # TODO Async functions should not use os.path methods, use trio.Path or anyio.path
"ASYNC250", # TODO Blocking call to input() in async context
"COM812", # Trailing comma missing
"D1", # Missing docstring in ...
"D200", # One-line docstring should fit on one line
Expand Down
4 changes: 2 additions & 2 deletions src/kleinanzeigen_bot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ async def publish_ad(self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], p
LOG.warning("# Payment form detected! Please proceed with payment.")
LOG.warning("############################################")
await self.web_scroll_page_down()
input(_("Press a key to continue..."))
await ainput(_("Press a key to continue...")) # noqa: ASYNC240
except TimeoutError:
pass

Expand Down Expand Up @@ -1108,7 +1108,7 @@ async def __set_shipping(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrate
# in some categories we need to go another dialog back
try:
await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]',
timeout=short_timeout)
timeout = short_timeout)
except TimeoutError:
await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]')

Expand Down
94 changes: 72 additions & 22 deletions src/kleinanzeigen_bot/extract.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import asyncio
from gettext import gettext as _

import json, mimetypes, os, re, shutil # isort: skip
import json, mimetypes, re, shutil # isort: skip
import urllib.request as urllib_request
from datetime import datetime
from pathlib import Path
from typing import Any, Final

from kleinanzeigen_bot.model.ad_model import ContactPartial
Expand All @@ -25,6 +27,28 @@
BREADCRUMB_RE = re.compile(r"/c(\d+)")


def _path_exists(path:Path | str) -> bool:
"""Helper for Path.exists() that can be mocked in tests."""
return Path(path).exists()


def _path_is_dir(path:Path | str) -> bool:
"""Helper for Path.is_dir() that can be mocked in tests."""
return Path(path).is_dir()


async def _exists(path:Path | str) -> bool:
    """Check whether *path* exists without blocking the event loop.

    Delegates to the mockable _path_exists helper via the default executor.
    """
    loop = asyncio.get_running_loop()
    found = await loop.run_in_executor(None, _path_exists, path)  # noqa: ASYNC240
    LOG.debug("Path exists check: %s -> %s", path, found)
    return found


async def _isdir(path:Path | str) -> bool:
    """Check whether *path* is a directory without blocking the event loop.

    Delegates to the mockable _path_is_dir helper via the default executor.
    """
    loop = asyncio.get_running_loop()
    found = await loop.run_in_executor(None, _path_is_dir, path)  # noqa: ASYNC240
    LOG.debug("Path is_dir check: %s -> %s", path, found)
    return found


class AdExtractor(WebScrapingMixin):
"""
Wrapper class for ad extraction that uses an active bot´s browser session to extract specific elements from an ad page.
Expand All @@ -44,10 +68,11 @@ async def download_ad(self, ad_id:int) -> None:
"""

# create sub-directory for ad(s) to download (if necessary):
relative_directory = "downloaded-ads"
relative_directory = Path("downloaded-ads")
# make sure configured base directory exists
if not os.path.exists(relative_directory) or not os.path.isdir(relative_directory):
os.mkdir(relative_directory)
if not await _exists(relative_directory) or not await _isdir(relative_directory):
LOG.debug("Creating base directory: %s", relative_directory)
await asyncio.get_running_loop().run_in_executor(None, relative_directory.mkdir)
LOG.info("Created ads directory at ./%s.", relative_directory)

# Extract ad info and determine final directory path
Expand All @@ -56,12 +81,26 @@ async def download_ad(self, ad_id:int) -> None:
)

# Save the ad configuration file
ad_file_path = final_dir + "/" + f"ad_{ad_id}.yaml"
ad_file_path = str(Path(final_dir) / f"ad_{ad_id}.yaml")
dicts.save_dict(
ad_file_path,
ad_cfg.model_dump(),
header = "# yaml-language-server: $schema=https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot/refs/heads/main/schemas/ad.schema.json")

@staticmethod
def _download_and_save_image_sync(url:str, directory:str, filename_prefix:str, img_nr:int) -> str | None:
    """Synchronously download one ad image and write it into *directory*.

    The file name is built from *filename_prefix*, *img_nr* and an extension
    guessed from the response's content type. Returns the written file path,
    or None if the download fails (the failure is logged, never raised).
    """
    try:
        with urllib_request.urlopen(url) as resp:  # noqa: S310 Audit URL open for permitted schemes.
            mime = resp.info().get_content_type()
            suffix = mimetypes.guess_extension(mime) or ""
            target = f"{directory}/{filename_prefix}{img_nr}{suffix}"
            with open(target, "wb") as out:
                shutil.copyfileobj(resp, out)
            return target
    except Exception as ex:
        LOG.warning("Failed to download image %s: %s", url, ex)
        return None

async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[str]:
"""
Downloads all images of an ad.
Expand All @@ -85,17 +124,23 @@ async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[
img_nr = 1
dl_counter = 0

loop = asyncio.get_running_loop()

for img_element in images:
current_img_url = img_element.attrs["src"] # URL of the image
if current_img_url is None:
continue

with urllib_request.urlopen(str(current_img_url)) as response: # noqa: S310 Audit URL open for permitted schemes.
content_type = response.info().get_content_type()
file_ending = mimetypes.guess_extension(content_type)
img_path = f"{directory}/{img_fn_prefix}{img_nr}{file_ending}"
with open(img_path, "wb") as f:
shutil.copyfileobj(response, f)
img_path = await loop.run_in_executor(
None,
self._download_and_save_image_sync,
str(current_img_url),
directory,
img_fn_prefix,
img_nr
)

if img_path:
dl_counter += 1
img_paths.append(img_path.rsplit("/", maxsplit = 1)[-1])

Expand Down Expand Up @@ -354,8 +399,8 @@ async def _extract_ad_page_info(self, directory:str, ad_id:int) -> AdPartial:
return ad_cfg

async def _extract_ad_page_info_with_directory_handling(
self, relative_directory:str, ad_id:int
) -> tuple[AdPartial, str]:
self, relative_directory:Path, ad_id:int
) -> tuple[AdPartial, Path]:
"""
Extracts ad information and handles directory creation/renaming.

Expand All @@ -373,32 +418,37 @@ async def _extract_ad_page_info_with_directory_handling(

# Determine the final directory path
sanitized_title = misc.sanitize_folder_name(title, self.config.download.folder_name_max_length)
final_dir = os.path.join(relative_directory, f"ad_{ad_id}_{sanitized_title}")
temp_dir = os.path.join(relative_directory, f"ad_{ad_id}")
final_dir = relative_directory / f"ad_{ad_id}_{sanitized_title}"
temp_dir = relative_directory / f"ad_{ad_id}"

loop = asyncio.get_running_loop()

# Handle existing directories
if os.path.exists(final_dir):
if await _exists(final_dir):
# If the folder with title already exists, delete it
LOG.info("Deleting current folder of ad %s...", ad_id)
shutil.rmtree(final_dir)
LOG.debug("Removing directory tree: %s", final_dir)
await loop.run_in_executor(None, shutil.rmtree, str(final_dir))

if os.path.exists(temp_dir):
if await _exists(temp_dir):
if self.config.download.rename_existing_folders:
# Rename the old folder to the new name with title
LOG.info("Renaming folder from %s to %s for ad %s...",
os.path.basename(temp_dir), os.path.basename(final_dir), ad_id)
os.rename(temp_dir, final_dir)
temp_dir.name, final_dir.name, ad_id)
LOG.debug("Renaming: %s -> %s", temp_dir, final_dir)
await loop.run_in_executor(None, temp_dir.rename, final_dir)
else:
# Use the existing folder without renaming
final_dir = temp_dir
LOG.info("Using existing folder for ad %s at %s.", ad_id, final_dir)
else:
# Create new directory with title
os.mkdir(final_dir)
LOG.debug("Creating new directory: %s", final_dir)
await loop.run_in_executor(None, final_dir.mkdir)
LOG.info("New directory for ad created at %s.", final_dir)

# Now extract complete ad info (including images) to the final directory
ad_cfg = await self._extract_ad_page_info(final_dir, ad_id)
ad_cfg = await self._extract_ad_page_info(str(final_dir), ad_id)

return ad_cfg, final_dir

Expand Down
3 changes: 3 additions & 0 deletions src/kleinanzeigen_bot/resources/translations.de.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ kleinanzeigen_bot/extract.py:
download_ad:
"Created ads directory at ./%s.": "Verzeichnis für Anzeigen erstellt unter ./%s."

_download_and_save_image_sync:
"Failed to download image %s: %s": "Fehler beim Herunterladen des Bildes %s: %s"

_download_images_from_ad_page:
"Found %s.": "%s gefunden."
"Downloaded %s.": "%s heruntergeladen."
Expand Down
70 changes: 39 additions & 31 deletions src/kleinanzeigen_bot/utils/web_scraping_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,41 @@ def __init__(self) -> None:
self.profile_name:str | None = None


def _write_initial_prefs(prefs_file:str) -> None:
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump({
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {
"services": {
"consented_to_sync": False
}
},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2 # 1 = allow, 2 = block browser notifications
},
"password_manager_enabled": False
},
"signin": {
"allowed": False
},
"translate_site_blacklist": [
"www.kleinanzeigen.de"
],
"devtools": {
"preferences": {
"currentDockState": '"bottom"'
}
}
}, fd)


async def _exists(path:str) -> bool:
return await asyncio.get_running_loop().run_in_executor(None, os.path.exists, path)


class WebScrapingMixin:

def __init__(self) -> None:
Expand Down Expand Up @@ -174,7 +209,7 @@ async def create_browser_session(self) -> None:
LOG.info("Creating Browser session...")

if self.browser_config.binary_location:
ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
ensure(await _exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
else:
self.browser_config.binary_location = self.get_compatible_browser()
LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)
Expand Down Expand Up @@ -289,41 +324,14 @@ async def create_browser_session(self) -> None:
profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default")
os.makedirs(profile_dir, exist_ok = True)
prefs_file = os.path.join(profile_dir, "Preferences")
if not os.path.exists(prefs_file):
if not await _exists(prefs_file):
LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump({
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {
"services": {
"consented_to_sync": False
}
},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2 # 1 = allow, 2 = block browser notifications
},
"password_manager_enabled": False
},
"signin": {
"allowed": False
},
"translate_site_blacklist": [
"www.kleinanzeigen.de"
],
"devtools": {
"preferences": {
"currentDockState": '"bottom"'
}
}
}, fd)
await asyncio.get_running_loop().run_in_executor(None, _write_initial_prefs, prefs_file)

# load extensions
for crx_extension in self.browser_config.extensions:
LOG.info(" -> Adding Browser extension: [%s]", crx_extension)
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
ensure(await _exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
cfg.add_extension(crx_extension)

try:
Expand Down
Loading
Loading