Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions beets/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"Logger",
"NullHandler",
"StreamHandler",
"extra_debug",
"getLogger",
]

Expand Down Expand Up @@ -193,6 +194,19 @@ class BeetsLogger(ThreadLocalLevelLogger, StrFormatLogger):
my_manager.loggerClass = BeetsLogger


def extra_debug(log: BeetsLogger, msg: str, *args: Any, **kwargs: Any) -> None:
"""Log a message at DEBUG level only when verbosity level is >= 3.

Intended for high-verbosity tuning/diagnostic messages that would be too
noisy at normal debug level.
"""
# Lazy import to avoid circular dependency (beets.__init__ -> beets.logging)
from beets import config

if config["verbose"].as_number() >= 3:
log.debug(msg, *args, **kwargs)


@overload
def getLogger(name: str) -> BeetsLogger: ...
@overload
Expand Down
200 changes: 57 additions & 143 deletions beetsplug/lastgenre/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,32 @@
from __future__ import annotations

import os
import traceback
from functools import singledispatchmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any

import pylast
import yaml

from beets import config, library, plugins, ui
from beets.library import Album, Item
from beets.logging import extra_debug
from beets.util import plurality, unique_list

from .client import LastFmClient

if TYPE_CHECKING:
import optparse
from collections.abc import Callable, Iterable
from collections.abc import Iterable

from beets.importer import ImportSession, ImportTask
from beets.library import LibModel

LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY)
Whitelist = set[str]
"""Set of valid genre names (lowercase). Empty set means all genres allowed."""

PYLAST_EXCEPTIONS = (
pylast.WSError,
pylast.MalformedResponseError,
pylast.NetworkError,
)
CanonTree = list[list[str]]
"""Genre hierarchy as list of paths from general to specific.
Example: [['electronic', 'house'], ['electronic', 'techno']]"""


# Canonicalization tree processing.
Expand All @@ -59,7 +59,7 @@
def flatten_tree(
elem: dict[Any, Any] | list[Any] | str,
path: list[str],
branches: list[list[str]],
branches: CanonTree,
) -> None:
"""Flatten nested lists/dictionaries into lists of strings
(branches).
Expand All @@ -77,7 +77,7 @@ def flatten_tree(
branches.append([*path, str(elem)])


def find_parents(candidate: str, branches: list[list[str]]) -> list[str]:
def find_parents(candidate: str, branches: CanonTree) -> list[str]:
"""Find parents genre of a given genre, ordered from the closest to
the further parent.
"""
Expand All @@ -90,6 +90,22 @@ def find_parents(candidate: str, branches: list[list[str]]) -> list[str]:
return [candidate]


def get_depth(tag: str, branches: CanonTree) -> int | None:
"""Find the depth of a tag in the genres tree."""
for branch in branches:
if tag in branch:
return branch.index(tag)
return None
Comment on lines +93 to +98
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm so sorry I didn't realize my ai agent also refactored when I asked it to move that function out of the class.

Actually this feels much simpler than the original and some more research revealed it might even be slightly more efficient.

I was curious though and asked my agent why a style like that would have been chosen back then and it could just be it was more typical for python code to not use early returns. Today this is more idiomatic.

Anyway an intersting accident and I'd say let's keep it



def sort_by_depth(tags: list[str], branches: CanonTree) -> list[str]:
"""Given a list of tags, sort the tags by their depths in the genre tree."""
depth_tag_pairs = [(get_depth(t, branches), t) for t in tags]
depth_tag_pairs = [e for e in depth_tag_pairs if e[0] is not None]
depth_tag_pairs.sort(reverse=True)
return [p[1] for p in depth_tag_pairs]


# Main plugin logic.

WHITELIST = os.path.join(os.path.dirname(__file__), "genres.txt")
Expand Down Expand Up @@ -124,11 +140,14 @@ def setup(self) -> None:
if self.config["auto"]:
self.import_stages = [self.imported]

self._genre_cache: dict[str, list[str]] = {}
self.whitelist = self._load_whitelist()
self.client = LastFmClient(
self._log, self.config["min_weight"].get(int)
)
self.whitelist: Whitelist = self._load_whitelist()
self.c14n_branches: CanonTree
self.c14n_branches, self.canonicalize = self._load_c14n_tree()

def _load_whitelist(self) -> set[str]:
def _load_whitelist(self) -> Whitelist:
"""Load the whitelist from a text file.

