# Patch overview (commentary only; everything from the first "diff --git" header
# onward is the original patch text, unchanged).
#
# NOTE(review): in this copy the patch's line breaks appear to have been
# collapsed onto a few very long lines; it presumably needs to be re-wrapped
# from the original before it can be applied -- confirm against the source PR.
#
# Files touched by this patch:
#   .github/workflows/setup/action.yml
#       adds "--extra psycopg" to the "uv sync" command.
#   .github/workflows/tests.yml
#       last entry of the test matrix changes from '3.14' to '3.13'.
#   brozzler/model.py
#       copyright range 2014-2024 -> 2014-2025.
#   brozzler/video_data.py (new file)
#       VideoCaptureRecord: frozen dataclass describing one video capture row.
#       VideoDataClient: builds a psycopg_pool ConnectionPool from the
#       VIDEO_DATA_SOURCE environment variable and exposes
#       recent_video_capture_exists() and get_video_captures().
#   brozzler/worker.py
#       BrozzlerWorker creates a VideoDataClient when VIDEO_DATA_SOURCE starts
#       with "postgresql", otherwise sets self._video_data = None.
#       should_ytdlp() returns False when recent_video_capture_exists() reports
#       a capture within `recent` days (90 when the URL contains
#       "youtube.com/watch", else 30).
#   brozzler/ydl.py
#       isyoutubehost() no longer applies .split("?")[0] to the host part.
#       ydl_opts gains "updatetime": True.
#       For youtube hosts, URLs without "com/watch" use ytdlp_proxy_endpoints[4];
#       others pick randomly from ytdlp_proxy_endpoints[0:4].
#       _remember_videos() now takes ie_result and, when exactly one video was
#       pushed, records title / display_id / resolution / capture_status.
#       do_youtube_dl() skips playlist entries whose canonicalized watch URL is
#       already returned by get_video_captures(site, source="youtube").
#   pyproject.toml
#       version 1.7.0 -> 1.7.1; new optional dependency group
#       psycopg = ["psycopg[pool]>=3.2.6"].
#
# Review notes on the added code (assumptions to confirm, not verified facts):
#   NOTE(review): ytdlp_proxy_endpoints[4] assumes at least five endpoints are
#       configured; a shorter list would raise IndexError -- confirm the
#       deployment contract or index from the end instead.
#   NOTE(review): in get_video_captures(), containing_page_url_pattern is only
#       assigned when source == "youtube"; any other truthy `source` appears to
#       reach the query with that name unbound.
#   NOTE(review): in do_youtube_dl(), uncaptured_watch_pages is assigned inside
#       the try block but read after it; if get_video_captures() raises first,
#       the later "if uncaptured_watch_pages:" looks like it would raise
#       NameError. Indentation is lost in this copy -- verify in the real file.
#   NOTE(review): both VideoDataClient query methods index
#       site["metadata"]["ait_account_id"] / ["ait_seed_id"] directly although
#       `site` defaults to None; presumably every caller passes a site carrying
#       both keys -- verify.
#   NOTE(review): VideoDataClient imports psycopg_pool in the class body and
#       again inside __init__ / _execute_pg_query; the class-body import looks
#       redundant.
diff --git a/.github/workflows/setup/action.yml b/.github/workflows/setup/action.yml index 940c65b8..7f83532b 100644 --- a/.github/workflows/setup/action.yml +++ b/.github/workflows/setup/action.yml @@ -30,5 +30,5 @@ runs: - name: Install pip dependencies run: | - uv sync --python ${{ inputs.python-version }} --extra rethinkdb --extra warcprox --extra yt-dlp + uv sync --python ${{ inputs.python-version }} --extra rethinkdb --extra warcprox --extra yt-dlp --extra psycopg shell: bash diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4413c6a4..4cedf7b2 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - version: ['3.9', '3.12', '3.14'] + version: ['3.9', '3.12', '3.13'] steps: - uses: actions/checkout@v4 diff --git a/brozzler/model.py b/brozzler/model.py index eab4b1c1..28979bb5 100644 --- a/brozzler/model.py +++ b/brozzler/model.py @@ -2,7 +2,7 @@ brozzler/models.py - model classes representing jobs, sites, and pages, with related logic -Copyright (C) 2014-2024 Internet Archive +Copyright (C) 2014-2025 Internet Archive Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/brozzler/video_data.py b/brozzler/video_data.py new file mode 100644 index 00000000..187fe7fe --- /dev/null +++ b/brozzler/video_data.py @@ -0,0 +1,198 @@ +""" +brozzler/video_data.py - video data support for brozzler predup + +Copyright (C) 2025 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +""" + +import datetime +import os +from dataclasses import dataclass +from typing import Any, List, Optional + +import structlog +import urlcanon + +logger = structlog.get_logger(logger_name=__name__) + + +# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata +@dataclass(frozen=True) +class VideoCaptureRecord: + crawl_job_id: int + is_test_crawl: bool + seed_id: int + collection_id: int + containing_page_timestamp: str + containing_page_digest: str + containing_page_media_index: int + containing_page_media_count: int + video_digest: str + video_timestamp: str + video_mimetype: str + video_http_status: int + video_size: int + containing_page_url: str + video_url: str + video_title: str + video_display_id: ( + str # aka yt-dlp metadata as display_id, e.g., youtube watch page v param + ) + video_resolution: str + video_capture_status: str # recrawl? what else? + + +class VideoDataClient: + from psycopg_pool import ConnectionPool, PoolTimeout + + VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE") + + def __init__(self): + from psycopg_pool import ConnectionPool + + pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9) + pool.wait() + logger.info("pg pool ready") + # atexit.register(pool.close) + + self.pool = pool + + def _execute_pg_query(self, query_tuple, fetchall=False) -> Optional[Any]: + from psycopg_pool import PoolTimeout + + query_str, params = query_tuple + try: + with self.pool.connection() as conn: + with conn.cursor() as cur: + cur.execute(query_str, params) + return cur.fetchall() if fetchall else cur.fetchone() + except PoolTimeout as e: + logger.warn("hit PoolTimeout: %s", e) + self.pool.check() + except Exception as e: + logger.warn("postgres query failed: %s", e) + return None + + def _timestamp4datetime(self, timestamp): + """split `timestamp` into a tuple of 6 integers. 
+ + :param timestamp: full-length timestamp + """ + timestamp = timestamp[:14] + return ( + int(timestamp[:-10]), + int(timestamp[-10:-8]), + int(timestamp[-8:-6]), + int(timestamp[-6:-4]), + int(timestamp[-4:-2]), + int(timestamp[-2:]), + ) + + def recent_video_capture_exists( + self, site=None, containing_page_url=None, recent=30 + ): + # using ait_account_id as postgres partition id + partition_id = ( + site["metadata"]["ait_account_id"] + if site["metadata"]["ait_account_id"] + else None + ) + seed_id = ( + site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None + ) + result = False + + if partition_id and seed_id and containing_page_url: + # check for postgres query for most recent record + pg_query = ( + "SELECT video_timestamp from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY video_timestamp DESC LIMIT 1", + (partition_id, seed_id, str(urlcanon.aggressive(containing_page_url))), + ) + try: + result_tuple = self._execute_pg_query(pg_query) + if result_tuple is None: + logger.info("found no result for query '%s'", pg_query) + else: + if result_tuple[0]: + logger.info( + "found most recent capture timestamp: %s", result_tuple[0] + ) + capture_datetime = datetime.datetime( + *self._timestamp4datetime(result_tuple[0]), + tzinfo=datetime.timezone.utc, + ) + time_diff = ( + datetime.datetime.now(datetime.timezone.utc) + - capture_datetime + ) + if time_diff < datetime.timedelta(recent): + logger.info( + "recent video capture exists from %s", + containing_page_url, + ) + result = True + else: + logger.info( + "no recent video capture exists from %s, time_diff = %s", + containing_page_url, + time_diff, + ) + else: + logger.info( + "no video timestamp in result for query '%s'", pg_query + ) + except Exception as e: + logger.warn("postgres query failed: %s", e) + else: + logger.warn( + "missing partition_id/account_id, seed_id, or containing_page_url" + ) + + return result + + def get_video_captures(self, 
site=None, source=None) -> List[str]: + # using ait_account_id as postgres partition id + partition_id = ( + site["metadata"]["ait_account_id"] + if site["metadata"]["ait_account_id"] + else None + ) + seed_id = ( + site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None + ) + results = [] + + if source == "youtube": + containing_page_url_pattern = "http://youtube.com/watch%" # yes, video data canonicalization uses "http" + # support other media sources here + + if partition_id and seed_id and source: + pg_query = ( + "SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s", + ( + partition_id, + seed_id, + containing_page_url_pattern, + ), + ) + try: + result = self._execute_pg_query(pg_query, fetchall=True) + if result: + results = [row[0] for row in result] + except Exception as e: + logger.warn("postgres query failed: %s", e) + else: + logger.warn("missing partition_id/account_id, seed_id, or source") + + return results diff --git a/brozzler/worker.py b/brozzler/worker.py index 43e2a021..fd4569a1 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -22,6 +22,7 @@ import importlib.util import io import json +import os import socket import threading import time @@ -57,6 +58,7 @@ class BrozzlerWorker: SITE_SESSION_MINUTES = 15 HEADER_REQUEST_TIMEOUT = 30 FETCH_URL_TIMEOUT = 60 + VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE") def __init__( self, @@ -89,6 +91,13 @@ def __init__( self._service_registry = service_registry self._ytdlp_proxy_endpoints = ytdlp_proxy_endpoints self._max_browsers = max_browsers + # see video_data.py for more info + if self.VIDEO_DATA_SOURCE and self.VIDEO_DATA_SOURCE.startswith("postgresql"): + from brozzler.video_data import VideoDataClient + + self._video_data = VideoDataClient() + else: + self._video_data = None self._warcprox_auto = warcprox_auto self._proxy = proxy @@ -290,6 +299,27 @@ def should_ytdlp(self, logger, site, page, page_status): if 
"chrome-error:" in ytdlp_url: return False + # predup... + if self._video_data: + logger.info("checking for recent previous captures of %s", ytdlp_url) + recent = 90 if "youtube.com/watch" in ytdlp_url else 30 + try: + recent_capture_exists = self._video_data.recent_video_capture_exists( + site, ytdlp_url, recent + ) + if recent_capture_exists: + logger.info( + "recent capture of %s found, skipping ytdlp", + ytdlp_url, + ) + return False + except Exception as e: + logger.warning( + "exception querying for previous capture for %s: %s", + ytdlp_url, + str(e), + ) + return True @metrics.brozzler_page_processing_duration_seconds.time() diff --git a/brozzler/ydl.py b/brozzler/ydl.py index b3e19fe5..37af206f 100644 --- a/brozzler/ydl.py +++ b/brozzler/ydl.py @@ -47,8 +47,8 @@ def isyoutubehost(url): - # split 1 splits scheme from url, split 2 splits path from hostname, split 3 splits query string on hostname - return "youtube.com" in url.split("//")[-1].split("/")[0].split("?")[0] + # split 1 splits scheme from url, split 2 splits path from hostname + return "youtube.com" in url.split("//")[-1].split("/")[0] class ExtraHeaderAdder(urllib.request.BaseHandler): @@ -287,12 +287,18 @@ def ydl_postprocess_hook(d): # recommended to avoid bot detection "sleep_interval": 7, "max_sleep_interval": 27, + # preserve pre-2025.07.21 mtime handling + "updatetime": True, } ytdlp_url = page.redirect_url if page.redirect_url else page.url is_youtube_host = isyoutubehost(ytdlp_url) if is_youtube_host and ytdlp_proxy_endpoints: - ydl_opts["proxy"] = random.choice(ytdlp_proxy_endpoints) + # use last proxy_endpoint only for youtube user, channel, playlist pages + if "com/watch" not in ytdlp_url: + ydl_opts["proxy"] = ytdlp_proxy_endpoints[4] + else: + ydl_opts["proxy"] = random.choice(ytdlp_proxy_endpoints[0:4]) # don't log proxy value secrets ytdlp_proxy_for_logs = ( ydl_opts["proxy"].split("@")[1] if "@" in ydl_opts["proxy"] else "@@@" @@ -300,6 +306,7 @@ def ydl_postprocess_hook(d): 
logger.info("using yt-dlp proxy ...", proxy=ytdlp_proxy_for_logs) # skip warcprox proxying yt-dlp v.2023.07.06: youtube extractor using ranges + # should_proxy_vid_maybe = not ydl.is_youtube_host # if worker._proxy_for(site): # ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site)) @@ -312,7 +319,7 @@ def ydl_postprocess_hook(d): return ydl -def _remember_videos(page, pushed_videos=None): +def _remember_videos(page, ie_result, pushed_videos=None): """ Saves info about videos captured by yt-dlp in `page.videos`. """ @@ -326,6 +333,12 @@ def _remember_videos(page, pushed_videos=None): "content-type": pushed_video["content-type"], "content-length": pushed_video["content-length"], } + # should be only 1 video for youtube watch pages, maybe vimeo, too, and other similar... + if len(pushed_videos) == 1: + video["title"] = ie_result.get("title") + video["display_id"] = ie_result.get("display_id") + video["resolution"] = ie_result.get("resolution") + video["capture_status"] = None logger.debug("embedded video", video=video) page.videos.append(video) @@ -336,11 +349,6 @@ def _try_youtube_dl(worker, ydl, site, page): while attempt < max_attempts: try: logger.info("trying yt-dlp", url=ydl.url) - # should_download_vid = not ydl.is_youtube_host - # then - # ydl.extract_info(str(urlcanon.whatwg(ydl.url)), download=should_download_vid) - # if ydl.is_youtube_host and ie_result: - # download_url = ie_result.get("url") with brozzler.thread_accept_exceptions(): # we do whatwg canonicalization here to avoid "" resulting in ProxyError @@ -400,7 +408,10 @@ def _try_youtube_dl(worker, ydl, site, page): logger.info("ytdlp completed successfully") - _remember_videos(page, ydl.pushed_videos) + # NOTE: good place for a check for duplicate display_id = ie_result.get("display_id")? + # see ydl_postprocess_hook... best to check for duplicate display_id before writing video! 
+ + _remember_videos(page, ie_result, ydl.pushed_videos) if worker._using_warcprox(site): info_json = json.dumps(ie_result, sort_keys=True, indent=4) logger.info( @@ -444,10 +455,55 @@ def do_youtube_dl(worker, site, page, ytdlp_proxy_endpoints): ie_result.get("extractor") == "youtube:playlist" or ie_result.get("extractor") == "youtube:tab" ): - # youtube watch pages as outlinks - outlinks = { - "https://www.youtube.com/watch?v=%s" % e["id"] - for e in ie_result.get("entries_no_dl", []) - } - # any outlinks for other cases? soundcloud, maybe? + if worker._video_data: + logger.info( + "checking for previously captured youtube watch pages for account %s, seed_id %s", + site["metadata"]["ait_account_id"], + site["metadata"]["ait_seed_id"], + ) + try: + captured_youtube_watch_pages = ( + worker._video_data.get_video_captures(site, source="youtube") + ) + if captured_youtube_watch_pages: + logger.info( + "found %s previously captured youtube watch pages for account %s, seed_id %s", + len(captured_youtube_watch_pages), + site["metadata"]["ait_account_id"], + site["metadata"]["ait_seed_id"], + ) + captured_watch_pages = set() + captured_watch_pages.update(captured_youtube_watch_pages) + uncaptured_watch_pages = [] + for e in ie_result.get("entries_no_dl", []): + # note: http matches, not https + youtube_watch_url = str( + urlcanon.aggressive( + f"http://www.youtube.com/watch?v={e['id']}" + ) + ) + if youtube_watch_url in captured_watch_pages: + logger.info( + "skipping adding %s to yt-dlp outlinks", + youtube_watch_url, + ) + continue + uncaptured_watch_pages.append( + f"https://www.youtube.com/watch?v={e['id']}" + ) + except Exception as e: + logger.warning("hit exception processing worker._video_data: %s", e) + if uncaptured_watch_pages: + logger.info( + "adding %s uncaptured watch pages to yt-dlp outlinks", + len(uncaptured_watch_pages), + ) + outlinks.update(uncaptured_watch_pages) + else: + outlinks = { + "https://www.youtube.com/watch?v=%s" % e["id"] + for e in 
ie_result.get("entries_no_dl", []) + } + + # todo: handle outlinks for instagram and soundcloud, other media source, here (if anywhere) return outlinks diff --git a/pyproject.toml b/pyproject.toml index c0fc947f..12fd956b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "brozzler" -version = "1.7.0" +version = "1.7.1" authors = [ { name="Noah Levitt", email="nlevitt@archive.org" }, ] @@ -40,6 +40,7 @@ license = "Apache-2.0" [project.optional-dependencies] yt-dlp = ["yt-dlp[default,curl-cffi]>=2024.7.25"] +psycopg = ["psycopg[pool]>=3.2.6"] dashboard = ["flask>=1.0", "gunicorn>=19.8.1"] warcprox = ["warcprox>=2.4.31"] rethinkdb = [