Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions beets/autotag/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from typing_extensions import Self

from beets import plugins
from beets.util import cached_classproperty
from beets.util.deprecation import deprecate_for_maintainers

Expand Down Expand Up @@ -59,6 +60,18 @@ def __hash__(self) -> int: # type: ignore[override]
class Info(AttrDict[Any]):
"""Container for metadata about a musical entity."""

Identifier = tuple[str | None, str | None]

@property
def id(self) -> str | None:
"""Return the provider-specific identifier for this metadata object."""
raise NotImplementedError

@property
def identifier(self) -> Identifier:
"""Return a cross-provider key in ``(data_source, id)`` form."""
return (self.data_source, self.id)

@cached_property
def name(self) -> str:
raise NotImplementedError
Expand Down Expand Up @@ -118,6 +131,10 @@ class AlbumInfo(Info):
user items, and later to drive tagging decisions once selected.
"""

@property
def id(self) -> str | None:
return self.album_id

@cached_property
def name(self) -> str:
return self.album or ""
Expand Down Expand Up @@ -194,6 +211,10 @@ class TrackInfo(Info):
stand alone for singleton matching.
"""

@property
def id(self) -> str | None:
return self.track_id

@cached_property
def name(self) -> str:
return self.title or ""
Expand Down Expand Up @@ -262,6 +283,10 @@ class AlbumMatch(Match):
extra_items: list[Item]
extra_tracks: list[TrackInfo]

def __post_init__(self) -> None:
"""Notify listeners when an album candidate has been matched."""
plugins.send("album_matched", match=self)

@property
def item_info_pairs(self) -> list[tuple[Item, TrackInfo]]:
return list(self.mapping.items())
Expand Down
135 changes: 63 additions & 72 deletions beets/autotag/match.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,28 @@
from __future__ import annotations

from enum import IntEnum
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar
from typing import TYPE_CHECKING, NamedTuple, TypeVar

import lap
import numpy as np

from beets import config, logging, metadata_plugins, plugins
from beets import config, logging, metadata_plugins
from beets.autotag import AlbumMatch, TrackMatch, hooks
from beets.util import get_most_common_tags

from .distance import VA_ARTISTS, distance, track_distance
from .hooks import Info

if TYPE_CHECKING:
from collections.abc import Iterable, Sequence

from beets.autotag import AlbumInfo, TrackInfo
from beets.library import Item


AnyMatch = TypeVar("AnyMatch", TrackMatch, AlbumMatch)
Candidates = dict[Info.Identifier, AnyMatch]

# Global logger.
log = logging.getLogger("beets")