Default whitelist is used if config is True, empty string or set to "nothing".
Expand All @@ -146,13 +165,13 @@ def _load_whitelist(self) -> set[str]:

return whitelist

def _load_c14n_tree(self) -> tuple[list[list[str]], bool]:
def _load_c14n_tree(self) -> tuple[CanonTree, bool]:
"""Load the canonicalization tree from a YAML file.

Default tree is used if config is True, empty string, set to "nothing"
or if prefer_specific is enabled.
"""
c14n_branches: list[list[str]] = []
c14n_branches: CanonTree = []
c14n_filename = self.config["canonical"].get()
canonicalize = c14n_filename is not False
# Default tree
Expand All @@ -169,11 +188,6 @@ def _load_c14n_tree(self) -> tuple[list[list[str]], bool]:
flatten_tree(genres_tree, [], c14n_branches)
return c14n_branches, canonicalize

def _tunelog(self, msg: str, *args: Any, **kwargs: Any) -> None:
"""Log tuning messages at DEBUG level when verbosity level is high enough."""
if config["verbose"].as_number() >= 3:
self._log.debug(msg, *args, **kwargs)

@property
def sources(self) -> tuple[str, ...]:
"""A tuple of allowed genre sources. May contain 'track',
Expand All @@ -187,25 +201,7 @@ def sources(self) -> tuple[str, ...]:
}
)

# More canonicalization and general helpers.

def _get_depth(self, tag: str) -> int | None:
"""Find the depth of a tag in the genres tree."""
depth = None
for key, value in enumerate(self.c14n_branches):
if tag in value:
depth = value.index(tag)
break
return depth

def _sort_by_depth(self, tags: list[str]) -> list[str]:
"""Given a list of tags, sort the tags by their depths in the
genre tree.
"""
depth_tag_pairs = [(self._get_depth(t), t) for t in tags]
depth_tag_pairs = [e for e in depth_tag_pairs if e[0] is not None]
depth_tag_pairs.sort(reverse=True)
return [p[1] for p in depth_tag_pairs]
# Genre list processing.

def _resolve_genres(self, tags: list[str]) -> list[str]:
"""Canonicalize, sort and filter a list of genres.
Expand Down Expand Up @@ -257,22 +253,13 @@ def _resolve_genres(self, tags: list[str]) -> list[str]:

# Sort the tags by specificity.
if self.config["prefer_specific"]:
tags = self._sort_by_depth(tags)
tags = sort_by_depth(tags, self.c14n_branches)

# c14n only adds allowed genres but we may have had forbidden genres in
# the original tags list
valid_tags = self._filter_valid(tags)
return valid_tags[:count]

def fetch_genre(
self, lastfm_obj: pylast.Album | pylast.Artist | pylast.Track
) -> list[str]:
"""Return genres for a pylast entity. Returns an empty list if
no suitable genres are found.
"""
min_weight = self.config["min_weight"].get(int)
return self._tags_for(lastfm_obj, min_weight)

def _filter_valid(self, genres: Iterable[str]) -> list[str]:
"""Filter genres based on whitelist.

Expand All @@ -287,49 +274,7 @@ def _filter_valid(self, genres: Iterable[str]) -> list[str]:

return [g for g in cleaned if g.lower() in self.whitelist]

# Cached last.fm entity lookups.

def _last_lookup(
self, entity: str, method: Callable[..., Any], *args: str
) -> list[str]:
"""Get genres based on the named entity using the callable `method`
whose arguments are given in the sequence `args`. The genre lookup
is cached based on the entity name and the arguments.

