Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,071 changes: 591 additions & 480 deletions Cargo.lock

Large diffs are not rendered by default.

121 changes: 79 additions & 42 deletions psst-core/src/cdn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use std::{
};

use serde::Deserialize;
use ureq::http::StatusCode;

use crate::{
error::Error,
item_id::FileId,
session::{access_token::TokenProvider, SessionService},
error::Error, item_id::FileId, oauth::refresh_access_token, session::SessionService,
util::default_ureq_agent_builder,
};

Expand All @@ -18,7 +17,6 @@ pub type CdnHandle = Arc<Cdn>;
pub struct Cdn {
session: SessionService,
agent: ureq::Agent,
token_provider: TokenProvider,
}

impl Cdn {
Expand All @@ -27,7 +25,6 @@ impl Cdn {
Ok(Arc::new(Self {
session,
agent: agent.into(),
token_provider: TokenProvider::new(),
}))
}

Expand All @@ -36,16 +33,42 @@ impl Cdn {
"https://api.spotify.com/v1/storage-resolve/files/audio/interactive/{}",
id.to_base16()
);
let access_token = self.token_provider.get(&self.session)?;
let response = self
.agent
.get(&locations_uri)
.query("version", "10000000")
.query("product", "9")
.query("platform", "39")
.query("alt", "json")
.header("Authorization", &format!("Bearer {}", access_token.token))
.call()?;
// OAuth-only: requires a browser OAuth bearer; no Keymaster fallback for CDN.
let mut access_token = self
.session
.oauth_bearer()
.ok_or_else(|| Error::OAuthError("OAuth access token required".to_string()))?;

let call = |token: &str| {
self.agent
.get(&locations_uri)
.query("version", "10000000")
.query("product", "9")
.query("platform", "39")
.query("alt", "json")
.header("Authorization", &format!("Bearer {}", token))
.call()
};

// First attempt; if unauthorized/forbidden, refresh access token and retry once.
let response = match call(&access_token) {
Ok(r) => r,
Err(ureq::Error::StatusCode(code)) if code == 401 || code == 403 => {
let Some(refresh_token) = self.session.oauth_refresh_token() else {
return Err(Error::OAuthError("Missing refresh token".into()));
};
let (new_access, new_refresh) = refresh_access_token(&refresh_token)
.map_err(|_| Error::OAuthError("Failed to refresh token".into()))?;
// Update session tokens so future requests use the fresh token
self.session.set_oauth_bearer(Some(new_access.clone()));
if let Some(r) = new_refresh {
self.session.set_oauth_refresh_token(Some(r));
}
access_token = new_access;
call(&access_token)?
}
Err(e) => return Err(Error::AudioFetchingError(Box::new(e))),
};

#[derive(Deserialize)]
struct AudioFileLocations {
Expand All @@ -54,15 +77,10 @@ impl Cdn {

// Deserialize the response and pick a file URL from the returned CDN list.
let locations: AudioFileLocations = response.into_body().read_json()?;
let file_uri = locations
.cdnurl
.into_iter()
// TODO:
// Now, we always pick the first URL in the list, figure out a better strategy.
// Choosing by random seems wrong.
.next()
// TODO: Avoid panicking here.
.expect("No file URI found");
let file_uri = match locations.cdnurl.into_iter().next() {
Some(uri) => uri,
None => return Err(Error::UnexpectedResponse),
};

let uri = CdnUrl::new(file_uri);
Ok(uri)
Expand All @@ -74,14 +92,25 @@ impl Cdn {
offset: u64,
length: u64,
) -> Result<(u64, impl Read), Error> {
let response = self
let req = self
.agent
.get(uri)
.header("Range", &range_header(offset, length))
.call()?;
let total_length = parse_total_content_length(&response);
let data_reader = response.into_body().into_reader();
Ok((total_length, data_reader))
.header("Range", &range_header(offset, length));
match req.call() {
Ok(response) => {
let status = response.status();
if status != StatusCode::PARTIAL_CONTENT {
return Err(Error::HttpStatus(status.as_u16()));
}
let total_length = parse_total_content_length(&response)?;
let data_reader = response.into_body().into_reader();
Ok((total_length, data_reader))
}
Err(e) => match e {
ureq::Error::StatusCode(code) => Err(Error::HttpStatus(code)),
other => Err(Error::AudioFetchingError(Box::new(other))),
},
}
}
}

Expand Down Expand Up @@ -128,18 +157,26 @@ fn range_header(offfset: u64, length: u64) -> String {
///
/// For example, returns 146515 for a response with header
/// "Content-Range: bytes 0-1023/146515".
fn parse_total_content_length(response: &ureq::http::response::Response<ureq::Body>) -> u64 {
response
.headers()
.get("Content-Range")
.expect("Content-Range header not found")
.to_str()
.expect("Failed to parse Content-Range Header")
.split('/')
.next_back()
.expect("Failed to parse Content-Range Header")
.parse()
.expect("Failed to parse Content-Range Header")
fn parse_total_content_length(
response: &ureq::http::response::Response<ureq::Body>,
) -> Result<u64, Error> {
let header = match response.headers().get("Content-Range") {
Some(h) => h,
None => return Err(Error::UnexpectedResponse),
};
let s = match header.to_str() {
Ok(s) => s,
Err(_) => return Err(Error::UnexpectedResponse),
};
let total_str = match s.split('/').next_back() {
Some(x) => x,
None => return Err(Error::UnexpectedResponse),
};
let total = match total_str.parse::<u64>() {
Ok(n) => n,
Err(_) => return Err(Error::UnexpectedResponse),
};
Ok(total)
}

/// Parses an expiration of an audio file URL.
Expand Down
2 changes: 2 additions & 0 deletions psst-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub enum Error {
SendError,
RecvTimeoutError(RecvTimeoutError),
JoinError,
HttpStatus(u16),
OAuthError(String),
}

Expand Down Expand Up @@ -62,6 +63,7 @@ impl fmt::Display for Error {
Self::SendError => write!(f, "Failed to send into a channel"),
Self::RecvTimeoutError(err) => write!(f, "Channel receive timeout: {err}"),
Self::JoinError => write!(f, "Failed to join thread"),
Self::HttpStatus(code) => write!(f, "HTTP status {}", code),
Self::OAuthError(msg) => write!(f, "OAuth error: {msg}"),
}
}
Expand Down
40 changes: 31 additions & 9 deletions psst-core/src/oauth.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::error::Error;
use oauth2::{
basic::BasicClient, reqwest::http_client, AuthUrl, AuthorizationCode, ClientId, CsrfToken,
PkceCodeChallenge, PkceCodeVerifier, RedirectUrl, Scope, TokenResponse, TokenUrl,
PkceCodeChallenge, PkceCodeVerifier, RedirectUrl, RefreshToken, Scope, TokenResponse, TokenUrl,
};
use std::{
io::{BufRead, BufReader, Write},
Expand All @@ -17,15 +17,13 @@ pub fn listen_for_callback_parameter(
timeout: Duration,
parameter_name: &'static str,
) -> Result<String, Error> {
log::info!(
"starting callback listener for '{parameter_name}' on {socket_address:?}",
);
log::debug!("starting callback listener for '{parameter_name}' on {socket_address:?}",);

// Create a simpler, linear flow
// 1. Bind the listener
let listener = match TcpListener::bind(socket_address) {
Ok(l) => {
log::info!("listener bound successfully");
log::debug!("listener bound successfully");
l
}
Err(e) => {
Expand Down Expand Up @@ -79,7 +77,7 @@ fn handle_callback_connection(
if reader.read_line(&mut request_line).is_ok() {
match extract_parameter_from_request(&request_line, parameter_name) {
Some(value) => {
log::info!("received callback parameter '{parameter_name}'.");
log::debug!("received callback parameter '{parameter_name}'.");
send_success_response(stream);
let _ = tx.send(Ok(value));
}
Expand Down Expand Up @@ -178,7 +176,7 @@ pub fn exchange_code_for_token(
redirect_port: u16,
code: AuthorizationCode,
pkce_verifier: PkceCodeVerifier,
) -> String {
) -> (String, Option<String>) {
let client = create_spotify_oauth_client(redirect_port);

let token_response = client
Expand All @@ -187,12 +185,36 @@ pub fn exchange_code_for_token(
.request(http_client)
.expect("Failed to exchange code for token");

token_response.access_token().secret().to_string()
let access = token_response.access_token().secret().to_string();
let refresh = token_response
.refresh_token()
.map(|t| t.secret().to_string());
(access, refresh)
}

fn get_scopes() -> Vec<Scope> {
crate::session::access_token::ACCESS_SCOPES
// Use a broader OAuth scope set for initial AP login (includes streaming).
const OAUTH_SCOPES: &str = "streaming,user-read-email,user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played,app-remote-control";
OAUTH_SCOPES
.split(',')
.map(|s| Scope::new(s.trim().to_string()))
.collect()
}

/// Refresh an access token using a stored refresh token. Returns the new access token and
/// an optional new refresh token if Spotify rotates it.
pub fn refresh_access_token(refresh_token: &str) -> Result<(String, Option<String>), Error> {
// Reuse the same OAuth client configuration; redirect URI is irrelevant for refresh flow.
let client = create_spotify_oauth_client(0);

let token_response = client
.exchange_refresh_token(&RefreshToken::new(refresh_token.to_string()))
.request(http_client)
.map_err(|e| Error::OAuthError(format!("Failed to refresh token: {e}")))?;

let access = token_response.access_token().secret().to_string();
let refresh = token_response
.refresh_token()
.map(|t| t.secret().to_string());
Ok((access, refresh))
}
22 changes: 18 additions & 4 deletions psst-core/src/player/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,11 @@ impl StreamedFile {

fn service_streaming(&self) -> Result<(), Error> {
let mut last_url = self.url.clone();
let force_resolve = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let mut fresh_url = || -> Result<CdnUrl, Error> {
if last_url.is_expired() {
if last_url.is_expired()
|| force_resolve.swap(false, std::sync::atomic::Ordering::SeqCst)
{
last_url = self.cdn.resolve_audio_file_url(self.path.file_id)?;
}
Ok(last_url.clone())
Expand All @@ -254,6 +257,7 @@ impl StreamedFile {
let mut writer = self.storage.writer()?;
let file_path = self.storage.path().to_path_buf();
let file_id = self.path.file_id;
let force_resolve = force_resolve.clone();
move || {
match load_range(&mut writer, &cdn, &url, offset, length) {
Ok(_) => {
Expand All @@ -266,9 +270,19 @@ impl StreamedFile {
}
}
Err(err) => {
log::error!("failed to download: {err}");
// Range failed to download, remove it from the requested set.
writer.mark_as_not_requested(offset, length);
// On auth error, try once to re-resolve the CDN URL and retry; otherwise mark as not requested.
let retry_after_auth = |w: &mut StreamWriter| -> Result<(), ()> {
let new_url = cdn.resolve_audio_file_url(file_id).map_err(|_| ())?;
load_range(w, &cdn, &new_url.url, offset, length).map_err(|_| ())
};

let retried_ok = matches!(err, Error::HttpStatus(code) if code == 401 || code == 403)
&& retry_after_auth(&mut writer).is_ok();

if !retried_ok {
force_resolve.store(true, std::sync::atomic::Ordering::SeqCst);
writer.mark_as_not_requested(offset, length);
}
}
}
}
Expand Down
29 changes: 24 additions & 5 deletions psst-core/src/session/access_token.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Keymaster token acquisition is deprecated in Psst.
// OAuth/PKCE is the primary auth path; Keymaster remains only for legacy compatibility.
// See librespot discussion for context (403/code=4, scope restrictions):
// https://github.com/librespot-org/librespot/issues/1532#issuecomment-3188123661
use std::time::{Duration, Instant};

use parking_lot::Mutex;
Expand All @@ -11,12 +15,13 @@ use super::SessionService;
pub const CLIENT_ID: &str = "65b708073fc0480ea92a077233ca87bd";

// All scopes we could possibly require.
pub const ACCESS_SCOPES: &str = "streaming,user-read-email,user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
pub const ACCESS_SCOPES: &str = "user-read-email,user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";

// Consider token expired even before the official expiration time. Spotify
// seems to be reporting excessive token TTLs so let's cut it down by 30
// minutes.
const EXPIRATION_TIME_THRESHOLD: Duration = Duration::from_secs(60 * 30);
// Avoid repeatedly hammering keymaster when errors occur.

#[derive(Clone)]
pub struct AccessToken {
Expand All @@ -35,10 +40,10 @@ impl AccessToken {
pub fn request(session: &SessionService) -> Result<Self, Error> {
#[derive(Deserialize)]
struct MercuryAccessToken {
#[serde(rename = "expiresIn")]
expires_in: u64,
#[serde(rename = "accessToken")]
#[serde(alias = "accessToken", alias = "access_token")]
access_token: String,
#[serde(alias = "expiresIn", alias = "expires_in")]
expires_in: u64,
}

let token: MercuryAccessToken = session.connected()?.get_mercury_json(format!(
Expand Down Expand Up @@ -67,10 +72,24 @@ impl TokenProvider {
}
}

pub fn invalidate(&self) {
let mut token = self.token.lock();
*token = AccessToken::expired();
}

pub fn get(&self, session: &SessionService) -> Result<AccessToken, Error> {
// Prefer an OAuth bearer if the session provides one.
if let Some(tok) = session.oauth_bearer() {
return Ok(AccessToken {
token: tok,
// Give the bearer a reasonable lifetime; it will be replaced when refreshed.
expires: Instant::now() + Duration::from_secs(3600),
});
}

let mut token = self.token.lock();
if token.is_expired() {
log::info!("access token expired, requesting");
log::debug!("access token expired, requesting");
*token = AccessToken::request(session)?;
}
Ok(token.clone())
Expand Down
Loading
Loading