Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/dremioai/api/dremio/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,21 @@ class LineageResponse(BaseModel):
children: List[LineageChildren]


async def get_lineage(dataset_id_or_path: str) -> Dict[str, Any]:
async def get_lineage(
dataset_id_or_path: str,
remove_catalog_name: Optional[bool] = True
) -> Dict[str, Any]:
client = AsyncHttpClient()
if "." in dataset_id_or_path:
response = await get_schema(dataset_id_or_path, by_id=False)
dataset_id_or_path = response["id"]

project_id = settings.instance().dremio.project_id
endpoint = f"/v0/projects/{project_id}/catalog" if project_id else "/api/v3/catalog"
params = {"removeCatalogName": remove_catalog_name}
result: LineageResponse = await client.get(
f"{endpoint}/{dataset_id_or_path}/graph",
params=params,
deser=LineageResponse,
)
return result.model_dump()
Expand All @@ -130,10 +135,12 @@ async def get_schema(
by_id: Optional[bool] = False,
include_tags: Optional[bool] = False,
flatten: Optional[bool] = False,
remove_catalog_name: Optional[bool] = True
) -> Dict[str, Any]:
client = AsyncHttpClient()
project_id = settings.instance().dremio.project_id
endpoint = f"/v0/projects/{project_id}/catalog" if project_id else "/api/v3/catalog"
params = {"removeCatalogName": remove_catalog_name}
if by_id:
endpoint += "/" + dataset_path_or_id
else:
Expand All @@ -142,7 +149,7 @@ async def get_schema(
reader(StringIO(dataset_path_or_id), delimiter=".", dialect=excel)
)[0]
endpoint += f'/by-path/{"/".join(dataset_path_or_id)}'
schema = await client.get(endpoint)
schema = await client.get(endpoint, params=params)

if include_tags:

Expand Down
8 changes: 7 additions & 1 deletion src/dremioai/api/dremio/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ class EnterpriseSearchResultsWrapper(BaseModel):


async def get_search_results(
search: str | Search, use_df: bool = False
search: str | Search, use_df: bool = False,
remove_catalog_name: Optional[bool] = True
) -> EnterpriseSearchResultsWrapper | pd.DataFrame:
if isinstance(search, str):
search = Search(query=search)
Expand All @@ -225,11 +226,15 @@ async def get_search_results(
if settings.instance().dremio.project_id
else "/api/v3/search"
)

params = {"removeCatalogName": remove_catalog_name}

result = []
response = await client.post(
endpoint,
body=search.model_dump(exclude_none=True),
deser=EnterpriseSearchResults,
params=params,
)
while response.results and response.error is None and response.more is None:
result.extend(response.results)
Expand All @@ -240,6 +245,7 @@ async def get_search_results(
endpoint,
body=search.model_dump(exclude_none=True),
deser=EnterpriseSearchResults,
params=params,
)

if use_df:
Expand Down
5 changes: 3 additions & 2 deletions src/dremioai/api/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ async def post(
deser: Optional[DeserializationStrategy] = None,
file: Optional[TextIO] = None,
top_level_list: bool = False,
params: Dict[AnyStr, Any] = None,
):
async with ClientSession(middlewares=(retry_middleware,)) as session:
self.log_request("POST", endpoint)
self.log_request("POST", endpoint, params)
async with session.post(
f"{self.uri}{endpoint}", headers=self.headers, json=body, ssl=False
f"{self.uri}{endpoint}", params=params, headers=self.headers, json=body, ssl=False
) as response:
return await self.handle_response(
response, deser, file, top_level_list=top_level_list
Expand Down