diff --git a/beetsplug/lastgenre/__init__.py b/beetsplug/lastgenre/__init__.py index 3840b26533..e89104a639 100644 --- a/beetsplug/lastgenre/__init__.py +++ b/beetsplug/lastgenre/__init__.py @@ -25,6 +25,8 @@ from __future__ import annotations import os +import re +from collections import defaultdict from functools import singledispatchmethod from pathlib import Path from typing import TYPE_CHECKING, Any @@ -34,6 +36,7 @@ from beets import config, library, plugins, ui from beets.library import Album, Item from beets.logging import extra_debug +from beets.ui import UserError from beets.util import plurality, unique_list from .client import LastFmClient @@ -52,6 +55,12 @@ """Genre hierarchy as list of paths from general to specific. Example: [['electronic', 'house'], ['electronic', 'techno']]""" + RawBlacklist = dict[str, list[str]] + """Mapping of artist name to list of raw regex/string patterns.""" + + Blacklist = dict[str, list[re.Pattern[str]]] + """Mapping of artist name to list of compiled case-insensitive patterns.""" + # Canonicalization tree processing. @@ -131,6 +140,7 @@ def __init__(self) -> None: "prefer_specific": False, "title_case": True, "pretend": False, + "blacklist": False, } ) self.setup() @@ -140,12 +150,13 @@ def setup(self) -> None: if self.config["auto"]: self.import_stages = [self.imported] - self.client = LastFmClient( - self._log, self.config["min_weight"].get(int) - ) self.whitelist: Whitelist = self._load_whitelist() self.c14n_branches: CanonTree self.c14n_branches, self.canonicalize = self._load_c14n_tree() + self.blacklist: Blacklist = self._load_blacklist() + self.client = LastFmClient( + self._log, self.config["min_weight"].get(int), self.blacklist + ) def _load_whitelist(self) -> Whitelist: """Load the whitelist from a text file. @@ -188,6 +199,99 @@ def _load_c14n_tree(self) -> tuple[CanonTree, bool]: flatten_tree(genres_tree, [], c14n_branches) return c14n_branches, canonicalize + def _load_blacklist(self) -> Blacklist: + """Load the blacklist from a configured file path. + + For maximum compatibility with regex patterns, a custom format is used: + - Each section starts with an artist name, followed by a colon. + - Subsequent lines are indented (at least one space, typically 4 spaces) + and contain a regex pattern to match a genre. + - A '*' key for artist can be used to specify global forbidden genres. + + Returns a compiled blacklist dictionary mapping artist names to lists of + case-insensitive regex patterns. Returns empty dict if not configured. + + Example blacklist file format:: + + Artist Name: + .*rock.* + .*metal.* + Another Artist Name: + ^jazz$ + *: + spoken word + comedy + + Raises: + UserError: if the file format is invalid. + """ + blacklist_raw: RawBlacklist = defaultdict(list) + bl_filename = self.config["blacklist"].get() + if not bl_filename: + return {} + if not isinstance(bl_filename, str): + raise UserError( + "Invalid value for lastgenre.blacklist: expected path string " + f"or 'no', got {bl_filename!r} (type {type(bl_filename).__name__})" + ) + + self._log.debug("Loading blacklist file {}", bl_filename) + section = None + with Path(bl_filename).expanduser().open(encoding="utf-8") as f: + for lineno, line in enumerate(f, 1): + if not line.strip() or line.lstrip().startswith("#"): + continue + if not line.startswith(" "): + # Section header + header = line.strip().lower() + if not header.endswith(":"): + raise UserError( + f"Malformed blacklist section header " + f"at line {lineno}: {line}" + ) + section = header[:-1].rstrip() + if not section: + raise UserError( + f"Empty blacklist section name " + f"at line {lineno}: {line}" + ) + else: + # Pattern line: must be indented (at least one space) + if section is None: + raise UserError( + f"Blacklist regex pattern line before any section header " + f"at line {lineno}: {line}" + ) + blacklist_raw[section].append(line.strip()) + extra_debug(self._log, "Blacklist: {}", blacklist_raw) + return self._compile_blacklist_patterns(blacklist_raw) + + @staticmethod + def _compile_blacklist_patterns( + blacklist: RawBlacklist, + ) -> Blacklist: + """Compile blacklist patterns into regex objects. + + Tries regex compilation first, falls back to literal string matching. + That way users can use regexes for flexible matching but also simple + strings without worrying about regex syntax. All patterns are + case-insensitive. + """ + compiled_blacklist: defaultdict[str, list[re.Pattern[str]]] = ( + defaultdict(list) + ) + for artist, patterns in blacklist.items(): + compiled_patterns = [] + for pattern in patterns: + try: + compiled_patterns.append(re.compile(pattern, re.IGNORECASE)) + except re.error: + compiled_patterns.append( + re.compile(re.escape(pattern), re.IGNORECASE) + ) + compiled_blacklist[artist] = compiled_patterns + return compiled_blacklist + @property def sources(self) -> tuple[str, ...]: """A tuple of allowed genre sources. May contain 'track', @@ -203,7 +307,9 @@ def sources(self) -> tuple[str, ...]: # Genre list processing. - def _resolve_genres(self, tags: list[str]) -> list[str]: + def _resolve_genres( + self, tags: list[str], artist: str | None = None + ) -> list[str]: """Canonicalize, sort and filter a list of genres. - Returns an empty list if the input tags list is empty. @@ -230,11 +336,15 @@ def _resolve_genres(self, tags: list[str]) -> list[str]: # Extend the list to consider tags parents in the c14n tree tags_all = [] for tag in tags: - # Add parents that are in the whitelist, or add the oldest - # ancestor if no whitelist - if self.whitelist: + # Skip blacklisted tags entirely — don't walk their ancestry. + if self._is_blacklisted(tag, artist): + continue + # Add parents that pass whitelist/blacklist, or add the oldest + # ancestor if neither is configured + if self.whitelist or self.blacklist: parents = self._filter_valid( - find_parents(tag, self.c14n_branches) + find_parents(tag, self.c14n_branches), + artist=artist, ) else: parents = [find_parents(tag, self.c14n_branches)[-1]] @@ -255,24 +365,57 @@ def _resolve_genres(self, tags: list[str]) -> list[str]: if self.config["prefer_specific"]: tags = sort_by_depth(tags, self.c14n_branches) - # c14n only adds allowed genres but we may have had forbidden genres in - # the original tags list - valid_tags = self._filter_valid(tags) + # Final filter: catches forbidden genres when c14n is disabled, + # and any that survived the loop without whitelist/blacklist active. + valid_tags = self._filter_valid(tags, artist=artist) return valid_tags[:count] - def _filter_valid(self, genres: Iterable[str]) -> list[str]: - """Filter genres based on whitelist. + def _filter_valid( + self, genres: Iterable[str], artist: str | None = None + ) -> list[str]: + """Filter genres through whitelist and blacklist. - Returns all genres if no whitelist is configured, otherwise returns - only genres that are in the whitelist. + Drops empty/whitespace-only strings, then applies whitelist and + blacklist checks. Returns all genres if neither is configured. + Whitelist is checked first for performance reasons (blacklist regex + matching is more expensive). """ - # First, drop any falsy or whitespace-only genre strings to avoid - # retaining empty tags from multi-valued fields. cleaned = [g for g in genres if g and g.strip()] - if not self.whitelist: + if not self.whitelist and not self.blacklist: return cleaned - return [g for g in cleaned if g.lower() in self.whitelist] + result = [] + for genre in cleaned: + if self.whitelist and genre.lower() not in self.whitelist: + continue + if self._is_blacklisted(genre, artist): + continue + result.append(genre) + return result + + def _is_blacklisted(self, genre: str, artist: str | None = None) -> bool: + """Check if genre tag should be forbidden. + + Returns True if: + - tag matches global blacklist, OR + - artist matches artist blacklist AND tag matches artist blacklist. + + Tags pass if no blacklist match found. + + KEEP DUPLICATED IN SYNC with client._is_blacklisted(). + TODO: Extract to filters.py when regex genre aliases land (next PR). + """ + if not self.blacklist: + return False + genre_lower = genre.lower() + for pattern in self.blacklist.get("*") or []: + if pattern.search(genre_lower): + return True + if artist: + for pattern in self.blacklist.get(artist.lower()) or []: + if pattern.search(genre_lower): + return True + return False # Genre resolution pipeline. @@ -293,13 +436,13 @@ def _get_existing_genres(self, obj: LibModel) -> list[str]: return genres_list def _combine_resolve_and_log( - self, old: list[str], new: list[str] + self, old: list[str], new: list[str], artist: str | None = None ) -> list[str]: """Combine old and new genres and process via _resolve_genres.""" self._log.debug("raw last.fm tags: {}", new) self._log.debug("existing genres taken into account: {}", old) combined = old + new - return self._resolve_genres(combined) + return self._resolve_genres(combined, artist=artist) def _get_genre(self, obj: LibModel) -> tuple[list[str], str]: """Get the final genre list for an Album or Item object. @@ -321,12 +464,21 @@ def _get_genre(self, obj: LibModel) -> tuple[list[str], str]: and the whitelist feature was disabled. """ + def _fallback_stage() -> tuple[list[str], str]: + """Return the fallback genre and label.""" + if fallback := self.config["fallback"].get(): + return [fallback], "fallback" + return [], "fallback unconfigured" + def _try_resolve_stage( - stage_label: str, keep_genres: list[str], new_genres: list[str] + stage_label: str, + keep_genres: list[str], + new_genres: list[str], + artist: str | None = None, ) -> tuple[list[str], str] | None: """Try to resolve genres for a given stage and log the result.""" resolved_genres = self._combine_resolve_and_log( - keep_genres, new_genres + keep_genres, new_genres, artist=artist ) if resolved_genres: suffix = "whitelist" if self.whitelist else "any" @@ -346,11 +498,15 @@ def _try_resolve_stage( # If none are found, we use the fallback (if set). if self.config["cleanup_existing"]: keep_genres = [g.lower() for g in genres] - if result := _try_resolve_stage("cleanup", keep_genres, []): + cleanup_artist = getattr(obj, "albumartist", None) or getattr( + obj, "artist", None + ) + if result := _try_resolve_stage( + "cleanup", keep_genres, [], artist=cleanup_artist + ): return result - # Return fallback string (None if not set). - return self.config["fallback"].get(), "fallback" + return _fallback_stage() # If cleanup_existing is not set, the pre-populated tags are # returned as-is. @@ -369,7 +525,7 @@ def _try_resolve_stage( obj.artist, obj.title ): if result := _try_resolve_stage( - "track", keep_genres, new_genres + "track", keep_genres, new_genres, artist=obj.artist ): return result @@ -378,18 +534,21 @@ def _try_resolve_stage( obj.albumartist, obj.album ): if result := _try_resolve_stage( - "album", keep_genres, new_genres + "album", keep_genres, new_genres, artist=obj.albumartist ): return result if "artist" in self.sources: new_genres = [] + stage_artist: str | None = None if isinstance(obj, library.Item): new_genres = self.client.fetch_artist_genre(obj.artist) stage_label = "artist" + stage_artist = obj.artist elif obj.albumartist != config["va_name"].as_str(): new_genres = self.client.fetch_artist_genre(obj.albumartist) stage_label = "album artist" + stage_artist = obj.albumartist if not new_genres: extra_debug( self._log, @@ -434,27 +593,30 @@ def _try_resolve_stage( if new_genres: if result := _try_resolve_stage( - stage_label, keep_genres, new_genres + stage_label, keep_genres, new_genres, artist=stage_artist ): return result # Nothing found, leave original if configured and valid. - if genres and self.config["keep_existing"]: - if valid_genres := self._filter_valid(genres): + if genres and self.config["keep_existing"].get(): + if isinstance(obj, library.Item): + # For track items, use track artist (important for compilations). + artist = getattr(obj, "artist", None) + else: + # For albums, prefer albumartist, fall back to artist. + artist = getattr(obj, "albumartist", None) or getattr( + obj, "artist", None + ) + if valid_genres := self._filter_valid(genres, artist=artist): return valid_genres, "original fallback" # If the original genre doesn't match a whitelisted genre, check # if we can canonicalize it to find a matching, whitelisted genre! if result := _try_resolve_stage( - "original fallback", keep_genres, [] + "original fallback", keep_genres, [], artist=artist ): return result - # Return fallback as a list. - if fallback := self.config["fallback"].get(): - return [fallback], "fallback" - - # No fallback configured. - return [], "fallback unconfigured" + return _fallback_stage() # Beets plugin hooks and CLI. diff --git a/beetsplug/lastgenre/client.py b/beetsplug/lastgenre/client.py index 602acb6410..db66c5d43c 100644 --- a/beetsplug/lastgenre/client.py +++ b/beetsplug/lastgenre/client.py @@ -27,6 +27,7 @@ from beets.logging import extra_debug if TYPE_CHECKING: + import re from collections.abc import Callable from beets.logging import BeetsLogger @@ -36,6 +37,9 @@ Keys are formatted as 'entity.arg1-arg2-...' (e.g., 'album.artist-title'). Values are lists of lowercase genre strings.""" + Blacklist = dict[str, list[re.Pattern[str]]] + """Mapping of artist name to list of compiled case-insensitive patterns.""" + LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY) @@ -49,15 +53,40 @@ class LastFmClient: """Client for fetching genres from Last.fm.""" - def __init__(self, log: BeetsLogger, min_weight: int): + def __init__(self, log: BeetsLogger, min_weight: int, blacklist: Blacklist): """Initialize the client. The min_weight parameter filters tags by their minimum weight. + The blacklist filters forbidden genres directly after Last.fm lookup. """ self._log = log self._min_weight = min_weight + self._blacklist: Blacklist = blacklist self._genre_cache: GenreCache = {} + def _is_blacklisted(self, genre: str, artist: str) -> bool: + """Check if genre tag should be forbidden. + + Returns True if: + - tag matches global blacklist, OR + - artist matches artist blacklist AND tag matches artist blacklist. + + Tags pass if no blacklist match found. + + KEEP DUPLICATED IN SYNC with LastGenrePlugin._is_blacklisted(). + TODO: Extract to filters.py when regex genre aliases land (next PR). + """ + if not self._blacklist: + return False + genre_lower = genre.lower() + for pattern in self._blacklist.get("*") or []: + if pattern.search(genre_lower): + return True + for pattern in self._blacklist.get(artist.lower()) or []: + if pattern.search(genre_lower): + return True + return False + def fetch_genre( self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track ) -> list[str]: @@ -130,6 +159,22 @@ def _last_lookup( extra_debug( self._log, "last.fm (unfiltered) {} tags: {}", entity, genre ) + + # Filter forbidden genres + if genre and len(args) >= 1: + # For all current lastfm API calls, the first argument is always the artist: + # - get_album(artist, album) + # - get_artist(artist) + # - get_track(artist, title) + artist = args[0] + filtered_genre = [ + g for g in genre if not self._is_blacklisted(g, artist) + ] + if filtered_genre != genre: + log_filtered = set(genre) - set(filtered_genre) + extra_debug(self._log, "blacklisted: {}", log_filtered) + genre = filtered_genre + return genre def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]: diff --git a/docs/plugins/lastgenre.rst b/docs/plugins/lastgenre.rst index b0c01eabd2..2beb485501 100644 --- a/docs/plugins/lastgenre.rst +++ b/docs/plugins/lastgenre.rst @@ -160,6 +160,36 @@ genres remain, set ``whitelist: no``). If ``force`` is disabled the ``keep_existing`` option is simply ignored (since ``force: no`` means ``not touching`` existing tags anyway). +Blacklisting Genres +------------------- + +Last.fm tags are crowd-sourced, so they can be wrong — especially for artists +whose names are shared with or confused with others. For example, a drum and +bass artist named "Fracture" might incorrectly receive metal tags. The blacklist +lets you reject specific genres globally or per-artist. A possible ``blacklist`` +file would look like this: + +.. code-block:: text + + fracture: + ^(heavy|black|power|death)?\s?(metal|rock)$|\w+-metal\d*$ + progressive metal + bob marley: + ska + *: + electronic + +A combination of regex patterns and plain genre names is possible. The ``*`` key +applies globally to all artists — use it to block genres you never want, +regardless of artist. + +Set the ``blacklist`` option to the path of a blacklist file to enable this +feature. + +.. attention:: + + Do not use single or double quotes around the genre names or regex patterns. + Configuration ------------- @@ -200,6 +230,9 @@ file. The available options are: internal whitelist, or ``no`` to consider all genres valid. Default: ``yes``. - **title_case**: Convert the new tags to TitleCase before saving. Default: ``yes``. +- **blacklist**: The path to a blacklist file that contains genres to exclude + from being set as genres for specific artists. See `Blacklisting Genres`_ for + more details. Default: ``no``. Running Manually ---------------- diff --git a/test/plugins/test_lastgenre.py b/test/plugins/test_lastgenre.py index a619fd1b1b..2a799cd4d0 100644 --- a/test/plugins/test_lastgenre.py +++ b/test/plugins/test_lastgenre.py @@ -14,12 +14,17 @@ """Tests for the 'lastgenre' plugin.""" +import os +import re +import tempfile +from collections import defaultdict from unittest.mock import Mock, patch import pytest from beets.test import _common from beets.test.helper import IOMixin, PluginTestCase +from beets.ui import UserError from beetsplug import lastgenre @@ -202,6 +207,51 @@ def test_sort_by_depth(self): res = lastgenre.sort_by_depth(tags, self.plugin.c14n_branches) assert res == ["ambient", "electronic"] + def test_blacklist_filters_genres_in_resolve(self): + """Blacklisted genres are stripped by _resolve_genres (no c14n). + + Artist-specific and global patterns are both applied. + """ + self._setup_config(whitelist=False, canonical=False) + self.plugin.blacklist = defaultdict( + list, + { + "the artist": [re.compile(r"^metal$", re.IGNORECASE)], + "*": [re.compile(r"^rock$", re.IGNORECASE)], + }, + ) + result = self.plugin._resolve_genres( + ["metal", "rock", "jazz"], artist="the artist" + ) + assert "metal" not in result, ( + "artist-specific blacklisted genre must be removed" + ) + assert "rock" not in result, ( + "globally blacklisted genre must be removed" + ) + assert "jazz" in result, "non-blacklisted genre must survive" + + def test_blacklist_stops_c14n_ancestry_walk(self): + """A blacklisted tag's c14n parents don't bleed into the result. + + Without blacklist, 'delta blues' canonicalizes to 'blues'. + With 'delta blues' blacklisted the tag is skipped entirely in the + c14n loop, so 'blues' must not appear either. + """ + self._setup_config(whitelist=False, canonical=True, count=99) + self.plugin.blacklist = defaultdict( + list, + { + "the artist": [re.compile(r"^delta blues$", re.IGNORECASE)], + }, + ) + result = self.plugin._resolve_genres( + ["delta blues"], artist="the artist" + ) + assert result == [], ( + "blacklisted tag must not contribute c14n parents to the result" + ) + @pytest.fixture def config(config): @@ -614,3 +664,159 @@ def mock_fetch_artist_genre(self, artist): # Run assert plugin._get_genre(item) == expected_result + + +@pytest.mark.parametrize( + "blacklist_dict, artist, genre, expected_forbidden", + [ + # Global blacklist - simple word + ({"*": ["spoken word"]}, "Any Artist", "spoken word", True), + ({"*": ["spoken word"]}, "Any Artist", "jazz", False), + # Global blacklist - regex pattern + ({"*": [".*electronic.*"]}, "Any Artist", "ambient electronic", True), + ({"*": [".*electronic.*"]}, "Any Artist", "jazz", False), + # Artist-specific blacklist + ({"metallica": ["metal"]}, "Metallica", "metal", True), + ({"metallica": ["metal"]}, "Iron Maiden", "metal", False), + # Case insensitive matching + ({"metallica": ["metal"]}, "METALLICA", "METAL", True), + # Artist-specific blacklist - exact match + ({"metallica": ["^Heavy Metal$"]}, "Metallica", "classic metal", False), + # Combined global and artist-specific + ( + {"*": ["spoken word"], "metallica": ["metal"]}, + "Metallica", + "spoken word", + True, + ), + ( + {"*": ["spoken word"], "metallica": ["metal"]}, + "Metallica", + "metal", + True, + ), + # Complex regex pattern with multiple features (raw string) + ( + { + "fracture": [ + r"^(heavy|black|power|death)?\s?(metal|rock)$|\w+-metal\d*$" + ] + }, + "Fracture", + "power metal", + True, + ), + # Complex regex pattern with multiple features (regular string) + ( + {"amon tobin": ["d(rum)?[ n/]*b(ass)?"]}, + "Amon Tobin", + "dnb", + True, + ), + # Empty blacklist + ({}, "Any Artist", "any genre", False), + ], +) +def test_blacklist_patterns(blacklist_dict, artist, genre, expected_forbidden): + """Test blacklist pattern matching logic directly.""" + + # Initialize plugin + plugin = lastgenre.LastGenrePlugin() + + # Set up compiled blacklist directly (skipping file parsing) + compiled_blacklist = defaultdict(list) + for artist_name, patterns in blacklist_dict.items(): + compiled_blacklist[artist_name.lower()] = [ + re.compile(pattern, re.IGNORECASE) for pattern in patterns + ] + + plugin.blacklist = compiled_blacklist + + # Test the _is_blacklisted method on the plugin + result = plugin._is_blacklisted(genre, artist) + assert result == expected_forbidden + + +@pytest.mark.parametrize( + "file_content, expected_blacklist", + [ + # Basic artist with pattern + ("metallica:\n metal", {"metallica": ["metal"]}), + # Global blacklist + ("*:\n spoken word", {"*": ["spoken word"]}), + # Multiple patterns per artist + ( + "metallica:\n metal\n .*rock.*", + {"metallica": ["metal", ".*rock.*"]}, + ), + # Comments and empty lines ignored + ( + "# comment\n*:\n spoken word\n\nmetallica:\n metal", + {"*": ["spoken word"], "metallica": ["metal"]}, + ), + # Case insensitive artist names — key lowercased, pattern kept as-is + # (patterns compiled with re.IGNORECASE so case doesn't matter for matching) + ("METALLICA:\n METAL", {"metallica": ["METAL"]}), + # Invalid regex pattern that gets escaped + ("artist:\n [invalid(regex", {"artist": ["\\[invalid\\(regex"]}), + # Empty file + ("", {}), + ], +) +def test_blacklist_file_format(config, file_content, expected_blacklist): + """Test blacklist file format parsing.""" + + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False, encoding="utf-8" + ) as f: + f.write(file_content) + blacklist_file = f.name + + try: + config["lastgenre"]["blacklist"] = blacklist_file + plugin = lastgenre.LastGenrePlugin() + + # Convert compiled regex patterns back to strings for comparison + string_blacklist = { + artist: [p.pattern for p in patterns] + for artist, patterns in plugin.blacklist.items() + } + + assert string_blacklist == expected_blacklist + + finally: + os.unlink(blacklist_file) + + +@pytest.mark.parametrize( + "invalid_content, expected_error_message", + [ + # Missing colon + ("metallica\n metal", "Malformed blacklist section header"), + # Pattern before section + (" metal\nmetallica:\n heavy metal", "before any section header"), + # Unindented pattern + ("metallica:\nmetal", "Malformed blacklist section header"), + ], +) +def test_blacklist_file_format_errors( + config, invalid_content, expected_error_message +): + """Test blacklist file format error handling.""" + + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False, encoding="utf-8" + ) as f: + f.write(invalid_content) + blacklist_file = f.name + + try: + config["lastgenre"]["blacklist"] = blacklist_file + + with pytest.raises(UserError) as exc_info: + lastgenre.LastGenrePlugin() + + assert expected_error_message in str(exc_info.value) + + finally: + os.unlink(blacklist_file)