Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
55e446a
initial commit
Jun 21, 2025
03b329c
formatting fix
Jun 21, 2025
92a6cac
ruff format updates
Jun 21, 2025
c0db5b9
CONCAT
Jun 21, 2025
fd0e0d3
variable VIDEO_DATA
Jun 23, 2025
f925660
skip ternary op for now
Jun 23, 2025
fe5ad0c
VIDEO_DATA_SOURCE
Jun 23, 2025
203d86f
use job_conf.get()
Jun 24, 2025
0526eb8
make psycopg dependency optional
Jun 24, 2025
af1aaee
containing_page_url_pattern update
Jun 24, 2025
8dcac47
type hint get_video_captures
Jun 24, 2025
667feae
ruff import block fix
Jun 24, 2025
f21d312
initial interface update
Jun 24, 2025
de4e7e0
VideoDataWrapper refined
Jun 26, 2025
046db4b
VideoDataClient, generalized
Jun 30, 2025
7d58a9a
keep it simple for now
Jun 30, 2025
db17335
fix github ruff issues
Jun 30, 2025
b4b950c
self._video_data in worker
Jul 1, 2025
4315102
dataclass VideoDataRecord
Jul 1, 2025
2fc30b0
dataclass VideoCaptureRecord instead
Jul 1, 2025
2422e9d
def create_video_capture_record minimally
Jul 1, 2025
d4e0aa6
more new fields
Jul 1, 2025
825de5f
worker._video_data and seed_id mostly
Jul 2, 2025
701707c
save video_record here?
Jul 2, 2025
3c1328f
save_video_capture_record
Jul 3, 2025
bc987b2
add get_recent_video_capture, mostly
Jul 15, 2025
3af8247
updates for QA deploy
Jul 15, 2025
5b967b9
log urls we skipped adding to outlinks
Jul 16, 2025
3d377e7
updates from qa deploy
Jul 16, 2025
3adae49
more updates post-QA-deploy
Jul 16, 2025
f979bbf
even more updates post-QA-deploy
Jul 18, 2025
ada4047
add account_id to model's populate defaults
Jul 18, 2025
c55b765
more populate_defaults maybe
Jul 21, 2025
5f3b360
use only last proxyrack endpoint for non-youtube-watch urls
Jul 21, 2025
0c4ddb7
preserve pre-2025.07.21 mtime handling
Jul 22, 2025
0985518
account_id first (alphabetical order?)
Jul 23, 2025
825cf70
skip creating video records in brozzler for now
Jul 25, 2025
ffaec9d
ruff and isort
Jul 25, 2025
c3e4046
do use uv, not pip
Jul 25, 2025
951ce8c
skip 3.14 for now with psycopg; try 3.13
Jul 25, 2025
82d4e2f
job schema tweak -- more info: https://github.com/pyeve/cerberus/issu…
Jul 25, 2025
0fd859f
one more tweak?
Jul 25, 2025
7af892a
un-update job_schema.yaml
galgeek Jul 25, 2025
1f8af16
make tests pass, even with account_id
Jul 25, 2025
4715aa0
fix typo
Jul 25, 2025
6484256
add account_id to frontier tests
Jul 25, 2025
ba16a9b
add account_id to job_schema (again)
Jul 25, 2025
a93bf13
Merge branch 'master' into predup_type_playlist
galgeek Jul 25, 2025
d2ec6de
partition_id, not account_id
Jul 26, 2025
51d2e57
... except account_id for pg_query
Jul 26, 2025
353a745
fix repeated key
Jul 26, 2025
4a73b1e
simplify job conf
Jul 27, 2025
de56dc3
check captured_youtube_watch_pages not None
Jul 28, 2025
b07d812
skip psycopg.rows.scalar_row
Jul 28, 2025
923ae1f
fix logged errors
Jul 28, 2025
59f7830
import psycopg only in class VideoDataClient
Jul 28, 2025
e8dc41c
import psycopg only at top of file
Jul 28, 2025
0bd13a6
query_tuple, check result for None
Jul 29, 2025
782cd64
tidying
Jul 29, 2025
cb3d494
tidy psycopg imports, mostly
Jul 30, 2025
73476bb
predup query for should_ytdlp
Jul 31, 2025
c0f0037
ruff'd worker.py
Jul 31, 2025
d34d42b
fix formatting errors
Jul 31, 2025
ed67168
updates for get_video_capture mostly
Aug 1, 2025
17587b0
get_recent_video_capture tuple
Aug 4, 2025
9f0344c
mv video data code to new file
Aug 13, 2025
540a57b
fix imports post-video_data move
Aug 14, 2025
8949632
formatting fix
Aug 14, 2025
ed8aad0
import video_data.py when valid VIDEO_DATA_SOURCE
Aug 14, 2025
d7fb547
datetime.now(datetime.timezone.utc)
Aug 15, 2025
6baaf4a
ruff'd
Aug 15, 2025
6a8c62f
recent_video_capture_exists
Aug 15, 2025
8abb9cd
predup check for all urls
Aug 15, 2025
7eae146
skip Bool
Aug 15, 2025
e24c8bc
use video_timestamp, mostly
Aug 20, 2025
9788583
if result_tuple[0]
Aug 20, 2025
67d0ac9
more better result handling for recent_video_capture_exists
Aug 21, 2025
ae67bcd
fix buglets
Aug 23, 2025
b7949b5
ruff'd
Aug 23, 2025
fc7fa6d
fix last buglet & tweak logging
Aug 25, 2025
bade30b
ruff'd (added final comma 8|)
Aug 25, 2025
09fd339
should_ytdlp predup only when worker.video_data
Aug 27, 2025
7c93eb7
if self._video_data
Aug 27, 2025
cdf5db4
self._video_data = None sometimes
Aug 27, 2025
5898d86
note places we might check for duplicate display_id
Oct 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ runs:

