Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 200 additions & 38 deletions beetsplug/lastgenre/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from __future__ import annotations

import os
import re
from collections import defaultdict
from functools import singledispatchmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand All @@ -34,6 +36,7 @@
from beets import config, library, plugins, ui
from beets.library import Album, Item
from beets.logging import extra_debug
from beets.ui import UserError
from beets.util import plurality, unique_list

from .client import LastFmClient
Expand All @@ -52,6 +55,12 @@
"""Genre hierarchy as list of paths from general to specific.
Example: [['electronic', 'house'], ['electronic', 'techno']]"""

RawBlacklist = dict[str, list[str]]
"""Mapping of artist name to list of raw regex/string patterns."""

Blacklist = dict[str, list[re.Pattern[str]]]
"""Mapping of artist name to list of compiled case-insensitive patterns."""


# Canonicalization tree processing.

Expand Down Expand Up @@ -131,6 +140,7 @@ def __init__(self) -> None:
"prefer_specific": False,
"title_case": True,
"pretend": False,
"blacklist": False,
}
)
self.setup()
Expand All @@ -140,12 +150,13 @@ def setup(self) -> None:
if self.config["auto"]:
self.import_stages = [self.imported]

self.client = LastFmClient(
self._log, self.config["min_weight"].get(int)
)
self.whitelist: Whitelist = self._load_whitelist()
self.c14n_branches: CanonTree
self.c14n_branches, self.canonicalize = self._load_c14n_tree()
self.blacklist: Blacklist = self._load_blacklist()
self.client = LastFmClient(
self._log, self.config["min_weight"].get(int), self.blacklist
)

def _load_whitelist(self) -> Whitelist:
"""Load the whitelist from a text file.
Expand Down Expand Up @@ -188,6 +199,99 @@ def _load_c14n_tree(self) -> tuple[CanonTree, bool]:
flatten_tree(genres_tree, [], c14n_branches)
return c14n_branches, canonicalize

def _load_blacklist(self) -> Blacklist:
"""Load the blacklist from a configured file path.

For maximum compatibility with regex patterns, a custom format is used:
- Each section starts with an artist name, followed by a colon.
- Subsequent lines are indented (at least one space, typically 4 spaces)
and contain a regex pattern to match a genre.
- A '*' key for artist can be used to specify global forbidden genres.

Returns a compiled blacklist dictionary mapping artist names to lists of
case-insensitive regex patterns. Returns empty dict if not configured.

Example blacklist file format::

Artist Name:
.*rock.*
.*metal.*
Another Artist Name:
^jazz$
*:
spoken word
comedy

Raises:
UserError: if the file format is invalid.
"""
blacklist_raw: RawBlacklist = defaultdict(list)
bl_filename = self.config["blacklist"].get()
if not bl_filename:
return {}
if not isinstance(bl_filename, str):
raise UserError(
"Invalid value for lastgenre.blacklist: expected path string "
f"or 'no', got {bl_filename!r} (type {type(bl_filename).__name__})"
)

self._log.debug("Loading blacklist file {}", bl_filename)
section = None
with Path(bl_filename).expanduser().open(encoding="utf-8") as f:
for lineno, line in enumerate(f, 1):
if not line.strip() or line.lstrip().startswith("#"):
continue
if not line.startswith(" "):
# Section header
header = line.strip().lower()
if not header.endswith(":"):
raise UserError(
f"Malformed blacklist section header "
f"at line {lineno}: {line}"
)
section = header[:-1].rstrip()
if not section:
raise UserError(
f"Empty blacklist section name "
f"at line {lineno}: {line}"
)
else:
# Pattern line: must be indented (at least one space)
if section is None:
raise UserError(
f"Blacklist regex pattern line before any section header "
f"at line {lineno}: {line}"
)
blacklist_raw[section].append(line.strip())
extra_debug(self._log, "Blacklist: {}", blacklist_raw)
return self._compile_blacklist_patterns(blacklist_raw)

@staticmethod
def _compile_blacklist_patterns(
blacklist: RawBlacklist,
) -> Blacklist:
"""Compile blacklist patterns into regex objects.

