Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions app/api_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class SearchQuery(Schema):
org_slug = String()
org_type = Enum(ORGANIZATION_TYPE_ENUM)
keyword = List(String())
publisher = String()
after = String()
spatial_filter = Enum(SPATIAL_FILTER_ENUM)
spatial_feature = GeoJson()
Expand Down Expand Up @@ -125,6 +126,16 @@ class OrganizationsResults(Schema):
total = Integer()


class PublisherResponse(Schema):
    """A single publisher aggregation entry: publisher name plus dataset count."""

    # Publisher display name as indexed (bucket key from the terms aggregation).
    name = String()
    # Number of datasets attributed to this publisher.
    count = Integer()

class PublishersResults(Schema):
    """Response envelope for a publishers listing/aggregation endpoint."""

    # Per-publisher entries; each item follows PublisherResponse.
    publishers = List(Nested(PublisherResponse))
    # Total number of publishers returned.
    total = Integer()


class OpensearchHealth(Schema):
    """Health-check response for the OpenSearch backend (e.g. cluster status string)."""

    # Status value reported by the backend health check.
    status = String()

Expand Down
36 changes: 35 additions & 1 deletion app/database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def search_datasets(
per_page=DEFAULT_PER_PAGE,
org_id=None,
org_types=None,
publisher: str | None = None,
spatial_filter=None,
spatial_geometry=None,
spatial_within=True,
Expand All @@ -78,6 +79,7 @@ def search_datasets(
include_aggregations: bool = False,
keyword_size: int = 100,
org_size: int = 100,
publisher_size: int = 100,
*args,
**kwargs,
):
Expand Down Expand Up @@ -108,6 +110,7 @@ def search_datasets(
per_page=per_page,
org_id=org_id,
org_types=org_types,
publisher=publisher,
search_after=search_after,
spatial_filter=spatial_filter,
spatial_geometry=spatial_geometry,
Expand All @@ -116,6 +119,7 @@ def search_datasets(
include_aggregations=include_aggregations,
keyword_size=keyword_size,
org_size=org_size,
publisher_size=publisher_size,
)

def get_unique_keywords(self, size=100, min_doc_count=1) -> list[dict]:
Expand All @@ -135,11 +139,13 @@ def get_contextual_aggregations(
org_id=None,
org_types=None,
keywords: list[str] = None,
publisher: str | None = None,
spatial_filter=None,
spatial_geometry=None,
spatial_within=True,
keyword_size=100,
org_size=100,
publisher_size=100,
) -> dict:
"""
Get keyword, organization, and publisher aggregations based on the current search context.
Expand All @@ -153,14 +159,20 @@ def get_contextual_aggregations(
per_page=0,
org_id=org_id,
org_types=org_types,
publisher=publisher,
spatial_filter=spatial_filter,
spatial_geometry=spatial_geometry,
spatial_within=spatial_within,
include_aggregations=True,
keyword_size=keyword_size,
org_size=org_size,
publisher_size=publisher_size,
)
return result.aggregations or {"keywords": [], "organizations": []}
return result.aggregations or {
"keywords": [],
"organizations": [],
"publishers": [],
}

def search_locations(self, query, size=100):
"""
Expand Down Expand Up @@ -296,9 +308,14 @@ def list_datasets_for_organization(
dataset_search_query: str = "",
num_results=DEFAULT_PER_PAGE,
keywords: list[str] | None = None,
publisher: str | None = None,
spatial_filter: str | None = None,
spatial_geometry: dict | None = None,
spatial_within: bool = True,
include_aggregations: bool = False,
keyword_size: int = 100,
org_size: int = 100,
publisher_size: int = 100,
) -> SearchResult:
if not organization_id:
return SearchResult.empty()
Expand All @@ -309,9 +326,14 @@ def list_datasets_for_organization(
org_id=organization_id,
sort_by=sort_by,
per_page=num_results,
publisher=publisher,
spatial_filter=spatial_filter,
spatial_geometry=spatial_geometry,
spatial_within=spatial_within,
include_aggregations=include_aggregations,
keyword_size=keyword_size,
org_size=org_size,
publisher_size=publisher_size,
)

def get_opensearch_org_dataset_counts(self, as_dict=False):
Expand Down Expand Up @@ -402,6 +424,18 @@ def _get_organizations_from_db(self) -> list[dict]:
for row in rows
]

def get_top_publishers(self) -> list[dict]:
    """Return the top 100 publishers ordered by dataset count.

    Entries without a truthy ``name`` are dropped. Results are ordered by
    count descending, with ties broken case-insensitively by name.
    """
    counts = self.opensearch.get_publisher_counts(size=100)

    def _rank(entry):
        # Negate the count so the default ascending sort yields count-descending;
        # lowercase the name for a case-insensitive tiebreak.
        return (-int(entry.get("count", 0)), (entry.get("name") or "").lower())

    named_entries = (entry for entry in counts if entry.get("name"))
    return sorted(named_entries, key=_rank)

@staticmethod
def to_dict(obj: Any) -> dict[str, Any] | None:
if obj is None:
Expand Down
107 changes: 105 additions & 2 deletions app/database/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def from_opensearch_result(cls, result_dict: dict, per_page_hint=0):

When the search body included aggregation "clauses", the parsed
`aggregations` dict will be populated on the returned instance with
`keywords` and `organizations` lists.
`keywords`, `organizations`, and `publishers` lists.
"""

total = result_dict["hits"]["total"]["value"]
Expand Down Expand Up @@ -96,6 +96,7 @@ def from_opensearch_result(cls, result_dict: dict, per_page_hint=0):
org_buckets = (
raw_aggs.get("organizations", {}).get("by_slug", {}).get("buckets", [])
)
publisher_buckets = raw_aggs.get("unique_publishers", {}).get("buckets", [])
aggregations = {
"keywords": [
{"keyword": b["key"], "count": b["doc_count"]}
Expand All @@ -104,6 +105,10 @@ def from_opensearch_result(cls, result_dict: dict, per_page_hint=0):
"organizations": [
{"slug": b["key"], "count": b["doc_count"]} for b in org_buckets
],
"publishers": [
{"name": b["key"], "count": b["doc_count"]}
for b in publisher_buckets
],
}

return cls(
Expand Down Expand Up @@ -195,6 +200,13 @@ class OpenSearchInterface:
"type": "text",
"analyzer": TEXT_ANALYZER,
"search_analyzer": TEXT_ANALYZER,
"fields": {
"raw": {"type": "keyword"},
"normalized": {
"type": "keyword",
"normalizer": KEYWORD_NORMALIZER,
},
},
},
"keyword": {
"type": "text",
Expand Down Expand Up @@ -885,6 +897,7 @@ def search(
org_id=None,
search_after: list = None,
org_types=None,
publisher: str | None = None,
spatial_filter=None,
spatial_geometry=None,
spatial_within=True,
Expand All @@ -893,6 +906,7 @@ def search(
include_aggregations: bool = False,
keyword_size: int = 100,
org_size: int = 100,
publisher_size: int = 100,
) -> SearchResult:
"""Search our index for a query string.

Expand Down Expand Up @@ -1019,6 +1033,9 @@ def search(
}
)

