diff --git a/Dockerfile b/Dockerfile index ab40f19..3642ca4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /opt/kobodl/src ENV PATH="/opt/kobodl/local/venv/bin:$PATH" ENV VIRTUAL_ENV="/opt/kobodl/local/venv" -RUN apk add --no-cache gcc libc-dev libffi-dev +RUN apk add --no-cache gcc libc-dev libffi-dev ffmpeg ADD https://install.python-poetry.org /install-poetry.py RUN POETRY_VERSION=1.1.7 POETRY_HOME=/opt/kobodl/local python /install-poetry.py diff --git a/kobodl/actions.py b/kobodl/actions.py index 99ef4dc..0b662ad 100644 --- a/kobodl/actions.py +++ b/kobodl/actions.py @@ -1,6 +1,7 @@ import json import os import platform +import subprocess from typing import List, TextIO, Tuple, Union import click @@ -51,12 +52,16 @@ def __MakeFileNameForBook(bookMetadata: dict, formatStr: str) -> str: fileName = '' author = __SanitizeString(__GetBookAuthor(bookMetadata)) title = __SanitizeString(bookMetadata['Title']) + bookSeries = __SanitizeString(bookMetadata.get('Series', {}).get('Name', '')) + bookSeriesNumber = __SanitizeString(bookMetadata.get('Series', {}).get('Number', '')) return formatStr.format_map( { **bookMetadata, 'Author': author, 'Title': title, + 'Series': bookSeries, + 'SeriesNumber': bookSeriesNumber, # Append a portion of revisionId to prevent name collisions. 'ShortRevisionId': bookMetadata['RevisionId'][:8], } @@ -144,6 +149,90 @@ def __GetBookList(kobo: Kobo, listAll: bool, exportFile: Union[TextIO, None]) -> return rows +def __CreateM4BFile(outputPath: str, filename: str, bookMetadata: dict) -> None: + # Check if ffmpeg is installed + try: + subprocess.run(["ffmpeg", "-version"], check=True) + except subprocess.CalledProcessError: + click.echo("ffmpeg is not installed. Please install ffmpeg and try again.") + return + + # Concatenate all mp3 files into one file + subprocess.run( + [ + "ffmpeg", + "-f", + "concat", + "-safe", + "0", + "-i", + "files.txt", + "-c", + "copy", + "-y", + "build_01_concat.mp3", + ], + check=True, + cwd=outputPath, + ) + + # Add cover + subprocess.run( + [ + "ffmpeg", + "-i", + "build_01_concat.mp3", + "-i", + "cover.jpg", + "-c", + "copy", + "-map", + "0", + "-map", + "1", + "-y", + "build_02_cover.mp3", + ], + check=True, + cwd=outputPath, + ) + + # Convert mp3 to m4a + subprocess.run( + ["ffmpeg", "-y", "-i", "build_02_cover.mp3", "-c:v", "copy", "build_03_m4a.m4a"], + check=True, + cwd=outputPath, + ) + + # Add metadata to the m4a file and convert to m4b + subprocess.run( + [ + "ffmpeg", + "-i", + "build_03_m4a.m4a", + "-i", + "metadata.txt", + "-map", + "0", + "-map_metadata", + "1", + "-c", + "copy", + "-y", + f'{filename}.m4b', + ], + check=True, + cwd=outputPath, + ) + + # Remove all build_* files using python + for f in os.listdir(outputPath): + if f.startswith("build_"): + os.remove(os.path.join(outputPath, f)) + + return os.path.join(outputPath, f'{bookMetadata["Title"]}.m4b') + + def ListBooks(users: List[User], listAll: bool, exportFile: Union[TextIO, None]) -> List[Book]: '''list all books currently in the account''' for user in users: @@ -174,6 +263,7 @@ def GetBookOrBooks( outputPath: str, formatStr: str = r'{Author} - {Title} {ShortRevisionId}', productId: str = '', + generateAudiobook: bool = False, ) -> Union[None, str]: """ download 1 or all books to file @@ -203,6 +293,10 @@ def GetBookOrBooks( click.echo('Skipping subscribtion entity') continue + # Save metadata to JSON file + with open(os.path.join(outputPath, f'{bookMetadata["Title"]}.json'), 'w') as f: + f.write(json.dumps(bookMetadata, indent=2)) + fileName = __MakeFileNameForBook(bookMetadata, formatStr) if book_type == BookType.EBOOK: # Audiobooks go in sub-directories @@ -210,7 +304,15 @@ def GetBookOrBooks( fileName += '.epub' outputFilePath = os.path.join(outputPath, fileName) - if not productId and os.path.exists(outputFilePath): + if ( + not productId + and os.path.exists(outputFilePath) + and ( + generateAudiobook + and book_type == BookType.AUDIOBOOK + and os.path.exists(os.path.join(outputFilePath, f'{fileName}.m4b')) + ) + ): # when downloading ALL books, skip books we've downloaded before click.echo(f'Skipping already downloaded book {outputFilePath}') continue @@ -242,6 +344,13 @@ def GetBookOrBooks( err=True, ) + # Create final audiobook file + if book_type == BookType.AUDIOBOOK and generateAudiobook: + try: + __CreateM4BFile(outputFilePath, fileName, bookMetadata) + except Exception as e: + click.echo(f'Failed to create audiobook file: {str(e)}', err=True) + if productId: # TODO: support audiobook downloads from web return outputFilePath diff --git a/kobodl/commands/book.py b/kobodl/commands/book.py index 5bd25fc..c343ece 100644 --- a/kobodl/commands/book.py +++ b/kobodl/commands/book.py @@ -45,9 +45,18 @@ def book(): default=r'{Author} - {Title} {ShortRevisionId}', help=r"default: '{Author} - {Title} {ShortRevisionId}'", ) +@click.option('--generate-audiobook', is_flag=True, help='generate m4b audiobook bundle file') @click.argument('product-id', nargs=-1, type=click.STRING) @click.pass_obj -def get(ctx, user, output_dir: Path, get_all: bool, format_str: str, product_id: List[str]): +def get( + ctx, + user, + output_dir: Path, + get_all: bool, + format_str: str, + generate_audiobook: bool, + product_id: List[str], +): if len(Globals.Settings.UserList.users) == 0: click.echo('error: no users found. Did you `kobodl user add`?', err=True) exit(1) @@ -77,10 +86,18 @@ def get(ctx, user, output_dir: Path, get_all: bool, format_str: str, product_id: os.makedirs(output_dir, exist_ok=True) if get_all: - actions.GetBookOrBooks(usercls, output_dir, formatStr=format_str) + actions.GetBookOrBooks( + usercls, output_dir, formatStr=format_str, generateAudiobook=generate_audiobook + ) else: for pid in product_id: - actions.GetBookOrBooks(usercls, output_dir, formatStr=format_str, productId=pid) + actions.GetBookOrBooks( + usercls, + output_dir, + formatStr=format_str, + productId=pid, + generateAudiobook=generate_audiobook, + ) @book.command(name='list', help='list books') diff --git a/kobodl/kobo.py b/kobodl/kobo.py index fa05fa9..b6bcfdc 100644 --- a/kobodl/kobo.py +++ b/kobodl/kobo.py @@ -6,10 +6,12 @@ import sys import urllib import uuid +import json from enum import Enum from shutil import copyfile -from typing import Dict, Tuple +from typing import Dict, Tuple, List +import dataclasses import requests from bs4 import BeautifulSoup from dataclasses_json import dataclass_json @@ -30,21 +32,54 @@ class Book: Audiobook: bool Owner: User - class BookType(Enum): EBOOK = 1 AUDIOBOOK = 2 SUBSCRIPTION = 3 +@dataclass_json +@dataclasses.dataclass +class Drm: + DrmType: str + Keys: List[str] + +@dataclass_json +@dataclasses.dataclass +class NavigationItem: + Offset: float + PartId: int + Title: str + +@dataclass_json +@dataclasses.dataclass +class SpineItem: + Bitrate: float + Duration: float + Id: int + MediaType: str + Url: str + FileExtension: str + +@dataclass_json +@dataclasses.dataclass +class AudiobookData: + Drm: Drm + Navigation: List[NavigationItem] + Spine: List[SpineItem] + + def get_spine(self, part_id: int) -> 'SpineItem | None': + for s in self.Spine: + if s.Id == part_id: + return s + + raise KoboException(f"No matching spine found for PartId {part_id}. This will cause book generation to fail.") class NotAuthenticatedException(Exception): pass - class KoboException(Exception): pass - class Kobo: Affiliate = "Kobo" ApplicationVersion = "8.11.24971" @@ -270,21 +305,96 @@ def __DownloadToFile(self, url, outputPath: str) -> None: for chunk in response.iter_content(chunk_size=1024 * 256): f.write(chunk) + def __prepareAudiobookMetadata(self, bookMetadata: dict, outputPath: str) -> None: + os.makedirs(outputPath, exist_ok=True) + + # Download cover image as cover.jpg + coverImageId = bookMetadata.get("CoverImageId") + if coverImageId: + coverImageUrl = f'https://cdn.kobo.com/book-images/{coverImageId}/353/569/90/False/.jpg' + response = self.Session.get(coverImageUrl) + response.raise_for_status() + with open(os.path.join(outputPath, "cover.jpg"), "wb") as f: + f.write(response.content) + + # Write metadata to metadata.txt before processing the chapters download + with open(os.path.join(outputPath, "metadata.txt"), "w") as chaptersFile: + chaptersFile.write(self.__buildFFMpegChapterHeader(bookMetadata)) + + def __createFFMpegChapter(self, start: int, duration: int, title: str) -> str: + return f'''[CHAPTER] +TIMEBASE=1/1000 +START={start} +END={start + duration} +title={title}''' + + def __buildFFMpegChapterHeader(self, bookMetadata: dict) -> None: + authors: List[str] = [] + composers: List[str] = [] + + for creator in bookMetadata.get("ContributorRoles"): + if creator.get("Role") == "Author": + authors.append(creator["Name"]) + if creator.get("Role") == "Narrator": + composers.append(creator["Name"]) + + return f''';FFMETADATA1 +title={bookMetadata.get("Title")} +artist={", ".join(authors)} +composer={", ".join(composers)} +publisher={bookMetadata.get("Publisher").get("Name")} +date={bookMetadata.get("PublicationDate")} + +''' + def __DownloadAudiobook(self, url, outputPath: str) -> None: response = self.Session.get(url) - + files = [] response.raise_for_status() if not os.path.isdir(outputPath): os.mkdir(outputPath) - data = response.json() + # Deserialize using AudiobookData dataclass + data = AudiobookData.from_dict(response.json()) + + # Write response data to JSON file + with open(os.path.join(outputPath, "data.json"), "w") as f: + f.write(data.to_json(indent=4)) + + metadataHandler = open(os.path.join(outputPath, "metadata.txt"), "a") + + start = 0 + + for item in data.Navigation: + part_id = item.PartId + if part_id > len(data.Spine): + raise KoboException(f"PartId {part_id} is out of range for Spine list. This will cause book generation to fail.") + + spine = data.get_spine(part_id) + + spine_id = spine.Id + filename = f'spine_{spine_id:04}.{spine.FileExtension}' + metadataHandler.write( + self.__createFFMpegChapter(start, int(spine.Duration * 1000), item.Title) + '\n\n' + ) + start += int(spine.Duration * 1000) + + if filename not in files: + files.append(filename) + + # Download chapter if missing + if not os.path.isfile(os.path.join(outputPath, filename)): + resp_chapter = self.Session.get(spine.Url, stream=True) + filePath = os.path.join(outputPath, filename) + with open(filePath, "wb") as f: + for chunk in resp_chapter.iter_content(chunk_size=1024 * 256): + f.write(chunk) + + with open(os.path.join(outputPath, "files.txt"), "w") as filesHandler: + for f in files: + filesHandler.write(f"file '{f}'\n") - for item in data['Spine']: - fileNum = int(item['Id']) + 1 - response = self.Session.get(item['Url'], stream=True) - filePath = os.path.join(outputPath, str(fileNum) + '.' + item['FileExtension']) - with open(filePath, "wb") as f: - for chunk in response.iter_content(chunk_size=1024 * 256): - f.write(chunk) + metadataHandler.close() + filesHandler.close() # PUBLIC METHODS: @staticmethod @@ -342,6 +452,7 @@ def Download(self, bookMetadata: dict, isAudiobook: bool, outputPath: str) -> No try: if isAudiobook: + self.__prepareAudiobookMetadata(bookMetadata, outputPath) self.__DownloadAudiobook(downloadUrl, outputPath) else: self.__DownloadToFile(downloadUrl, temporaryOutputPath)