Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ __pycache__/
env/
env2/
venv/
.venv/
Copy link
Copy Markdown
Contributor Author

@natlibfi-kaisa natlibfi-kaisa Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not relate to the actual ticket at all but just need to push it.

build/
develop-eggs/
dist/
Expand Down
12 changes: 12 additions & 0 deletions bin/informational/license_report
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env python3

import os
import sys

bin_dir = os.path.split(__file__)[0]
package_dir = os.path.join(bin_dir, "..", "..")
sys.path.append(os.path.abspath(package_dir))

from scripts import LicenseReportScript # noqa: E402

LicenseReportScript().run()
12 changes: 12 additions & 0 deletions core/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,18 @@ def explain_license_pool(self, pool):
pool.licenses_reserved,
)
)
self.write("Licenses: %s" % (len(pool.licenses)))
for license in pool.licenses:
self.write("License ID: %s:" % (license.identifier))
self.write(
" Checkouts left: %s, Checkouts available: %s, Concurrency: %s , Expires: %s"
% (
license.checkouts_left,
license.checkouts_available,
license.terms_concurrency,
license.expires,
)
)

def explain_work(self, work):
self.write("Work info:")
Expand Down
172 changes: 172 additions & 0 deletions scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,178 @@ def explain(self, licensepool):
print("\t".join([str(x) for x in data]))


class LicenseReportScript(Script):

"""Print a CSV-format report of all our licensepools and their licenses."""

def do_run(self):
qu = (
self._db.query(LicensePool)
.filter(LicensePool.open_access == False)
.order_by(LicensePool.availability_time.desc())
)
first_row = [
"Work/Licensepool Identifier",
"Title",
"Author",
"First seen",
"Last seen (best guess)",
"Current licenses owned",
"Current licenses available",
"Changes in number of licenses",
"Changes in title availability",
"License identifier",
"License status",
"License checkouts left",
"License checkouts available",
"License concurrency",
"Active loans",
"License expiration",
]
print(",".join(first_row))

for pool in qu:
self.explain(pool)

def investigate(self, licensepool):
"""Show events about all our licensepools and information
about each license in the pool.

:param licensepool: A LicensePool.

:return: a 3-tuple (last_seen, title_removal_events,
license_removal_events).

`last_seen` is the latest point at which we knew the book was
circulating. If we never knew the book to be circulating, this
is the first time we ever saw the LicensePool.

`title_removal_events` is a query that returns CirculationEvents
in which this LicensePool was removed from the remote collection.

`license_removal_events` is a query that returns
CirculationEvents in which LicensePool.licenses_owned went
from having a positive number to being zero or a negative
number.
"""
first_activity = None
most_recent_activity = None

# If we have absolutely no information about the book ever
# circulating, we act like we lost track of the book
# immediately after seeing it for the first time.
last_seen = licensepool.availability_time

# If there's a recorded loan or hold on the book, that can
# push up the last time the book was known to be circulating.
for l in (licensepool.loans, licensepool.holds):
for item in l:
if not last_seen or item.start > last_seen:
last_seen = item.start

# Now we look for relevant circulation events. First, an event
# where the title was explicitly removed is pretty clearly
# a 'last seen'.
base_query = (
self._db.query(CirculationEvent)
.filter(CirculationEvent.license_pool == licensepool)
.order_by(CirculationEvent.start.desc())
)
title_removal_events = base_query.filter(
CirculationEvent.type == CirculationEvent.DISTRIBUTOR_TITLE_REMOVE
)
if title_removal_events.count():
candidate = title_removal_events[-1].start
if not last_seen or candidate > last_seen:
last_seen = candidate

# Also look for an event where the title went from a nonzero
# number of licenses to a zero number of licenses. That's a
# good 'last seen'.
license_removal_events = (
base_query.filter(
CirculationEvent.type == CirculationEvent.DISTRIBUTOR_LICENSE_REMOVE,
)
.filter(CirculationEvent.old_value > 0)
.filter(CirculationEvent.new_value <= 0)
)
if license_removal_events.count():
candidate = license_removal_events[-1].start
if not last_seen or candidate > last_seen:
last_seen = candidate

return last_seen, title_removal_events, license_removal_events

format = "%Y-%m-%d"

def explain(self, licensepool):
edition = licensepool.presentation_edition
identifier = licensepool.identifier
last_seen, title_removal_events, license_removal_events = self.investigate(
licensepool
)

data = [f"{identifier.type} {identifier.identifier}"]
if edition:
data.extend([f'"{edition.title}"', f'"{edition.author}"'])
if licensepool.availability_time:
first_seen = licensepool.availability_time.strftime(self.format)
else:
first_seen = ""
data.append(first_seen)
if last_seen:
last_seen = last_seen.strftime(self.format)
else:
last_seen = ""
data.append(last_seen)
data.append(licensepool.licenses_owned)
data.append(licensepool.licenses_available)

license_removals = []
for event in license_removal_events:
description = "{}: {}→{}".format(
event.start.strftime(self.format),
event.old_value,
event.new_value,
)
license_removals.append(description)
data.append(", ".join(license_removals))

title_removals = [
event.start.strftime(self.format) for event in title_removal_events
]
data.append(", ".join(title_removals))
# Print the main license pool information
print(",".join(str(item) for item in data)) # Convert all items to strings

# Then fetch each license in the pool
for license in licensepool.licenses:
# Append each field's data as a separate entry in the data list
expire_date = (
license.expires.strftime(self.format) if license.expires else ""
)
license_data = [
"",
"",
"",
"",
"",
"",
"",
"",
"", # Fill the first 9 columns with empty strings
license.identifier,
license.status,
license.checkouts_left,
license.checkouts_available,
license.terms_concurrency,
len(license.loans),
expire_date,
]
# And print each license on a new line
print(",".join(str(item) for item in license_data))


class NYTBestSellerListsScript(TimestampScript):
name = "Update New York Times best-seller lists"

Expand Down