Commit 4efa4ef: Supporting spotify python app (#385)

philippkahr authored Jan 29, 2025
1 parent ced23e5 commit 4efa4ef

Showing 9 changed files with 567 additions and 0 deletions.
3 changes: 3 additions & 0 deletions supporting-blog-content/spotify-to-elasticsearch/.flake8
@@ -0,0 +1,3 @@
[flake8]

max-line-length = 180
4 changes: 4 additions & 0 deletions supporting-blog-content/spotify-to-elasticsearch/.pylintrc
@@ -0,0 +1,4 @@
[FORMAT]

# Maximum number of characters on a single line.
max-line-length=180
147 changes: 147 additions & 0 deletions supporting-blog-content/spotify-to-elasticsearch/README.MD
@@ -0,0 +1,147 @@
# Spotify to Elasticsearch

What does it do?

- It uses the Spotify API to retrieve song metadata.
- It imports your Spotify privacy export.
- It sends all your songs to an Elasticsearch cluster for analysis.

## Requirements

It uses the [Spotipy](https://spotipy.readthedocs.io/en/2.25.0/) library to connect to and interact with the Spotify API, so you need to create your own Spotify developer account.

To minimize the strain on the Spotify API, the script maintains a local `metadata_cache.json` file that stores the metadata for every unique track ID it has retrieved. If you listen to a song twice, the Spotify API is only queried once for its metadata.

This was tested and written with Python 3.13.

### Spotify data export

The export can take up to 30 days. You will get an email as soon as the download is ready.

1. Go to [Spotify Privacy](https://www.spotify.com/account/privacy/)
2. Scroll down and select `Extended Streaming History` (the top-right option).
3. Click `Request data` at the bottom.
4. You will get an email asking you to confirm that you want this data.
5. Wait until you receive an email that your data is ready for download.

### Spotify developer account

We need a Spotify developer account, because without one we are not allowed to query the API.

1. Go to [Spotify Developer](http://developer.spotify.com/)
2. Click `Log In` in the top right corner
3. Log in with your normal Spotify account
4. In the top right corner, where the `Log In` button was, click on your name and select `Dashboard`
5. Click on `Create App`
6. Give it an App name like `Elasticsearch Wrapped`
7. Give it a description like `Reading metadata about songs for Elasticsearch`
8. Under `Redirect URIs`, enter `http://localhost:9100`
9. Under: `Which API/SDKs are you planning to use?`
1. Select `Web API`
10. Accept the terms and conditions
11. In the top right corner select `Settings`
12. Copy the `client ID` and `client secret` (we pass these as parameters when running the script)

### Elastic API Key & Elasticsearch URL

1. Log into [Elastic Cloud](https://cloud.elastic.co) and create either a serverless project or a hosted deployment. (It works with on-premise or any other form of deployment as well.)
2. Serverless: create an `Observability Project`
1. Go to manage, click `Connection details` in the top right corner, and note down the `Elasticsearch URL`. It should look something like `https://<project-name>-number.es.<region>`
2. API key (please note that this gives the API key the same permissions you have; this is the easiest and quickest option)
1. UI:
1. Project Settings => Management => API keys => Create API Key => `spotify` as name. Copy the `encoded` value; it will only be shown once.
2. Developer Tools:

```json
POST _security/api_key
{
"name": "spotify"
}
```

3. Hosted deployment:
1. Go onto your deployment, or create a new one.
2. Press the `copy endpoint` button for Elasticsearch.
3. API Key:

```json
POST _security/api_key
{
"name": "spotify"
}
```

> Note: If you want more fine-grained control, this is the minimum the application needs:
<details>
<summary> API Request </summary>

```json
POST _security/api_key
{
"name": "spotify",
"role_descriptors": {
"spotify_history": {
"cluster": [
"monitor",
"manage_ingest_pipelines"
],
"indices": [
{
"names": [
"spotify-history"
],
"privileges": [
"all"
],
"field_security": {
"grant": [
"*"
],
"except": []
},
"allow_restricted_indices": false
}
],
"applications": [],
"run_as": [],
"metadata": {},
"transient_metadata": {
"enabled": true
}
}
}
}
```

</details>

## Executing

1. Place the extracted files from the zip into the `to_read` folder. It needs to be the JSON files directly, not the zip itself.
1. Run `pip install -r requirements.txt` to install all the dependencies.
1. Run the following in your favorite shell; it will find and process all files in the `to_read` folder.

```shell
python3 python/main.py \
--es-url "https://spotify.es....:443" \
--es-api-key "WFdNcE1KTU...==" \
--spotify-client-id "f972762..." \
--spotify-client-secret "74bcf5196b..." \
--user-name "philipp"
```

The `--user-name` flag is optional but helpful if you also index the data of your friends and family. The field in Elastic is then called `user`.
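For reference, based on the fields populated in `python/main.py`, an indexed document looks roughly like this (all values are illustrative, and the full `spotify_metadata` object is omitted for brevity):

```json
{
  "id": "1706522645_Daft Punk",
  "title": "Harder, Better, Faster, Stronger",
  "artist": ["Daft Punk"],
  "album": "Discovery",
  "country": "DE",
  "duration": 224693,
  "explicit": false,
  "listened_to_ms": 224693,
  "listened_to_pct": 1.0,
  "reason_start": "trackdone",
  "reason_end": "trackdone",
  "shuffle": false,
  "skipped": false,
  "offline": false,
  "platform": "android",
  "played_at": "2024-01-29T10:04:05Z",
  "hourOfDay": 10,
  "dayOfWeek": "Monday",
  "url": "https://open.spotify.com/track/...",
  "user": "philipp"
}
```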

## Caveats

- It only works with songs. There is no support for videos, podcasts, or anything else yet.
- If you restart it at any point, it will simply index everything again and overwrite what is already there. Finished files are moved to the `processed` folder; once a file is fully processed, it won't be touched again unless you move it back into the `to_read` folder.
- The way the `_id` is constructed means that only one play per artist per second can be stored.
- It logs every track for which it cannot find any metadata. That can happen when Spotify changes a track ID, for example because the album the track was part of was removed.
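The `_id` construction behind that caveat can be sketched as follows (mirroring the logic in `python/main.py`; `build_id` is an illustrative helper name, not a function in the repo):

```python
from datetime import datetime


def build_id(played_at: datetime, artist_name: str) -> str:
    """Unix epoch seconds of the play joined with the artist name."""
    epoch_seconds = int((played_at - datetime(1970, 1, 1)).total_seconds())
    return f"{epoch_seconds}_{artist_name}"
```

Two plays by the same artist within the same second produce the same `_id`, so the later one overwrites the earlier one on indexing.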

## Kibana Dashboard

There is a prebuilt dashboard available; you can import it through Saved Objects in Kibana. It was built on version 8.17.

![Kibana Dashboard Preview](kibana/dashboard.jpeg)

Large diffs are not rendered by default.

187 changes: 187 additions & 0 deletions supporting-blog-content/spotify-to-elasticsearch/python/main.py
@@ -0,0 +1,187 @@
import logging
from pathlib import Path
import typer
from datetime import datetime
import json
from rich.logging import RichHandler
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
BarColumn,
TaskProgressColumn,
TimeElapsedColumn,
)
from services import SpotifyService, ElasticsearchService
from models import SpotifyTrack

logger = logging.getLogger(__name__)


def try_parsing_date(text):
    """Attempt to parse a timestamp in one of the known Spotify formats."""
    for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ"):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            # Try the next known format before giving up.
            continue
    logging.getLogger(__name__).error(f"Error parsing date: {text}")
    return None


def process_history_file(
file_path: str,
spotify_svc: SpotifyService,
es_svc: ElasticsearchService,
user_name: str,
):
"""Main processing function"""
# Set up rich logging
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
handlers=[RichHandler(rich_tracebacks=True)],
)
logger = logging.getLogger(__name__)
console = Console()

with open(file_path) as f:
history = json.load(f)

console.print(f"[green]Processing {file_path}")

documents = []
with Progress(
SpinnerColumn(),
"[progress.description]{task.description}",
BarColumn(),
TaskProgressColumn(),
TimeElapsedColumn(),
) as progress:
task = progress.add_task("[cyan]Processing tracks...", total=len(history))

total_entries = len(history)
batch_size = 50
for i in range(0, total_entries, batch_size):
entries_batch = history[i : i + batch_size]
metadata_batch = spotify_svc.get_tracks_metadata(entries_batch)
for entry in entries_batch:
try:
# let's make sure to only look at songs
                    # we do not support videos, podcasts or
# anything else yet.
if entry["spotify_track_uri"] is not None and entry[
"spotify_track_uri"
].startswith("spotify:track:"):
track_id = entry["spotify_track_uri"].replace(
"spotify:track:", ""
)
metadata = metadata_batch.get(track_id, None)
played_at = try_parsing_date(entry["ts"])
if metadata is not None:
documents.append(
SpotifyTrack(
id=str(
int(
(
played_at - datetime(1970, 1, 1)
).total_seconds()
)
)
+ "_"
+ entry["master_metadata_album_artist_name"],
artist=[
artist["name"] for artist in metadata["artists"]
],
album=metadata["album"]["name"],
country=entry["conn_country"],
duration=metadata["duration_ms"],
explicit=metadata["explicit"],
listened_to_pct=(
entry["ms_played"] / metadata["duration_ms"]
if metadata["duration_ms"] > 0
else None
),
listened_to_ms=entry["ms_played"],
ip=entry["ip_addr"],
reason_start=entry["reason_start"],
reason_end=entry["reason_end"],
shuffle=entry["shuffle"],
skipped=entry["skipped"],
offline=entry["offline"],
title=metadata["name"],
platform=entry["platform"],
played_at=played_at,
spotify_metadata=metadata,
hourOfDay=played_at.hour,
dayOfWeek=played_at.strftime("%A"),
url=metadata["external_urls"]["spotify"],
user=user_name,
)
)
else:
console.print(f"[red]Metadata not found for track: {entry}")
if len(documents) >= 500:
console.print(
f"[green]Indexing batch of tracks... {len(documents)}"
)
es_svc.bulk_index(documents)
documents = []
progress.advance(task)

except Exception as e:
logger.error(f"Error processing track: {e}")
spotify_svc.metadata_cache.save_cache()
raise

if documents:
console.print(f"[green]Indexing final batch of tracks... {len(documents)}")
es_svc.bulk_index(documents)
console.print(f"[green]Done! {file_path} processed!")

spotify_svc.metadata_cache.save_cache()


app = typer.Typer()


@app.command()
def process_history(
es_url: str = typer.Option(..., help="Elasticsearch URL"),
es_api_key: str = typer.Option(..., help="Elasticsearch API Key"),
spotify_client_id: str = typer.Option(None, help="Spotify Client ID"),
spotify_client_secret: str = typer.Option(None, help="Spotify Client Secret"),
user_name: str = typer.Option(None, help="User name"),
):
"""Setup the services"""
    if not (spotify_client_id and spotify_client_secret):
        raise typer.BadParameter(
            "Both --spotify-client-id and --spotify-client-secret are required"
        )
    spotify_svc = SpotifyService(
        client_id=spotify_client_id,
        client_secret=spotify_client_secret,
        redirect_uri="http://localhost:9100",
    )
es_svc = ElasticsearchService(es_url=es_url, api_key=es_api_key)
# Ensure index exists
es_svc.check_index()
es_svc.check_pipeline()

files = list(Path("to_read").glob("*Audio*.json"))
if not files:
raise ValueError(
"No JSON files found in 'to_read' directory, expected them to be named *Audio*.json, like Streaming_History_Audio_2023_8.json"
)
else:
for file_path in files:
process_history_file(file_path, spotify_svc, es_svc, user_name)
move_file(file_path)


def move_file(file_path: Path):
"""Move the file to the 'processed' directory"""
processed_dir = Path("processed")
processed_dir.mkdir(exist_ok=True)
new_path = Path("processed") / file_path.name
file_path.rename(new_path)


if __name__ == "__main__":
app()