
Commit 04981fe

resolving conflicts
1 parent 1f22a63 commit 04981fe

4 files changed: +66 additions, −339 deletions


integrations/opensearch/src/haystack_integrations/components/retrievers/opensearch/sql_retriever.py

Lines changed: 5 additions & 5 deletions
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from typing import Any, Optional
+from typing import Any
 
 from haystack import component, default_from_dict, default_to_dict, logging
 
@@ -84,8 +84,8 @@ def from_dict(cls, data: dict[str, Any]) -> "OpenSearchSQLRetriever":
     def run(
         self,
         query: str,
-        response_format: Optional[ResponseFormat] = None,
-        document_store: Optional[OpenSearchDocumentStore] = None,
+        response_format: ResponseFormat | None = None,
+        document_store: OpenSearchDocumentStore | None = None,
     ) -> dict[str, Any]:
         """
         Execute a raw OpenSearch SQL query against the index.
@@ -138,8 +138,8 @@ def run(
     async def run_async(
         self,
         query: str,
-        response_format: Optional[ResponseFormat] = None,
-        document_store: Optional[OpenSearchDocumentStore] = None,
+        response_format: ResponseFormat | None = None,
+        document_store: OpenSearchDocumentStore | None = None,
     ) -> dict[str, Any]:
         """
         Asynchronously execute a raw OpenSearch SQL query against the index.
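Note: the only substantive change in this file is swapping `typing.Optional[X]` for the PEP 604 spelling `X | None`, which is valid at runtime in annotations from Python 3.10 onward (or earlier with deferred annotations). A minimal sketch of the equivalence, with a hypothetical `Literal` alias standing in for `ResponseFormat`, whose real definition is not part of this diff:

```python
from typing import Literal, Optional

# Hypothetical stand-in; the real ResponseFormat lives elsewhere in the integration.
ResponseFormat = Literal["json", "csv"]

# The two spellings below are interchangeable.
def run_old(query: str, response_format: Optional[ResponseFormat] = None) -> dict: ...
def run_new(query: str, response_format: ResponseFormat | None = None) -> dict: ...

assert Optional[ResponseFormat] == (ResponseFormat | None)  # same union type
```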

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 14 additions & 137 deletions
@@ -1255,7 +1255,6 @@ def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fiel
         mapping = self._client.indices.get_mapping(index=self._index)
         index_mapping = mapping[self._index]["mappings"]["properties"]
 
-
         # normalize field names
         normalized_metadata_fields = [self._normalize_metadata_field_name(field) for field in metadata_fields]
         # validate that all requested fields exist in the index mapping
@@ -1274,19 +1273,6 @@ def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fiel
         result = self._client.search(index=self._index, body=body)
 
         # extract cardinality values from aggregations
-<<<<<<< HEAD
-        return self._extract_distinct_counts_from_aggregations(result.get("aggregations", {}), index_mapping)
-
-    async def count_unique_metadata_by_filter_async(self, filters: dict[str, Any]) -> dict[str, int]:
-        """
-        Asynchronously returns the number of unique values for each metadata field of the documents that match the
-        provided filters.
-
-        :param filters: The filters to apply to count documents.
-            For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
-        :returns: A dictionary mapping each metadata field name to the count of its unique values among the filtered
-            documents.
-=======
         return self._extract_distinct_counts_from_aggregations(
             result.get("aggregations", {}), index_mapping, normalized_metadata_fields
         )
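The resolution keeps the main-branch call that threads `normalized_metadata_fields` through to `_extract_distinct_counts_from_aggregations`. Neither helper's body is shown in this diff; as a hedged sketch of the cardinality round trip they sit on (endpoint and index name are hypothetical):

```python
from opensearchpy import OpenSearch

client = OpenSearch(hosts=["http://localhost:9200"])  # hypothetical local cluster

# One cardinality aggregation per metadata field; "size": 0 skips document hits.
body = {
    "query": {"match_all": {}},  # stand-in for the translated Haystack filters
    "size": 0,
    "aggs": {
        "category": {"cardinality": {"field": "category"}},
        "status": {"cardinality": {"field": "status"}},
    },
}
result = client.search(index="my-index", body=body)

# Each cardinality aggregation answers with {"value": <approximate distinct count>}.
counts = {name: int(agg["value"]) for name, agg in result["aggregations"].items()}
# e.g. {"category": 12, "status": 3}
```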
@@ -1305,7 +1291,6 @@ async def count_unique_metadata_by_filter_async(
         :returns: A dictionary mapping each metadata field name to the count of its unique values among the filtered
             documents.
         :raises ValueError: If any of the requested fields don't exist in the index mapping.
->>>>>>> main
         """
         await self._ensure_initialized_async()
         assert self._async_client is not None
@@ -1324,7 +1309,6 @@ async def count_unique_metadata_by_filter_async(
 
         # build aggregations for specified metadata fields
         aggs = self._build_cardinality_aggregations(index_mapping, normalized_metadata_fields)
-
         if not aggs:
             return {}
 
@@ -1338,23 +1322,26 @@ async def count_unique_metadata_by_filter_async(
         )
 
     def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
->>>>>>> main
         """
         Returns the information about the fields in the index.
 
         If we populated the index with documents like:
 
+        ```python
         Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1})
         Document(content="Doc 2", meta={"category": "B", "status": "inactive"})
+        ```
 
         This method would return:
 
+        ```python
         {
             'content': {'type': 'text'},
             'category': {'type': 'keyword'},
             'status': {'type': 'keyword'},
             'priority': {'type': 'long'},
         }
+        ```
 
         :returns: The information about the fields in the index.
         """
@@ -1367,23 +1354,12 @@ def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
         index_mapping = {k: v for k, v in index_mapping.items() if k not in SPECIAL_FIELDS}
         return index_mapping
 
-<<<<<<< HEAD
-    async def get_metadata_fields_info_async(self) -> dict[str, dict]:
-=======
     async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
->>>>>>> main
         """
         Asynchronously returns the information about the fields in the index.
 
         If we populated the index with documents like:
 
-<<<<<<< HEAD
-        Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1})
-        Document(content="Doc 2", meta={"category": "B", "status": "inactive"})
-
-        This method would return:
-
-=======
         ```python
         Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1})
         Document(content="Doc 2", meta={"category": "B", "status": "inactive"})
@@ -1392,17 +1368,13 @@ async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
         This method would return:
 
         ```python
->>>>>>> main
         {
             'content': {'type': 'text'},
             'category': {'type': 'keyword'},
             'status': {'type': 'keyword'},
             'priority': {'type': 'long'},
         }
-<<<<<<< HEAD
-=======
         ```
->>>>>>> main
 
         :returns: The information about the fields in the index.
         """
@@ -1485,18 +1457,6 @@ async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[st
         return self._extract_min_max_from_stats(stats)
 
     def get_metadata_field_unique_values(
-<<<<<<< HEAD
-        self, metadata_field: str, search_term: str | None, from_: int, size: int
-    ) -> tuple[list[str], int]:
-        """
-        Returns unique values for a metadata field, optionally filtered by a search term in the content.
-
-        :param metadata_field: The metadata field to get unique values for.
-        :param search_term: Optional search term to filter documents by matching in the content field.
-        :param from_: The starting index for pagination.
-        :param size: The number of unique values to return.
-        :returns: A tuple containing (list of unique values, total count of unique values).
-=======
         self,
         metadata_field: str,
         search_term: str | None = None,
@@ -1515,7 +1475,6 @@ def get_metadata_field_unique_values(
         :returns: A tuple containing (list of unique values, after_key for pagination).
             The after_key is None when there are no more results. Use it in the `after` parameter
             for the next page.
->>>>>>> main
         """
         self._ensure_initialized()
         assert self._client is not None
@@ -1528,14 +1487,6 @@ def get_metadata_field_unique_values(
             # Use match_phrase for exact phrase matching to avoid tokenization issues
             query = {"match_phrase": {"content": search_term}}
 
-<<<<<<< HEAD
-        # Build aggregations
-        # Terms aggregation for paginated unique values
-        # Note: Terms aggregation doesn't support 'from' parameter directly,
-        # so we fetch from_ + size results and slice them
-        # Cardinality aggregation for total count
-        terms_size = from_ + size if from_ > 0 else size
-=======
         # Build composite aggregation for proper pagination
         composite_agg: dict[str, Any] = {
             "size": size,
@@ -1544,57 +1495,19 @@ def get_metadata_field_unique_values(
         if after is not None:
             composite_agg["after"] = after
 
->>>>>>> main
         body = {
             "query": query,
             "aggs": {
                 "unique_values": {
-<<<<<<< HEAD
-                    "terms": {
-                        "field": field_name,
-                        "size": terms_size,
-                    }
-                },
-                "total_count": {
-                    "cardinality": {
-                        "field": field_name,
-                    }
-                },
-=======
                     "composite": composite_agg,
                 }
->>>>>>> main
             },
             "size": 0,  # we only need aggregations, not documents
         }
 
         result = self._client.search(index=self._index, body=body)
         aggregations = result.get("aggregations", {})
 
-<<<<<<< HEAD
-        # Extract unique values from terms aggregation buckets
-        unique_values_buckets = aggregations.get("unique_values", {}).get("buckets", [])
-        # Apply pagination by slicing the results
-        paginated_buckets = unique_values_buckets[from_ : from_ + size]
-        unique_values = [str(bucket["key"]) for bucket in paginated_buckets]
-
-        # Extract total count from cardinality aggregation
-        total_count = int(aggregations.get("total_count", {}).get("value", 0))
-
-        return unique_values, total_count
-
-    async def get_metadata_field_unique_values_async(
-        self, metadata_field: str, search_term: str | None, from_: int, size: int
-    ) -> tuple[list[str], int]:
-        """
-        Asynchronously returns unique values for a metadata field, optionally filtered by a search term in the content.
-
-        :param metadata_field: The metadata field to get unique values for.
-        :param search_term: Optional search term to filter documents by matching in the content field.
-        :param from_: The starting index for pagination.
-        :param size: The number of unique values to return.
-        :returns: A tuple containing (list of unique values, total count of unique values).
-=======
         # Extract unique values from composite aggregation buckets
         unique_values_agg = aggregations.get("unique_values", {})
         unique_values_buckets = unique_values_agg.get("buckets", [])
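The composite request built here pages with a cursor rather than an offset. The `sources` part of `composite_agg` sits in unchanged context outside this hunk; assuming the standard single terms source, the request/response shape is roughly:

```python
from opensearchpy import OpenSearch

client = OpenSearch(hosts=["http://localhost:9200"])  # hypothetical local cluster

field_name = "category"  # hypothetical keyword field
body = {
    "query": {"match_all": {}},
    "size": 0,
    "aggs": {
        "unique_values": {
            "composite": {
                "size": 50,
                # Assumed source definition; not visible in this hunk.
                "sources": [{field_name: {"terms": {"field": field_name}}}],
            }
        }
    },
}
result = client.search(index="my-index", body=body)
agg = result["aggregations"]["unique_values"]

values = [bucket["key"][field_name] for bucket in agg["buckets"]]
after_key = agg.get("after_key")  # feed back as "after" to fetch the next page
```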
@@ -1627,7 +1540,6 @@ async def get_metadata_field_unique_values_async(
         :returns: A tuple containing (list of unique values, after_key for pagination).
             The after_key is None when there are no more results. Use it in the `after` parameter
             for the next page.
->>>>>>> main
         """
         await self._ensure_initialized_async()
         assert self._async_client is not None
@@ -1640,14 +1552,6 @@ async def get_metadata_field_unique_values_async(
             # Use match_phrase for exact phrase matching to avoid tokenization issues
             query = {"match_phrase": {"content": search_term}}
 
-<<<<<<< HEAD
-        # Build aggregations
-        # Terms aggregation for paginated unique values
-        # Note: Terms aggregation doesn't support 'from' parameter directly,
-        # so we fetch from_ + size results and slice them
-        # Cardinality aggregation for total count
-        terms_size = from_ + size if from_ > 0 else size
-=======
         # Build composite aggregation for proper pagination
         composite_agg: dict[str, Any] = {
             "size": size,
@@ -1656,44 +1560,31 @@ async def get_metadata_field_unique_values_async(
         if after is not None:
             composite_agg["after"] = after
 
->>>>>>> main
         body = {
             "query": query,
             "aggs": {
                 "unique_values": {
-<<<<<<< HEAD
-                    "terms": {
-                        "field": field_name,
-                        "size": terms_size,
-                    }
-                },
-                "total_count": {
-                    "cardinality": {
-                        "field": field_name,
-                    }
-                },
-=======
                     "composite": composite_agg,
                 }
->>>>>>> main
             },
             "size": 0,  # we only need aggregations, not documents
         }
 
         result = await self._async_client.search(index=self._index, body=body)
         aggregations = result.get("aggregations", {})
 
-<<<<<<< HEAD
-        # Extract unique values from terms aggregation buckets
-        unique_values_buckets = aggregations.get("unique_values", {}).get("buckets", [])
-        # Apply pagination by slicing the results
-        paginated_buckets = unique_values_buckets[from_ : from_ + size]
-        unique_values = [str(bucket["key"]) for bucket in paginated_buckets]
+        # Extract unique values from composite aggregation buckets
+        unique_values_agg = aggregations.get("unique_values", {})
+        unique_values_buckets = unique_values_agg.get("buckets", [])
+        unique_values = [str(bucket["key"][field_name]) for bucket in unique_values_buckets]
 
-        # Extract total count from cardinality aggregation
-        total_count = int(aggregations.get("total_count", {}).get("value", 0))
+        # Extract after_key for pagination
+        # If we got fewer results than requested, we've reached the end
+        after_key = unique_values_agg.get("after_key")
+        if after_key is not None and size is not None and len(unique_values_buckets) < size:
+            after_key = None
 
-        return unique_values, total_count
+        return unique_values, after_key
 
     def _prepare_sql_http_request_params(
         self, base_url: str, response_format: ResponseFormat
@@ -1866,17 +1757,3 @@ async def _query_sql_async(self, query: str, response_format: ResponseFormat = "
         except Exception as e:
             msg = f"Failed to execute SQL query in OpenSearch: {e!s}"
             raise DocumentStoreError(msg) from e
-=======
-        # Extract unique values from composite aggregation buckets
-        unique_values_agg = aggregations.get("unique_values", {})
-        unique_values_buckets = unique_values_agg.get("buckets", [])
-        unique_values = [str(bucket["key"][field_name]) for bucket in unique_values_buckets]
-
-        # Extract after_key for pagination
-        # If we got fewer results than requested, we've reached the end
-        after_key = unique_values_agg.get("after_key")
-        if after_key is not None and size is not None and len(unique_values_buckets) < size:
-            after_key = None
-
-        return unique_values, after_key
->>>>>>> main
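With the conflicts resolved, both the sync and async variants return `(values, after_key)`. A hedged usage sketch of paging through every unique value of a field; the keyword parameters are inferred from the diff (`size` and `after` appear in the method body but their exact defaults are not shown), and the store constructor arguments are hypothetical:

```python
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore

store = OpenSearchDocumentStore(hosts=["http://localhost:9200"], index="my-index")

values: list[str] = []
after = None
while True:
    page, after = store.get_metadata_field_unique_values(
        metadata_field="category", size=100, after=after
    )
    values.extend(page)
    if after is None:  # no after_key means no more pages
        break
```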