- name: Install pip dependencies
run: |
uv sync --python ${{ inputs.python-version }} --extra rethinkdb --extra warcprox --extra yt-dlp
uv sync --python ${{ inputs.python-version }} --extra rethinkdb --extra warcprox --extra yt-dlp --extra psycopg
shell: bash
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
version: ['3.9', '3.12', '3.14']
version: ['3.9', '3.12', '3.13']
steps:
- uses: actions/checkout@v4

Expand Down
2 changes: 1 addition & 1 deletion brozzler/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
brozzler/models.py - model classes representing jobs, sites, and pages, with
related logic

Copyright (C) 2014-2024 Internet Archive
Copyright (C) 2014-2025 Internet Archive

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
198 changes: 198 additions & 0 deletions brozzler/video_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""
brozzler/video_data.py - video data support for brozzler predup

Copyright (C) 2025 Internet Archive

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import datetime
import os
from dataclasses import dataclass
from typing import Any, List, Optional

import structlog
import urlcanon

logger = structlog.get_logger(logger_name=__name__)


# video_title, video_display_id, video_resolution, video_capture_status are new fields, mostly from yt-dlp metadata
@dataclass(frozen=True)
class VideoCaptureRecord:
crawl_job_id: int
is_test_crawl: bool
seed_id: int
collection_id: int
containing_page_timestamp: str
containing_page_digest: str
containing_page_media_index: int
containing_page_media_count: int
video_digest: str
video_timestamp: str
video_mimetype: str
video_http_status: int
video_size: int
containing_page_url: str
video_url: str
video_title: str
video_display_id: (
str # aka yt-dlp metadata as display_id, e.g., youtube watch page v param
)
video_resolution: str
video_capture_status: str # recrawl? what else?


class VideoDataClient:
from psycopg_pool import ConnectionPool, PoolTimeout

VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")

def __init__(self):
from psycopg_pool import ConnectionPool

pool = ConnectionPool(self.VIDEO_DATA_SOURCE, min_size=1, max_size=9)
pool.wait()
logger.info("pg pool ready")
# atexit.register(pool.close)

self.pool = pool

def _execute_pg_query(self, query_tuple, fetchall=False) -> Optional[Any]:
from psycopg_pool import PoolTimeout

query_str, params = query_tuple
try:
with self.pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(query_str, params)
return cur.fetchall() if fetchall else cur.fetchone()
except PoolTimeout as e:
logger.warn("hit PoolTimeout: %s", e)
self.pool.check()
except Exception as e:
logger.warn("postgres query failed: %s", e)
return None

def _timestamp4datetime(self, timestamp):
"""split `timestamp` into a tuple of 6 integers.

:param timestamp: full-length timestamp
"""
timestamp = timestamp[:14]
return (
int(timestamp[:-10]),
int(timestamp[-10:-8]),
int(timestamp[-8:-6]),
int(timestamp[-6:-4]),
int(timestamp[-4:-2]),
int(timestamp[-2:]),
)