Expand Down Expand Up @@ -99,28 +104,21 @@ def assign_items(
return list(mapping.items()), extra_items, extra_tracks


def match_by_id(items: Iterable[Item]) -> AlbumInfo | None:
"""If the items are tagged with an external source ID, return an
AlbumInfo object for the corresponding album. Otherwise, returns
None.
"""
albumids = (item.mb_albumid for item in items if item.mb_albumid)
def match_by_id(album_id: str | None, consensus: bool) -> Iterable[AlbumInfo]:
"""Return album candidates for the given album id.

# Did any of the items have an MB album ID?
try:
first = next(albumids)
except StopIteration:
Make sure that the ID is present and that there is consensus on it among
the items being tagged.
"""
if not album_id:
log.debug("No album ID found.")
return None
elif not consensus:
log.debug("No album ID consensus.")
else:
log.debug("Searching for discovered album ID: {}", album_id)
return metadata_plugins.albums_for_ids([album_id])

# Is there a consensus on the MB album ID?
for other in albumids:
if other != first:
log.debug("No album ID consensus.")
return None
# If all album IDs are equal, look up the album.
log.debug("Searching for discovered album ID: {}", first)
return metadata_plugins.album_for_id(first)
return ()


def _recommendation(
Expand Down Expand Up @@ -180,33 +178,33 @@ def _recommendation(
return rec


AnyMatch = TypeVar("AnyMatch", TrackMatch, AlbumMatch)


def _sort_candidates(candidates: Iterable[AnyMatch]) -> Sequence[AnyMatch]:
"""Sort candidates by distance."""
return sorted(candidates, key=lambda match: match.distance)


def _add_candidate(
items: Sequence[Item],
results: dict[Any, AlbumMatch],
results: Candidates[AlbumMatch],
info: AlbumInfo,
):
"""Given a candidate AlbumInfo object, attempt to add the candidate
to the output dictionary of AlbumMatch objects. This involves
checking the track count, ordering the items, checking for
duplicates, and calculating the distance.
"""
log.debug("Candidate: {0.artist} - {0.album} ({0.album_id})", info)
log.debug(
"Candidate: {0.artist} - {0.album} ({0.album_id}) from {0.data_source}",
info,
)

# Discard albums with zero tracks.
if not info.tracks:
log.debug("No tracks.")
return

# Prevent duplicates.
if info.album_id and info.album_id in results:
if info.album_id and info.identifier in results:
log.debug("Duplicate.")
return

Expand Down Expand Up @@ -234,7 +232,7 @@ def _add_candidate(
return

log.debug("Success. Distance: {}", dist)
results[info.album_id] = hooks.AlbumMatch(
results[info.identifier] = hooks.AlbumMatch(
dist, info, dict(item_info_pairs), extra_items, extra_tracks
)

Expand Down Expand Up @@ -268,39 +266,37 @@ def tag_album(
cur_album: str = likelies["album"]
log.debug("Tagging {} - {}", cur_artist, cur_album)

# The output result, keys are the MB album ID.
candidates: dict[Any, AlbumMatch] = {}
# The output result, keys are (data_source, album_id) pairs, values are
# AlbumMatch objects.
candidates: Candidates[AlbumMatch] = {}

# Search by explicit ID.
if search_ids:
for search_id in search_ids:
log.debug("Searching for album ID: {}", search_id)
if info := metadata_plugins.album_for_id(search_id):
_add_candidate(items, candidates, info)
if opt_candidate := candidates.get(info.album_id):
plugins.send("album_matched", match=opt_candidate)
log.debug("Searching for album IDs: {}", ", ".join(search_ids))
for _info in metadata_plugins.albums_for_ids(search_ids):
_add_candidate(items, candidates, _info)

# Use existing metadata or text search.
else:
# Try search based on current ID.
if info := match_by_id(items):
for info in match_by_id(
likelies["mb_albumid"], consensus["mb_albumid"]
):
_add_candidate(items, candidates, info)
for candidate in candidates.values():
plugins.send("album_matched", match=candidate)

rec = _recommendation(list(candidates.values()))
log.debug("Album ID match recommendation is {}", rec)
if candidates and not config["import"]["timid"]:
# If we have a very good MBID match, return immediately.
# Otherwise, this match will compete against metadata-based
# matches.
if rec == Recommendation.strong:
log.debug("ID match.")
return (
cur_artist,
cur_album,
Proposal(list(candidates.values()), rec),
)

rec = _recommendation(list(candidates.values()))
log.debug("Album ID match recommendation is {}", rec)
if candidates and not config["import"]["timid"]:
# If we have a very good MBID match, return immediately.
# Otherwise, this match will compete against metadata-based
# matches.
if rec == Recommendation.strong:
log.debug("ID match.")
return (
cur_artist,
cur_album,
Proposal(list(candidates.values()), rec),
)

# Search terms.
if not (search_artist and search_name):
Expand All @@ -321,8 +317,6 @@ def tag_album(
items, search_artist, search_name, va_likely
):
_add_candidate(items, candidates, matched_candidate)
if opt_candidate := candidates.get(matched_candidate.album_id):
plugins.send("album_matched", match=opt_candidate)

log.debug("Evaluating {} candidates.", len(candidates))
# Sort and get the recommendation.
Expand All @@ -344,27 +338,24 @@ def tag_item(
metadata in the search query. `search_ids` may be used for restricting the
search to a list of metadata backend IDs.
"""
# Holds candidates found so far: keys are MBIDs; values are
# (distance, TrackInfo) pairs.
candidates = {}
# Holds candidates found so far: keys are (data_source, track_id) pairs,
# values TrackMatch objects
candidates: Candidates[TrackMatch] = {}
rec: Recommendation | None = None

# First, try matching by the external source ID.
trackids = search_ids or [t for t in [item.mb_trackid] if t]
if trackids:
for trackid in trackids:
log.debug("Searching for track ID: {}", trackid)
if info := metadata_plugins.track_for_id(trackid):
dist = track_distance(item, info, incl_artist=True)
candidates[info.track_id] = hooks.TrackMatch(dist, info)
# If this is a good match, then don't keep searching.
rec = _recommendation(_sort_candidates(candidates.values()))
if (
rec == Recommendation.strong
and not config["import"]["timid"]
):
log.debug("Track ID match.")
return Proposal(_sort_candidates(candidates.values()), rec)
log.debug("Searching for track IDs: {}", ", ".join(trackids))
for info in metadata_plugins.tracks_for_ids(trackids):
dist = track_distance(item, info, incl_artist=True)
candidates[info.identifier] = hooks.TrackMatch(dist, info)

# If this is a good match, then don't keep searching.
rec = _recommendation(_sort_candidates(candidates.values()))
if rec == Recommendation.strong and not config["import"]["timid"]:
log.debug("Track ID match.")
return Proposal(_sort_candidates(candidates.values()), rec)

# If we're searching by ID, don't proceed.
if search_ids:
Expand All @@ -384,7 +375,7 @@ def tag_item(
item, search_artist, search_name
):
dist = track_distance(item, track_info, incl_artist=True)
candidates[track_info.track_id] = hooks.TrackMatch(dist, track_info)
candidates[track_info.identifier] = hooks.TrackMatch(dist, track_info)

# Sort by distance and return with recommendation.
log.debug("Found {} candidates.", len(candidates))
Expand Down
55 changes: 39 additions & 16 deletions beets/metadata_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import abc
import re
from contextlib import contextmanager, nullcontext
from contextlib import contextmanager
from functools import cache, cached_property, wraps
from typing import (
TYPE_CHECKING,
Expand All @@ -27,7 +27,7 @@
from beets.util import cached_classproperty
from beets.util.id_extractors import extract_release_id

from .plugins import BeetsPlugin, find_plugins, notify_info_yielded
from .plugins import BeetsPlugin, find_plugins, notify_info_yielded, send

Ret = TypeVar("Ret")
QueryType = Literal["album", "track"]
Expand All @@ -49,14 +49,27 @@ def find_metadata_source_plugins() -> list[MetadataSourcePlugin]:
return [p for p in find_plugins() if hasattr(p, "data_source")] # type: ignore[misc]


@cache
def get_metadata_source(name: str) -> MetadataSourcePlugin | None:
"""Get metadata source plugin by name."""
name = name.lower()
plugins = find_metadata_source_plugins()
return next((p for p in plugins if p.data_source.lower() == name), None)


@contextmanager
def handle_plugin_error(plugin: MetadataSourcePlugin, method_name: str):
def maybe_handle_plugin_error(plugin: MetadataSourcePlugin, method_name: str):
"""Safely call a plugin method, catching and logging exceptions."""
try:
if config["raise_on_error"]:
yield
except Exception as e:
log.error("Error in '{}.{}': {}", plugin.data_source, method_name, e)
log.debug("Exception details:", exc_info=True)
else:
try:
yield
except Exception as e:
log.error(
"Error in '{}.{}': {}", plugin.data_source, method_name, e
)
log.debug("Exception details:", exc_info=True)


def _yield_from_plugins(
Expand All @@ -68,11 +81,7 @@ def _yield_from_plugins(
def wrapper(*args, **kwargs) -> Iterator[Ret]:
for plugin in find_metadata_source_plugins():
method = getattr(plugin, method_name)
with (
nullcontext()
if config["raise_on_error"]
else handle_plugin_error(plugin, method_name)
):
with maybe_handle_plugin_error(plugin, method_name):
yield from filter(None, method(*args, **kwargs))

return wrapper
Expand Down Expand Up @@ -102,12 +111,26 @@ def tracks_for_ids(*args, **kwargs) -> Iterator[TrackInfo]:
yield from ()


def album_for_id(_id: str) -> AlbumInfo | None:
return next(albums_for_ids([_id]), None)
def album_for_id(_id: str, data_source: str) -> AlbumInfo | None:
"""Get AlbumInfo object for the given ID and data source."""
if plugin := get_metadata_source(data_source):
with maybe_handle_plugin_error(plugin, "album_for_id"):
if info := plugin.album_for_id(_id):
send("albuminfo_received", info=info)
return info

return None


def track_for_id(_id: str, data_source: str) -> TrackInfo | None:
"""Get TrackInfo object for the given ID and data source."""
if plugin := get_metadata_source(data_source):
with maybe_handle_plugin_error(plugin, "track_for_id"):
if info := plugin.track_for_id(_id):
send("trackinfo_received", info=info)
return info

def track_for_id(_id: str) -> TrackInfo | None:
return next(tracks_for_ids([_id]), None)
return None


@cache
Expand Down
Loading
Loading