Skip to content

Opt-in caching to speed up search of redundant tracks #137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions spotify_to_ytmusic/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def all(args):
count = count + 1
try:
playlist = spotify.getSpotifyPlaylist(p["external_urls"]["spotify"])
videoIds = ytmusic.search_songs(playlist["tracks"])
videoIds = ytmusic.search_songs(
playlist["tracks"], use_cached=args.use_cached
)
playlist_id = ytmusic.create_playlist(
p["name"],
p["description"],
Expand All @@ -61,7 +63,7 @@ def _create_ytmusic(args, playlist, ytmusic):
date = " " + datetime.today().strftime("%m/%d/%Y")
name = args.name + date if args.name else playlist["name"] + date
info = playlist["description"] if (args.info is None) else args.info
videoIds = ytmusic.search_songs(playlist["tracks"])
videoIds = ytmusic.search_songs(playlist["tracks"], use_cached=args.use_cached)
if args.like:
for id in videoIds:
ytmusic.rate_song(id, "LIKE")
Expand Down Expand Up @@ -90,7 +92,7 @@ def update(args):
spotify, ytmusic = _init()
playlist = _get_spotify_playlist(spotify, args.playlist)
playlistId = ytmusic.get_playlist_id(args.name)
videoIds = ytmusic.search_songs(playlist["tracks"])
videoIds = ytmusic.search_songs(playlist["tracks"], use_cached=args.use_cached)
if not args.append:
ytmusic.remove_songs(playlistId)
time.sleep(2)
Expand All @@ -102,5 +104,30 @@ def remove(args):
ytmusic.remove_playlists(args.pattern)


def search(args):
    """Look up one Spotify track on YouTube Music and print the match.

    The track behind ``args.link`` is fetched from Spotify, reduced to its
    name, first artist, duration and album, and handed to
    ``ytmusic.search_songs`` as a one-element list. A
    ``music.youtube.com`` watch URL is printed for the first id that comes
    back, or an error message when nothing comes back.

    Args:
        args: Parsed CLI namespace; ``link`` and ``use_cached`` are read.
    """
    spotify, ytmusic = _init()
    spotify_track = spotify.getSingleTrack(args.link)

    # Only the first listed artist is used; duration_ms is divided down to seconds.
    song = dict(
        name=spotify_track["name"],
        artist=spotify_track["artists"][0]["name"],
        duration=spotify_track["duration_ms"] / 1000,
        album=spotify_track["album"]["name"],
    )

    matches = ytmusic.search_songs([song], use_cached=args.use_cached)

    if matches:
        print(f"https://music.youtube.com/watch?v={matches[0]}")
    else:
        print("Error: No Match found.")


def cache_clear(args):
    """Delete the lookup cache file through a fresh ``CacheManager``.

    Args:
        args: Parsed CLI namespace; not read by this command.
    """
    # Imported at call time rather than at module level.
    from spotify_to_ytmusic.utils.cache_manager import CacheManager

    CacheManager().remove_cache_file()


def setup(args):
    """Run the setup routine with the settings file given on the command line.

    Args:
        args: Parsed CLI namespace; ``file`` is forwarded to ``setup_func``.
    """
    settings_file = args.file
    setup_func(settings_file)
28 changes: 25 additions & 3 deletions spotify_to_ytmusic/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ def get_args(args=None):
"--file", type=Path, help="Optional path to a settings.ini file"
)

cache_parser = argparse.ArgumentParser(add_help=False)
cache_parser.add_argument(
"--use-cached",
action="store_true",
default=False,
help="(Optional) Enable the use of a cache file to save and retrieve query results.",
)