Before the lookup, each argument has the "-" Unicode character replaced
with its rough ASCII equivalents in order to return better results from
the Last.fm database.
"""
# Shortcut if we're missing metadata.
if any(not s for s in args):
return []

key = f"{entity}.{'-'.join(str(a) for a in args)}"
if key not in self._genre_cache:
args_replaced = [a.replace("\u2010", "-") for a in args]
self._genre_cache[key] = self.fetch_genre(method(*args_replaced))

genre = self._genre_cache[key]
self._tunelog("last.fm (unfiltered) {} tags: {}", entity, genre)
return genre

def fetch_album_genre(self, albumartist: str, albumtitle: str) -> list[str]:
"""Return genres from Last.fm for the album by albumartist."""
return self._last_lookup(
"album", LASTFM.get_album, albumartist, albumtitle
)

def fetch_artist_genre(self, artist: str) -> list[str]:
"""Return genres from Last.fm for the artist."""
return self._last_lookup("artist", LASTFM.get_artist, artist)

def fetch_track_genre(self, trackartist: str, tracktitle: str) -> list[str]:
"""Return genres from Last.fm for the track by artist."""
return self._last_lookup(
"track", LASTFM.get_track, trackartist, tracktitle
)

# Main processing: _get_genre() and helpers.
# Genre resolution pipeline.

def _format_genres(self, tags: list[str]) -> list[str]:
"""Format to title case if configured."""
Expand Down Expand Up @@ -420,14 +365,18 @@ def _try_resolve_stage(
# Run through stages: track, album, artist,
# album artist, or most popular track genre.
if isinstance(obj, library.Item) and "track" in self.sources:
if new_genres := self.fetch_track_genre(obj.artist, obj.title):
if new_genres := self.client.fetch_track_genre(
obj.artist, obj.title
):
if result := _try_resolve_stage(
"track", keep_genres, new_genres
):
return result

if "album" in self.sources:
if new_genres := self.fetch_album_genre(obj.albumartist, obj.album):
if new_genres := self.client.fetch_album_genre(
obj.albumartist, obj.album
):
if result := _try_resolve_stage(
"album", keep_genres, new_genres
):
Expand All @@ -436,22 +385,27 @@ def _try_resolve_stage(
if "artist" in self.sources:
new_genres = []
if isinstance(obj, library.Item):
new_genres = self.fetch_artist_genre(obj.artist)
new_genres = self.client.fetch_artist_genre(obj.artist)
stage_label = "artist"
elif obj.albumartist != config["va_name"].as_str():
new_genres = self.fetch_artist_genre(obj.albumartist)
new_genres = self.client.fetch_artist_genre(obj.albumartist)
stage_label = "album artist"
if not new_genres:
self._tunelog(
extra_debug(
self._log,
'No album artist genre found for "{}", '
"trying multi-valued field...",
obj.albumartist,
)
for albumartist in obj.albumartists:
self._tunelog(
'Fetching artist genre for "{}"', albumartist
extra_debug(
self._log,
'Fetching artist genre for "{}"',
albumartist,
)
new_genres += self.client.fetch_artist_genre(
albumartist
)
new_genres += self.fetch_artist_genre(albumartist)
if new_genres:
stage_label = "multi-valued album artist"
else:
Expand All @@ -461,11 +415,11 @@ def _try_resolve_stage(
for item in obj.items():
item_genre = None
if "track" in self.sources:
item_genre = self.fetch_track_genre(
item_genre = self.client.fetch_track_genre(
item.artist, item.title
)
if not item_genre:
item_genre = self.fetch_artist_genre(item.artist)
item_genre = self.client.fetch_artist_genre(item.artist)
if item_genre:
item_genres += item_genre
if item_genres:
Expand Down Expand Up @@ -610,43 +564,3 @@ def lastgenre_func(

def imported(self, _: ImportSession, task: ImportTask) -> None:
self._process(task.album if task.is_album else task.item, write=False) # type: ignore[attr-defined]

def _tags_for(
self,
obj: pylast.Album | pylast.Artist | pylast.Track,
min_weight: int | None = None,
) -> list[str]:
"""Core genre identification routine.

Given a pylast entity (album or track), return a list of
tag names for that entity. Return an empty list if the entity is
not found or another error occurs.

If `min_weight` is specified, tags are filtered by weight.
"""
# Work around an inconsistency in pylast where
# Album.get_top_tags() does not return TopItem instances.
# https://github.com/pylast/pylast/issues/86
obj_to_query: Any = obj
if isinstance(obj, pylast.Album):
obj_to_query = super(pylast.Album, obj)

try:
res: Any = obj_to_query.get_top_tags()
except PYLAST_EXCEPTIONS as exc:
self._log.debug("last.fm error: {}", exc)
return []
except Exception as exc:
# Isolate bugs in pylast.
self._log.debug("{}", traceback.format_exc())
self._log.error("error in pylast library: {}", exc)
return []

# Filter by weight (optionally).
if min_weight:
res = [el for el in res if (int(el.weight or 0)) >= min_weight]

# Get strings from tags.
tags: list[str] = [el.item.get_name().lower() for el in res]

return tags
Loading
Loading