def recent_video_capture_exists(
self, site=None, containing_page_url=None, recent=30
):
# using ait_account_id as postgres partition id
partition_id = (
site["metadata"]["ait_account_id"]
if site["metadata"]["ait_account_id"]
else None
)
seed_id = (
site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None
)
result = False

if partition_id and seed_id and containing_page_url:
# check for postgres query for most recent record
pg_query = (
"SELECT video_timestamp from video where account_id = %s and seed_id = %s and containing_page_url = %s ORDER BY video_timestamp DESC LIMIT 1",
(partition_id, seed_id, str(urlcanon.aggressive(containing_page_url))),
)
try:
result_tuple = self._execute_pg_query(pg_query)
if result_tuple is None:
logger.info("found no result for query '%s'", pg_query)
else:
if result_tuple[0]:
logger.info(
"found most recent capture timestamp: %s", result_tuple[0]
)
capture_datetime = datetime.datetime(
*self._timestamp4datetime(result_tuple[0]),
tzinfo=datetime.timezone.utc,
)
time_diff = (
datetime.datetime.now(datetime.timezone.utc)
- capture_datetime
)
if time_diff < datetime.timedelta(recent):
logger.info(
"recent video capture exists from %s",
containing_page_url,
)
result = True
else:
logger.info(
"no recent video capture exists from %s, time_diff = %s",
containing_page_url,
time_diff,
)
else:
logger.info(
"no video timestamp in result for query '%s'", pg_query
)
except Exception as e:
logger.warn("postgres query failed: %s", e)
else:
logger.warn(
"missing partition_id/account_id, seed_id, or containing_page_url"
)

return result

def get_video_captures(self, site=None, source=None) -> List[str]:
# using ait_account_id as postgres partition id
partition_id = (
site["metadata"]["ait_account_id"]
if site["metadata"]["ait_account_id"]
else None
)
seed_id = (
site["metadata"]["ait_seed_id"] if site["metadata"]["ait_seed_id"] else None
)
results = []

if source == "youtube":
containing_page_url_pattern = "http://youtube.com/watch%" # yes, video data canonicalization uses "http"
# support other media sources here

if partition_id and seed_id and source:
pg_query = (
"SELECT containing_page_url from video where account_id = %s and seed_id = %s and containing_page_url like %s",
(
partition_id,
seed_id,
containing_page_url_pattern,
),
)
try:
result = self._execute_pg_query(pg_query, fetchall=True)
if result:
results = [row[0] for row in result]
except Exception as e:
logger.warn("postgres query failed: %s", e)
else:
logger.warn("missing partition_id/account_id, seed_id, or source")

return results
30 changes: 30 additions & 0 deletions brozzler/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import importlib.util
import io
import json
import os
import socket
import threading
import time
Expand Down Expand Up @@ -57,6 +58,7 @@ class BrozzlerWorker:
SITE_SESSION_MINUTES = 15
HEADER_REQUEST_TIMEOUT = 30
FETCH_URL_TIMEOUT = 60
VIDEO_DATA_SOURCE = os.getenv("VIDEO_DATA_SOURCE")

def __init__(
self,
Expand Down Expand Up @@ -89,6 +91,13 @@ def __init__(
self._service_registry = service_registry
self._ytdlp_proxy_endpoints = ytdlp_proxy_endpoints
self._max_browsers = max_browsers
# see video_data.py for more info
if self.VIDEO_DATA_SOURCE and self.VIDEO_DATA_SOURCE.startswith("postgresql"):
from brozzler.video_data import VideoDataClient

self._video_data = VideoDataClient()
else:
self._video_data = None

self._warcprox_auto = warcprox_auto
self._proxy = proxy
Expand Down Expand Up @@ -290,6 +299,27 @@ def should_ytdlp(self, logger, site, page, page_status):
if "chrome-error:" in ytdlp_url:
return False

# predup...
if self._video_data:
logger.info("checking for recent previous captures of %s", ytdlp_url)
recent = 90 if "youtube.com/watch" in ytdlp_url else 30
try:
recent_capture_exists = self._video_data.recent_video_capture_exists(
site, ytdlp_url, recent
)
if recent_capture_exists:
logger.info(
"recent capture of %s found, skipping ytdlp",
ytdlp_url,
)
return False
except Exception as e:
logger.warning(
"exception querying for previous capture for %s: %s",
ytdlp_url,
str(e),
)

return True

@metrics.brozzler_page_processing_duration_seconds.time()
Expand Down
Loading