Tries regex compilation first, falls back to literal string matching.
That way users can use regexes for flexible matching but also simple
strings without worrying about regex syntax. All patterns are
case-insensitive.
Comment on lines +275 to +278
"""
compiled_blacklist: defaultdict[str, list[re.Pattern[str]]] = (
defaultdict(list)
)
for artist, patterns in blacklist.items():
compiled_patterns = []
for pattern in patterns:
try:
compiled_patterns.append(re.compile(pattern, re.IGNORECASE))
except re.error:
compiled_patterns.append(
re.compile(re.escape(pattern), re.IGNORECASE)
)
compiled_blacklist[artist] = compiled_patterns
return compiled_blacklist

@property
def sources(self) -> tuple[str, ...]:
"""A tuple of allowed genre sources. May contain 'track',
Expand All @@ -203,7 +307,9 @@ def sources(self) -> tuple[str, ...]:

# Genre list processing.

def _resolve_genres(self, tags: list[str]) -> list[str]:
def _resolve_genres(
self, tags: list[str], artist: str | None = None
) -> list[str]:
"""Canonicalize, sort and filter a list of genres.

- Returns an empty list if the input tags list is empty.
Expand All @@ -230,11 +336,15 @@ def _resolve_genres(self, tags: list[str]) -> list[str]:
# Extend the list to consider tags parents in the c14n tree
tags_all = []
for tag in tags:
# Add parents that are in the whitelist, or add the oldest
# ancestor if no whitelist
if self.whitelist:
# Skip blacklisted tags entirely — don't walk their ancestry.
if self._is_blacklisted(tag, artist):
continue
# Add parents that pass whitelist/blacklist, or add the oldest
# ancestor if neither is configured
if self.whitelist or self.blacklist:
parents = self._filter_valid(
find_parents(tag, self.c14n_branches)
find_parents(tag, self.c14n_branches),
artist=artist,
)
else:
parents = [find_parents(tag, self.c14n_branches)[-1]]
Expand All @@ -255,24 +365,57 @@ def _resolve_genres(self, tags: list[str]) -> list[str]:
if self.config["prefer_specific"]:
tags = sort_by_depth(tags, self.c14n_branches)

# c14n only adds allowed genres but we may have had forbidden genres in
# the original tags list
valid_tags = self._filter_valid(tags)
# Final filter: catches forbidden genres when c14n is disabled,
# and any that survived the loop without whitelist/blacklist active.
valid_tags = self._filter_valid(tags, artist=artist)
return valid_tags[:count]

def _filter_valid(self, genres: Iterable[str]) -> list[str]:
"""Filter genres based on whitelist.
def _filter_valid(
self, genres: Iterable[str], artist: str | None = None
) -> list[str]:
"""Filter genres through whitelist and blacklist.

Returns all genres if no whitelist is configured, otherwise returns
only genres that are in the whitelist.
Drops empty/whitespace-only strings, then applies whitelist and
blacklist checks. Returns all genres if neither is configured.
Whitelist is checked first for performance reasons (blacklist regex
matching is more expensive).
"""
# First, drop any falsy or whitespace-only genre strings to avoid
# retaining empty tags from multi-valued fields.
cleaned = [g for g in genres if g and g.strip()]
if not self.whitelist:
if not self.whitelist and not self.blacklist:
return cleaned

return [g for g in cleaned if g.lower() in self.whitelist]
result = []
for genre in cleaned:
if self.whitelist and genre.lower() not in self.whitelist:
continue
if self._is_blacklisted(genre, artist):
continue
result.append(genre)
return result

def _is_blacklisted(self, genre: str, artist: str | None = None) -> bool:
"""Check if genre tag should be forbidden.

Returns True if:
- tag matches global blacklist, OR
- artist matches artist blacklist AND tag matches artist blacklist.

Tags pass if no blacklist match found.

KEEP DUPLICATED IN SYNC with client._is_blacklisted().
TODO: Extract to filters.py when regex genre aliases land (next PR).
"""
if not self.blacklist:
return False
genre_lower = genre.lower()
for pattern in self.blacklist.get("*") or []:
if pattern.search(genre_lower):
return True
if artist:
for pattern in self.blacklist.get(artist.lower()) or []:
if pattern.search(genre_lower):
return True
return False

# Genre resolution pipeline.

Expand All @@ -293,13 +436,13 @@ def _get_existing_genres(self, obj: LibModel) -> list[str]:
return genres_list

