73 changes: 73 additions & 0 deletions invenio_app_ils/circulation/indexer.py
@@ -15,6 +15,7 @@
from invenio_circulation.proxies import current_circulation
from invenio_indexer.api import RecordIndexer
from invenio_pidstore.errors import PIDDeletedError
from invenio_search import current_search_client

from invenio_app_ils.circulation.utils import resolve_item_from_loan
from invenio_app_ils.documents.api import DOCUMENT_PID_TYPE
@@ -97,3 +98,75 @@ def index_extra_fields_for_loan(loan_dict):

can_circulate_items_count = document["circulation"]["can_circulate_items_count"]
loan_dict["can_circulate_items_count"] = can_circulate_items_count


def index_stats_fields_for_loan(loan_dict):
"""Indexer hook to modify the loan record dict before indexing"""

creation_date = datetime.fromisoformat(loan_dict["_created"]).date()
start_date = (
datetime.fromisoformat(loan_dict["start_date"]).date()
if loan_dict.get("start_date")
else None
)
end_date = (
datetime.fromisoformat(loan_dict["end_date"]).date()
if loan_dict.get("end_date")
else None
)

# Collect extra information relevant for stats
stats = {}

# Time ranges in days
if start_date and end_date:
loan_duration = (end_date - start_date).days
stats["loan_duration"] = loan_duration

if creation_date and start_date:
waiting_time = (start_date - creation_date).days
stats["waiting_time"] = waiting_time if waiting_time >= 0 else None

Member:

Isn't it better to return 0 than None?
If we want to distinguish self-checkout from normal loans, this won't be enough (since a normal loan can be retrieved on the same day).

Contributor Author (@JakobMiesner), Nov 12, 2025:

I just wanted to make sure there are no negative values, which would skew the statistics. For example, when summing the waiting time, a single loan whose start_date was manually set before the creation_date should not influence the waiting time statistic. But it might be good to have a discussion about how to handle such cases.

Regarding distinguishing self-checkout and normal loans: one can filter with the q parameter for the delivery option.
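
As a hedged illustration of that filtering approach (the delivery.method field name and its value are assumptions for illustration, not something this PR defines), the stats endpoint could be queried with parameters like:

# Hypothetical query-string parameters restricting the loan stats to self-checkout loans.
params = {
    "q": "delivery.method:SELF-CHECKOUT",  # assumed field and value, adjust to the actual mapping
    "group_by": '[{"field": "start_date", "interval": "1M"}]',
    "metrics": '[{"field": "extra_data.stats.waiting_time", "aggregation": "avg"}]',
}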


# Document availability during loan request
stat_events_index_name = "events-stats-loan-transitions"
if current_search_client.indices.exists(index=stat_events_index_name):

Contributor:

I think it would be better to fail rather than silently not index things if this conditional statement is not fulfilled. Ping @ntarocco, WDYT?

Contributor Author:

For context: the check was added because if no event exists at the moment a loan is indexed, the events index will not exist either, and the loan indexer will throw an error in the lines below the if, leading to missing or wrongly indexed loans.
I could manually create the index in our systems during deploy and add it to the release notes that the stats index needs to be created for the system to work. But then the setup script should also create the index for new systems.

Contributor:

Agree, if it is required before running this code (weird, shouldn't it be created automatically if not existing?), then I would fail instead.

Contributor Author:

I added a warning when this happens but still would not fail. While the index is required for adding a specific stat field (namely available_items_during_request) to the loans, the stat is completely optional and I would not let the whole indexing of the loan fail because of it.
Also, the required index events-stats-loan-transitions is handled by invenio-stats. It is created up to 30 minutes after a loan goes through a transition for the first time (30 minutes because the indexing of the events for invenio-stats is handled by a Celery beat task).
Making it fail would also crash all tests and make new systems fail for at least the first 30 minutes.
If we really want the loan indexer to fail here, I see only the following options to make it work for all tests and new systems:

  • Manually create the index in invenio-app-ils. I think this is a bad idea because the index is created and managed by invenio-stats.
  • Creating a loan, sending it through a transition and forcing invenio-stats to index the event. This feels super hacky.

Also note:
If a loan skips this stat and an event for this loan is indexed later, the loan will be reindexed as well and pick up the missing information.
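
For reference, a minimal sketch of the first option above (pre-creating the index during setup), assuming the plain OpenSearch client API exposed by invenio_search; the index name matches the one used below, and the empty body is an assumption (the real mapping is owned by invenio-stats):

from invenio_search import current_search_client

STAT_EVENTS_INDEX = "events-stats-loan-transitions"


def ensure_stats_events_index():
    """Create the stats events index if it does not exist yet (sketch only)."""
    if not current_search_client.indices.exists(index=STAT_EVENTS_INDEX):
        # No explicit mapping: rely on dynamic mapping or the invenio-stats index template.
        current_search_client.indices.create(index=STAT_EVENTS_INDEX)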

loan_pid = loan_dict["pid"]
search_body = {
"query": {
"bool": {
"must": [
{"term": {"trigger": "request"}},
{"term": {"pid_value": loan_pid}},
],
}
},
}

search_result = current_search_client.search(
index=stat_events_index_name, body=search_body
)
hits = search_result["hits"]["hits"]
if len(hits) == 1:
request_transition_event = hits[0]["_source"]
available_items_during_request_count = request_transition_event[
"extra_data"
]["available_items_during_request_count"]
stats["available_items_during_request"] = (
available_items_during_request_count > 0
)
elif len(hits) > 1:
raise ValueError(
f"Multiple request transition events for loan {loan_pid}."
"Expected zero or one."
)
else:
current_app.logger.error(
f"Stats events index '{stat_events_index_name}' does not exist. "
"This is normal during initial setup or if no events have been processed yet. "
"No data is lost: as soon as the events are processed, "
"the loan will be reindexed and the stat will be available."
)

if not "extra_data" in loan_dict:
loan_dict["extra_data"] = {}
loan_dict["extra_data"]["stats"] = stats
102 changes: 101 additions & 1 deletion invenio_app_ils/circulation/stats/api.py
@@ -1,13 +1,18 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 CERN.
# Copyright (C) 2019-2025 CERN.
#
# invenio-app-ils is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""APIs for ILS circulation statistics."""

from invenio_search.engine import dsl

from invenio_app_ils.circulation.search import get_most_loaned_documents
from invenio_app_ils.circulation.stats.schemas import (
_OS_NATIVE_AGGREGATE_FUNCTION_TYPES,
)
from invenio_app_ils.proxies import current_app_ils


@@ -49,3 +54,98 @@ def fetch_most_loaned_documents(from_date, to_date, bucket_size):
)

return res


def _generate_metric_agg_field_name(metric):
"""Return the aggregation name used for a metric.

:param metric: Must include 'field' and 'aggregation' keys.
:returns: The aggregation field name in the form '<aggregation>__<field>'.
"""

return f"{metric['aggregation']}__{metric['field']}"


def get_loan_statistics(date_fields, search, requested_group_by, requested_metrics):
"""Aggregate loan statistics for requested metrics.

:param date_fields: List of date fields for the record type.
Date fields require different handling when using them to group by.
:param search: The base search object to apply aggregations on
:param requested_group_by: List of group dictionaries with 'field' and optional 'interval' keys.
Example: [{"field": "start_date", "interval": "monthly"}, {"field": "state"}]
:param requested_metrics: List of metric dictionaries with 'field' and 'aggregation' keys.
Example: [{"field": "loan_duration", "aggregation": "avg"}]
:returns: List of result buckets parsed from the composite aggregation, each with 'key', 'doc_count' and 'metrics' entries.
"""

# Build composite aggregation
sources = []
for grouping in requested_group_by:
grouping_field = grouping["field"]

if grouping_field in date_fields:
sources.append(
{
grouping_field: {
"date_histogram": {
"field": grouping_field,
"calendar_interval": grouping["interval"],
"format": "yyyy-MM-dd",
}
}
}
)
else:
sources.append({grouping_field: {"terms": {"field": grouping_field}}})

composite_agg = dsl.A("composite", sources=sources, size=1000)

for metric in requested_metrics:
agg_name = _generate_metric_agg_field_name(metric)

grouping_field = metric["field"]
agg_type = metric["aggregation"]
field_config = {"field": grouping_field}
if agg_type in _OS_NATIVE_AGGREGATE_FUNCTION_TYPES:
composite_agg = composite_agg.metric(
agg_name, dsl.A(agg_type, **field_config)
)
elif agg_type == "median":
composite_agg = composite_agg.metric(
agg_name, dsl.A("percentiles", percents=[50], **field_config)
)

search.aggs.bucket("loan_aggregations", composite_agg)

# Only retrieve aggregation results
search = search[:0]
result = search.execute()

# Parse aggregation results
buckets = []
if hasattr(result.aggregations, "loan_aggregations"):
for bucket in result.aggregations.loan_aggregations.buckets:
metrics_data = {}
for metric in requested_metrics:
agg_name = _generate_metric_agg_field_name(metric)

if hasattr(bucket, agg_name):
agg_result = getattr(bucket, agg_name)
agg_type = metric["aggregation"]

if agg_type in _OS_NATIVE_AGGREGATE_FUNCTION_TYPES:
metrics_data[agg_name] = agg_result.value
elif agg_type == "median":
median_value = agg_result.values.get("50.0")
metrics_data[agg_name] = median_value

bucket_data = {
"key": bucket.key.to_dict(),
"doc_count": bucket.doc_count,
"metrics": metrics_data,
}

buckets.append(bucket_data)

return buckets
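
A minimal usage sketch of get_loan_statistics; obtaining the loans search object via current_circulation and the metric field names are assumptions for illustration:

from invenio_circulation.proxies import current_circulation

search = current_circulation.loan_search_cls()
buckets = get_loan_statistics(
    date_fields={"start_date", "end_date"},
    search=search,
    requested_group_by=[
        {"field": "start_date", "interval": "1M"},
        {"field": "state"},
    ],
    requested_metrics=[
        {"field": "extra_data.stats.loan_duration", "aggregation": "avg"},
    ],
)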
110 changes: 110 additions & 0 deletions invenio_app_ils/circulation/stats/schemas.py
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
#
# invenio-app-ils is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Marshmallow schemas for loan statistics validation."""

import json
import re

from marshmallow import (
Schema,
ValidationError,
fields,
pre_load,
validate,
validates_schema,
)

from invenio_app_ils.errors import InvalidParameterError

_OS_VALID_FIELD_NAME_PATTERN = re.compile(r"^[A-Za-z0-9_.]+$")
_OS_NATIVE_AGGREGATE_FUNCTION_TYPES = {"avg", "sum", "min", "max"}
_VALID_AGGREGATE_FUNCTION_TYPES = _OS_NATIVE_AGGREGATE_FUNCTION_TYPES.union({"median"})
_VALID_DATE_INTERVALS = {"1d", "1w", "1M", "1q", "1y"}


def validate_field_name(field_name):
"""Validate a field name for search to prevent injection attacks.

:param field_name: The field name to validate
:raises InvalidParameterError: If field name is invalid or potentially malicious
"""
if not _OS_VALID_FIELD_NAME_PATTERN.match(field_name):
raise InvalidParameterError(
description=(
f"Invalid field name '{field_name}'. "
"Field names may contain only alphanumeric characters, underscores, "
"and dots."
)
)
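
For example, dotted field paths pass while anything containing query syntax is rejected:

validate_field_name("extra_data.stats.loan_duration")  # passes: letters, underscores and dots only
validate_field_name("state OR 1=1")  # raises InvalidParameterError (spaces and '=' are not allowed)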


class SecureFieldNameField(fields.String):
"""Marshmallow field that validates field names to prevent injection attacks."""

def _deserialize(self, value, attr, data, **kwargs):
"""Deserialize and validate field name."""

field_name = super()._deserialize(value, attr, data, **kwargs)
validate_field_name(field_name)
return field_name


class GroupByItemSchema(Schema):
"""Schema for validating a single group_by item."""

field = SecureFieldNameField(required=True)
interval = fields.String(validate=validate.OneOf(_VALID_DATE_INTERVALS))

@validates_schema
def validate_date_fields(self, data, **kwargs):
"""Validate that date fields have an interval and non-date fields do not."""

date_fields = self.context["date_fields"]
field = data.get("field")
interval = data.get("interval")
if field in date_fields and not interval:
raise ValidationError(
{"interval": ["Interval is required for date fields."]}
)
if field not in date_fields and interval is not None:
raise ValidationError(
{"interval": ["Interval must not be provided for non-date fields."]}
)


class MetricItemSchema(Schema):
"""Schema for validating a single metric item."""

field = SecureFieldNameField(required=True)
aggregation = fields.String(
required=True, validate=validate.OneOf(_VALID_AGGREGATE_FUNCTION_TYPES)
)


class HistogramParamsSchema(Schema):
"""Schema for validating the query string parameters for the histogram endpoint"""

metrics = fields.List(fields.Nested(MetricItemSchema), required=False)
group_by = fields.List(
fields.Nested(GroupByItemSchema), required=True, validate=validate.Length(min=1)
)
q = fields.String()

def __init__(self, date_fields, *args, **kwargs):
super().__init__(*args, **kwargs)
self.context = {"date_fields": set(date_fields)}

@pre_load
def parse_query_string(self, data, **kwargs):
"""Parse the metrics and group_by parameters from JSON strings."""

try:
for key in ("metrics", "group_by"):
# default to an empty list, as the "metrics" field is not required
data[key] = json.loads(data.get(key, "[]"))
except Exception as e:
raise ValidationError(
"The 'metrics' and 'group_by' parameters must be valid JSON arrays."
) from e
return data
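
A minimal usage sketch of the schema above; the request arguments are illustrative:

schema = HistogramParamsSchema(date_fields=["start_date", "end_date"])
params = schema.load(
    {
        "group_by": '[{"field": "start_date", "interval": "1M"}]',
        "metrics": '[{"field": "loan_duration", "aggregation": "median"}]',
    }
)
# params["group_by"] and params["metrics"] are now validated lists of dicts.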
13 changes: 13 additions & 0 deletions invenio_app_ils/circulation/stats/serializers/__init__.py
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.


"""Loan stats serializers."""

from invenio_app_ils.circulation.stats.serializers.response import loan_stats_responsify
from invenio_app_ils.circulation.stats.serializers.schema import HistogramStatsV1

loan_stats_response = loan_stats_responsify(HistogramStatsV1, "application/json")
37 changes: 37 additions & 0 deletions invenio_app_ils/circulation/stats/serializers/response.py
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio App ILS loan stats response serializers."""

import json

from flask import current_app


def loan_stats_responsify(schema_class, mimetype):
"""Loan stats response serializer.

:param schema_class: Schema class used to serialize the response.
:param mimetype: MIME type of response.
"""

def view(data, code=200, headers=None):
"""Generate the response object."""
# return jsonify(data), code
response_data = schema_class().dump(data)

response = current_app.response_class(
json.dumps(response_data), mimetype=mimetype
)
response.status_code = code

if headers is not None:
response.headers.extend(headers)
return response

return view
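
A hedged sketch of returning the serialized payload from a Flask view, reusing the loan_stats_response callable defined in the package __init__; the view wiring and payload are illustrative:

from invenio_app_ils.circulation.stats.serializers import loan_stats_response


def histogram_view():
    buckets = []  # as returned by get_loan_statistics(); placeholder here
    return loan_stats_response({"buckets": buckets}, code=200)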
33 changes: 33 additions & 0 deletions invenio_app_ils/circulation/stats/serializers/schema.py
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2025 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio App ILS loan stats serializers schema."""

from marshmallow import Schema, fields


class BucketSchema(Schema):
"""Schema for a single histogram bucket."""

doc_count = fields.Int(required=True)
key = fields.Dict(keys=fields.String(), values=fields.String())

metrics = fields.Dict(
keys=fields.String(),
values=fields.Float(),
)


class HistogramStatsV1(Schema):
"""Schema for a stats histogram response."""

buckets = fields.List(
fields.Nested(BucketSchema),
required=True,
description="Statistics buckets.",
)
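
For illustration, dumping a single bucket through the schema above (values are made up):

HistogramStatsV1().dump(
    {
        "buckets": [
            {
                "key": {"start_date": "2025-01-01", "state": "ITEM_ON_LOAN"},
                "doc_count": 12,
                "metrics": {"avg__loan_duration": 21.5},
            }
        ]
    }
)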