Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 85 additions & 18 deletions archive_query_log/monitoring/home.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from datetime import datetime
from gzip import open as gzip_open
from typing import NamedTuple, Type
from pathlib import Path

from elasticsearch_dsl.query import Exists, Query, Term
from expiringdict import ExpiringDict
from flask import render_template, Response, make_response
from warcio import ArchiveIterator

from archive_query_log.config import Config
from archive_query_log.orm import (
Expand All @@ -23,13 +26,14 @@
from archive_query_log.utils.time import utc_now

_CACHE_SECONDS_STATISTICS = 60 * 5 # 5 minutes
_CACHE_SECONDS_WARC_CACHE_STATISTICS = 60 * 1 # 1 minute
_CACHE_SECONDS_PROGRESS = 60 * 10 # 10 minutes


class Statistics(NamedTuple):
name: str
description: str
total: str
total: int
disk_size: str | None
last_modified: datetime | None

Expand Down Expand Up @@ -69,22 +73,19 @@ def _get_statistics(
description: str,
index: str,
document: DocumentType,
filter_query: Query | None = None,
last_modified_field: str = "last_modified"
) -> Statistics:
key = (document, index, repr(filter_query))
key = (document, index, last_modified_field)
if key in _statistics_cache:
return _statistics_cache[key]

config.es.client.indices.refresh(index=index)
stats = config.es.client.indices.stats(index=index)
search = document.search(using=config.es.client, index=index)
if filter_query is not None:
search = search.filter(filter_query)
search = search.filter(Exists(field=last_modified_field))
total = search.count()
# FIXME: Use specific last modified field.
last_modified_response = (
search.query(Exists(field="last_modified"))
.sort("-last_modified")
search.sort(f"-{last_modified_field}")
.extra(size=1)
.execute()
)
Expand All @@ -95,7 +96,7 @@ def _get_statistics(

disk_size = (
_convert_bytes(stats["_all"]["total"]["store"]["size_in_bytes"])
if filter_query is None
if last_modified_field == "last_modified"
else None
)

Expand All @@ -109,6 +110,60 @@ def _get_statistics(
_statistics_cache[key] = statistics
return statistics

_warc_cache_statistics_cache: dict[
tuple[Path, bool],
Statistics,
] = ExpiringDict(
max_len=100,
max_age_seconds=_CACHE_SECONDS_WARC_CACHE_STATISTICS,
)

def _get_warc_cache_statistics(
config: Config,
name: str,
description: str,
cache_path: Path,
temporary: bool = False
) -> Statistics:
"""Retrieve WARC cache statistics."""
key = (cache_path.resolve(), temporary)
if key in _warc_cache_statistics_cache:
return _warc_cache_statistics_cache[key]

file_paths: list[Path]
if temporary:
file_paths = list(cache_path.glob(".*.warc.gz"))

else:
file_paths = list(cache_path.glob("[!.]*.warc.gz"))

disk_size_bytes: int = 0
last_modified: float | None = None
warc_count: int = 0

if len(file_paths) > 0:
disk_size_bytes = sum(file_path.stat().st_size for file_path in file_paths)
last_modified = max(file_path.stat().st_mtime for file_path in file_paths)
for file_path in file_paths:
with gzip_open(file_path, mode="rb") as gzip_file:
iterator = ArchiveIterator(
fileobj=gzip_file,
no_record_parse=True,
)
warc_count += sum(
1 for record in iterator if record.rec_type == "request"
)

statistics = Statistics(
name=name,
description=description,
total=warc_count,
disk_size=_convert_bytes(disk_size_bytes),
last_modified=datetime.fromtimestamp(last_modified) if last_modified else None,
)
_warc_cache_statistics_cache[key] = statistics
return statistics


_progress_cache: dict[
tuple[DocumentType, str, str, str],
Expand Down Expand Up @@ -198,48 +253,60 @@ def home(config: Config) -> str | Response:
description="SERPs for which the query has been parsed from the URL.",
document=Serp,
index=config.es.index_serps,
filter_query=Exists(field="url_query"),
),
_get_statistics(
config=config,
name="+ URL page",
description="SERPs for which the page has been parsed from the URL.",
document=Serp,
index=config.es.index_serps,
filter_query=Exists(field="url_page"),
last_modified_field="url_page_parser.last_parsed",
),
_get_statistics(
config=config,
name="+ URL offset",
description="SERPs for which the offset has been parsed from the URL.",
document=Serp,
index=config.es.index_serps,
filter_query=Exists(field="url_offset"),
last_modified_field="url_offset_parser.last_parsed",
),
_get_statistics(
config=config,
name="+ WARC",
description="SERPs for which the WARC has been downloaded.",
document=Serp,
index=config.es.index_serps,
filter_query=Exists(field="warc_location"),
last_modified_field="warc_downloader.last_downloaded",
),
_get_statistics(
config=config,
name="+ WARC query",
description="SERPs for which the query has been parsed from the WARC.",
document=Serp,
index=config.es.index_serps,
filter_query=Exists(field="warc_query"),
last_modified_field="warc_query_parser.last_parsed",
),
_get_statistics(
config=config,
name="+ WARC snippets",
description="SERPs for which the snippets have been parsed "
"from the WARC.",
description="SERPs for which the snippets have been parsed from the WARC.",
document=Serp,
index=config.es.index_serps,
filter_query=Exists(field="warc_snippets_parser.id"),
last_modified_field="warc_snippets_parser.last_parsed",
),
_get_warc_cache_statistics(
config=config,
name="→ WARC cache (ready)",
description="Downloaded SERP WARC records, ready to be uploaded to S3.",
cache_path = config.warc_cache.path_serps,
temporary=False,
),
_get_warc_cache_statistics(
config=config,
name="→ WARC cache (in progress)",
description="Downloaded SERP WARC records, still locked by a downloader.",
cache_path = config.warc_cache.path_serps,
temporary=True,
),
_get_statistics(
config=config,
Expand Down Expand Up @@ -279,7 +346,7 @@ def home(config: Config) -> str | Response:
_get_statistics(
config=config,
name="WARC snippets parsers",
description="Parser to get the snippets from a SERP's " "WARC contents.",
description="Parser to get the snippets from a SERP's WARC contents.",
document=WarcSnippetsParser,
index=config.es.index_warc_snippets_parsers,
),
Expand Down