def _combine_resolve_and_log(
self, old: list[str], new: list[str]
self, old: list[str], new: list[str], artist: str | None = None
) -> list[str]:
"""Combine old and new genres and process via _resolve_genres."""
self._log.debug("raw last.fm tags: {}", new)
self._log.debug("existing genres taken into account: {}", old)
combined = old + new
return self._resolve_genres(combined)
return self._resolve_genres(combined, artist=artist)

def _get_genre(self, obj: LibModel) -> tuple[list[str], str]:
"""Get the final genre list for an Album or Item object.
Expand All @@ -321,12 +464,21 @@ def _get_genre(self, obj: LibModel) -> tuple[list[str], str]:
and the whitelist feature was disabled.
"""

def _fallback_stage() -> tuple[list[str], str]:
"""Return the fallback genre and label."""
if fallback := self.config["fallback"].get():
return [fallback], "fallback"
return [], "fallback unconfigured"

def _try_resolve_stage(
stage_label: str, keep_genres: list[str], new_genres: list[str]
stage_label: str,
keep_genres: list[str],
new_genres: list[str],
artist: str | None = None,
) -> tuple[list[str], str] | None:
"""Try to resolve genres for a given stage and log the result."""
resolved_genres = self._combine_resolve_and_log(
keep_genres, new_genres
keep_genres, new_genres, artist=artist
)
if resolved_genres:
suffix = "whitelist" if self.whitelist else "any"
Expand All @@ -346,11 +498,15 @@ def _try_resolve_stage(
# If none are found, we use the fallback (if set).
if self.config["cleanup_existing"]:
keep_genres = [g.lower() for g in genres]
if result := _try_resolve_stage("cleanup", keep_genres, []):
cleanup_artist = getattr(obj, "albumartist", None) or getattr(
obj, "artist", None
)
if result := _try_resolve_stage(
"cleanup", keep_genres, [], artist=cleanup_artist
):
return result

# Return fallback string (None if not set).
return self.config["fallback"].get(), "fallback"
return _fallback_stage()

# If cleanup_existing is not set, the pre-populated tags are
# returned as-is.
Expand All @@ -369,7 +525,7 @@ def _try_resolve_stage(
obj.artist, obj.title
):
if result := _try_resolve_stage(
"track", keep_genres, new_genres
"track", keep_genres, new_genres, artist=obj.artist
):
return result

Expand All @@ -378,18 +534,21 @@ def _try_resolve_stage(
obj.albumartist, obj.album
):
if result := _try_resolve_stage(
"album", keep_genres, new_genres
"album", keep_genres, new_genres, artist=obj.albumartist
):
return result

if "artist" in self.sources:
new_genres = []
stage_artist: str | None = None
if isinstance(obj, library.Item):
new_genres = self.client.fetch_artist_genre(obj.artist)
stage_label = "artist"
stage_artist = obj.artist
elif obj.albumartist != config["va_name"].as_str():
new_genres = self.client.fetch_artist_genre(obj.albumartist)
stage_label = "album artist"
stage_artist = obj.albumartist
if not new_genres:
extra_debug(
self._log,
Expand Down Expand Up @@ -434,27 +593,30 @@ def _try_resolve_stage(

if new_genres:
if result := _try_resolve_stage(
stage_label, keep_genres, new_genres
stage_label, keep_genres, new_genres, artist=stage_artist
):
return result

# Nothing found, leave original if configured and valid.
if genres and self.config["keep_existing"]:
if valid_genres := self._filter_valid(genres):
if genres and self.config["keep_existing"].get():
if isinstance(obj, library.Item):
# For track items, use track artist (important for compilations).
artist = getattr(obj, "artist", None)
else:
# For albums, prefer albumartist, fall back to artist.
artist = getattr(obj, "albumartist", None) or getattr(
obj, "artist", None
)
if valid_genres := self._filter_valid(genres, artist=artist):
return valid_genres, "original fallback"
# If the original genre doesn't match a whitelisted genre, check
# if we can canonicalize it to find a matching, whitelisted genre!
if result := _try_resolve_stage(
"original fallback", keep_genres, []
"original fallback", keep_genres, [], artist=artist
):
return result

# Return fallback as a list.
if fallback := self.config["fallback"].get():
return [fallback], "fallback"

# No fallback configured.
return [], "fallback unconfigured"
return _fallback_stage()

# Beets plugin hooks and CLI.

Expand Down
Loading
Loading