spotify_playlist = argparse.ArgumentParser(add_help=False)
spotify_playlist.add_argument(
"playlist", type=str, help="Provide a playlist Spotify link."
Expand Down Expand Up @@ -75,22 +83,22 @@ def get_args(args=None):
create_parser = subparsers.add_parser(
"create",
help="Create a new playlist on YouTube Music.",
parents=[spotify_playlist, spotify_playlist_create],
parents=[spotify_playlist, spotify_playlist_create, cache_parser],
)
create_parser.set_defaults(func=controllers.create)

liked_parser = subparsers.add_parser(
"liked",
help="Transfer all liked songs of the user.",
parents=[spotify_playlist_create],
parents=[spotify_playlist_create, cache_parser],
)
liked_parser.set_defaults(func=controllers.liked)

update_parser = subparsers.add_parser(
"update",
help="Delete all entries in the provided Google Play Music playlist and "
"update the playlist with entries from the Spotify playlist.",
parents=[spotify_playlist],
parents=[spotify_playlist, cache_parser],
)
update_parser.set_defaults(func=controllers.update)
update_parser.add_argument(
Expand All @@ -109,6 +117,7 @@ def get_args(args=None):
all_parser = subparsers.add_parser(
"all",
help="Transfer all public playlists of the specified user (Spotify User ID).",
parents=[cache_parser],
)
all_parser.add_argument(
"user", type=str, help="Spotify userid of the specified user."
Expand All @@ -121,6 +130,19 @@ def get_args(args=None):
help="Like the songs in all of the public playlist",
)

search_parser = subparsers.add_parser(
"search",
help="Search for a song on YouTube Music to cross-check the algorithm match result.",
parents=[cache_parser],
)
search_parser.add_argument(
"link", type=str, help="Link of the spotify song to search."
)
search_parser.set_defaults(func=controllers.search)

cache_remove_parser = subparsers.add_parser("cache-clear", help="Clear cache file")
cache_remove_parser.set_defaults(func=controllers.cache_clear)

return parser.parse_args(args)


Expand Down
1 change: 0 additions & 1 deletion spotify_to_ytmusic/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def setup_spotify():
),
"client_secret": input(
"Paste your client secret from the Spotify developer dashboard:"

),
"use_oauth": str(
query_yes_no(
Expand Down
3 changes: 3 additions & 0 deletions spotify_to_ytmusic/spotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ def getLikedPlaylist(self):
"description": "Your liked tracks from spotify",
}

def getSingleTrack(self, song_url):
    """Return the result of ``self.api.track`` for ``song_url``.

    Args:
        song_url: Track reference passed through to the API client
            unchanged (presumably a Spotify track link; confirm against
            the caller).
    """
    track = self.api.track(song_url)
    return track


def build_results(tracks, album=None):
results = []
Expand Down
24 changes: 24 additions & 0 deletions spotify_to_ytmusic/utils/cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import json
from spotify_to_ytmusic.settings import CACHE_DIR


class CacheManager:
    """Persist the search-query -> video-id lookup table as a JSON file.

    The table is stored in ``lookup.json`` inside ``CACHE_DIR``; the
    directory is created on instantiation if it does not exist yet.
    """

    def __init__(self):
        self.cache_dir = CACHE_DIR
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "lookup.json"

    def load_lookup_table(self):
        """Return the cached lookup table, or ``{}`` when none is usable.

        A missing file, a file that cannot be decoded as JSON (for example
        after an interrupted write) and a file whose top-level value is not
        a JSON object are all treated as an empty cache, so a damaged cache
        file never aborts the caller.

        Returns:
            dict: The table previously written by ``save_to_lookup_table``.
        """
        try:
            with self.cache_file.open("r", encoding="utf-8") as f:
                table = json.load(f)
        except (FileNotFoundError, ValueError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            return {}
        return table if isinstance(table, dict) else {}

    def save_to_lookup_table(self, table):
        """Write ``table`` to the cache file as JSON, replacing its content.

        The data goes to a sibling temporary file first and is then renamed
        over the cache file, so an interruption in the middle of a write
        leaves the previous cache intact instead of a truncated file.

        Args:
            table: JSON-serialisable mapping to store.
        """
        tmp_file = self.cache_file.with_name(self.cache_file.name + ".tmp")
        with tmp_file.open("w", encoding="utf-8") as f:
            json.dump(table, f, ensure_ascii=False)
        tmp_file.replace(self.cache_file)

    def remove_cache_file(self):
        """Delete the cache file if it exists; do nothing otherwise."""
        if self.cache_file.is_file():
            self.cache_file.unlink()
20 changes: 19 additions & 1 deletion spotify_to_ytmusic/ytmusic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
from spotify_to_ytmusic.utils.match import get_best_fit_song_id
from spotify_to_ytmusic.settings import Settings

from spotify_to_ytmusic.utils.cache_manager import CacheManager

path = os.path.dirname(os.path.realpath(__file__)) + os.sep

cacheManager = CacheManager()


class YTMusicTransfer:
def __init__(self):
Expand All @@ -34,16 +38,27 @@ def create_playlist(self, name, info, privacy="PRIVATE", tracks=None):
def rate_song(self, id, rating):
return self.api.rate_song(id, rating)

def search_songs(self, tracks):
def search_songs(self, tracks, use_cached: bool = False):
videoIds = []
songs = list(tracks)
notFound = list()
lookup_ids = cacheManager.load_lookup_table()

if use_cached:
print("Use of cache file is enabled.")

print("Searching YouTube...")
for i, song in enumerate(songs):
name = re.sub(r" \(feat.*\..+\)", "", song["name"])
query = song["artist"] + " " + name
query = query.replace(" &", "")

if use_cached and query in lookup_ids.keys():
videoIds.append(lookup_ids[query])
continue

result = self.api.search(query)

if len(result) == 0:
notFound.append(query)
else:
Expand All @@ -52,6 +67,9 @@ def search_songs(self, tracks):
notFound.append(query)
else:
videoIds.append(targetSong)
if use_cached:
lookup_ids[query] = targetSong
cacheManager.save_to_lookup_table(lookup_ids)

if i > 0 and i % 10 == 0:
print(f"YouTube tracks: {i}/{len(songs)}")
Expand Down
49 changes: 49 additions & 0 deletions tests/test_cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import unittest
from pathlib import Path
from platformdirs import user_cache_dir
from spotify_to_ytmusic.utils.cache_manager import CacheManager


class TestCacheManager(unittest.TestCase):
    """Round-trip, removal and location checks for ``CacheManager``."""

    @classmethod
    def setUpClass(cls):
        # One manager and one sample table are shared by all tests.
        cls.cache_manager = CacheManager()
        cls.test_data = {
            "Jordan Burns Weekend": "4ga4xy7omAE",
            "Robert DeLong Lights LŪN Did It To Myself - LŪN Remix": "M7jON1NmyoY",
        }

    def setUp(self):
        # Every test starts without a cache file on disk.
        self.cache_manager.remove_cache_file()

    def test_save_and_load_lookup_table(self):
        """A table written to the cache is read back unchanged."""
        manager = self.cache_manager
        manager.save_to_lookup_table(self.test_data)

        self.assertEqual(manager.load_lookup_table(), self.test_data)

    def test_load_empty_lookup_table(self):
        """Loading when no cache file exists yields an empty dict."""
        loaded = self.cache_manager.load_lookup_table()
        self.assertEqual(loaded, {})

    def test_remove_cache_file(self):
        """Removing the cache deletes the file that a save created."""
        manager = self.cache_manager
        manager.save_to_lookup_table(self.test_data)
        self.assertTrue(manager.cache_file.exists())

        manager.remove_cache_file()
        self.assertFalse(manager.cache_file.exists())

    def test_cache_file_location(self):
        """The cache file is ``lookup.json`` under the platformdirs cache dir."""
        cache_root = Path(user_cache_dir(appname="spotify_to_ytmusic", appauthor=False))
        expected_path = cache_root / "lookup.json"
        self.assertEqual(self.cache_manager.cache_file, expected_path)


if __name__ == "__main__":
unittest.main()
28 changes: 24 additions & 4 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from spotify_to_ytmusic import settings as settings_module
from spotify_to_ytmusic import setup
from spotify_to_ytmusic.main import get_args, main
from spotify_to_ytmusic.settings import DEFAULT_PATH, EXAMPLE_PATH, Settings
from spotify_to_ytmusic.settings import DEFAULT_PATH, EXAMPLE_PATH, CACHE_DIR, Settings

TEST_PLAYLIST = "https://open.spotify.com/playlist/4UzyZJfSQ4584FaWGwepfL"
TEST_SONG = "https://open.spotify.com/track/7bnczC5ATlZaZX0MHjX7KU?si=5a07bffaf6324717"


class TestCli(unittest.TestCase):
Expand All @@ -19,11 +20,15 @@ def setUpClass(cls):

def test_get_args(self):
args = get_args(["all", "user"])
self.assertEqual(len(vars(args)), 5)
self.assertEqual(len(vars(args)), 6)
args = get_args(["create", "playlist-link"])
self.assertEqual(len(vars(args)), 9)
self.assertEqual(len(vars(args)), 10)
args = get_args(["update", "playlist-link", "playlist-name"])
self.assertEqual(len(vars(args)), 6)
self.assertEqual(len(vars(args)), 7)
args = get_args(["liked"])
self.assertEqual(len(vars(args)), 9)
args = get_args(["search", "link"])
self.assertEqual(len(vars(args)), 5)
args = get_args(["setup"])
self.assertEqual(len(vars(args)), 4)

Expand Down Expand Up @@ -70,6 +75,21 @@ def test_create(self):
int(fakeOutput.getvalue().splitlines()[-1][0]) >= 2
) # assert number of lines deleted

def test_search(self):
    """The ``search`` command runs to completion for ``TEST_SONG``."""
    argv = ["", "search", TEST_SONG]
    with mock.patch("sys.argv", argv):
        main()

def test_search_with_use_cached_flag(self):
    """``search --use-cached`` leaves a ``lookup.json`` cache file behind."""
    lookup_path = CACHE_DIR / "lookup.json"

    # Start without a cache file so the final check proves this run created it.
    if lookup_path.exists():
        lookup_path.unlink()

    argv = ["", "search", TEST_SONG, "--use-cached"]
    with mock.patch("sys.argv", argv):
        main()
    self.assertTrue(lookup_path.exists(), "Cache file was not created.")

def test_setup(self):
tmp_path = DEFAULT_PATH.with_suffix(".tmp")
settings = Settings()
Expand Down