if publisher:
filters.append({"term": {"publisher.normalized": publisher.lower()}})

# Add spatial filter
if spatial_filter == "geospatial":
filters.append({"term": {"has_spatial": True}})
Expand Down Expand Up @@ -1052,7 +1069,7 @@ def search(
if search_after is not None:
search_body["search_after"] = search_after

# `keyword` and `organization` aggregations for the chips
# `keyword`, `organization`, and `publisher` aggregations for the chips
if include_aggregations:
search_body["aggs"] = {
"unique_keywords": {
Expand All @@ -1076,6 +1093,14 @@ def search(
}
},
},
"unique_publishers": {
"terms": {
"field": "publisher.raw",
"size": publisher_size,
"min_doc_count": 1,
"order": {"_count": "desc"},
}
},
}

# print("QUERY:", search_body)
Expand Down Expand Up @@ -1165,6 +1190,41 @@ def get_organization_counts(
{"slug": bucket["key"], "count": bucket["doc_count"]} for bucket in buckets
]

def get_publisher_counts(
self, size=100, min_doc_count=1, as_dict=False
) -> list[dict] | dict[str, int]:
"""Aggregate datasets by publisher name to get counts."""
agg_body = {
"size": 0,
"aggs": {
"unique_publishers": {
"terms": {
"field": "publisher.raw",
"size": size,
"min_doc_count": min_doc_count,
"order": {"_count": "desc"},
}
}
},
}

result = self.client.search(index=self.INDEX_NAME, body=agg_body)
buckets = (
result.get("aggregations", {})
.get("unique_publishers", {})
.get("buckets", [])
)

if as_dict:
output = {}
for bucket in buckets:
output[bucket["key"]] = bucket["doc_count"]
return output

return [
{"name": bucket["key"], "count": bucket["doc_count"]} for bucket in buckets
]

def get_last_harvested_stats(self) -> dict[str, Any]:
"""Get dataset age-bin counts."""

Expand Down Expand Up @@ -1231,11 +1291,13 @@ def get_contextual_aggregations(
org_id=None,
org_types=None,
keywords: list[str] = None,
publisher: str | None = None,
spatial_filter=None,
spatial_geometry=None,
spatial_within=True,
keyword_size=100,
org_size=100,
publisher_size=100,
) -> dict:
"""
Get keyword, organization, and publisher aggregations based on the current search context.
Expand Down Expand Up @@ -1303,6 +1365,9 @@ def get_contextual_aggregations(
}
)

if publisher:
filters.append({"term": {"publisher.normalized": publisher.lower()}})

if spatial_filter == "geospatial":
filters.append({"term": {"has_spatial": True}})
elif spatial_filter == "non-geospatial":
Expand Down Expand Up @@ -1396,6 +1461,40 @@ def get_contextual_aggregations(
.get("buckets", [])
)

if filters:
publisher_query = {
"bool": {
"filter": filters,
"must": [base_query],
}
}
else:
publisher_query = base_query

publisher_agg_body = {
"size": 0,
"query": publisher_query,
"aggs": {
"unique_publishers": {
"terms": {
"field": "publisher.raw",
"size": publisher_size,
"min_doc_count": 1,
"order": {"_count": "desc"},
}
}
},
}

publisher_result = self.client.search(
index=self.INDEX_NAME, body=publisher_agg_body
)
publisher_buckets = (
publisher_result.get("aggregations", {})
.get("unique_publishers", {})
.get("buckets", [])
)

return {
"keywords": [
{"keyword": bucket["key"], "count": bucket["doc_count"]}
Expand All @@ -1405,4 +1504,8 @@ def get_contextual_aggregations(
{"slug": bucket["key"], "count": bucket["doc_count"]}
for bucket in org_buckets
],
"publishers": [
{"name": bucket["key"], "count": bucket["doc_count"]}
for bucket in publisher_buckets
],
}
Loading
Loading