-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Description
Steps to reproduce
Hi, I was implementing a custom parser for a project that I was working on and this was my UDF:
class LandingAIRadiologyParser(pw.UDF):
    """Parse radiology reports using LandingAI agentic-doc library.

    This variant passes a Pydantic model class (``RadiologyExtractionModel``)
    to the agentic-doc ``parse`` call through ``extraction_model``.
    """

    def __init__(
        self,
        api_key: str,
        capacity: int,
        results_dir: str,
        cache_strategy: pw.udfs.CacheStrategy = None,
        *,
        async_mode: str = "fully_async",
        **kwargs,
    ):
        """Store the parser settings and initialise the underlying UDF.

        Args:
            api_key: LandingAI API key, forwarded to ``ParseConfig``.
            capacity: Stored on the instance; not used elsewhere in this class.
            results_dir: Directory handed to ``parse`` as ``result_save_dir``.
            cache_strategy: Forwarded unchanged to ``pw.UDF.__init__``.
            async_mode: Passed to ``_prepare_executor`` to build the executor.
            **kwargs: Accepted for call-site compatibility and ignored.
        """
        self.api_key = api_key
        self.async_mode = async_mode
        self.results_dir = results_dir
        self.capacity = capacity

        # Imported locally, as in the original snippet; this is a private
        # Pathway helper that turns the mode string into an executor.
        from pathway.xpacks.llm._utils import _prepare_executor

        executor = _prepare_executor(async_mode)
        super().__init__(cache_strategy=cache_strategy, executor=executor)

    async def parse(self, contents: bytes) -> list[tuple[str, dict]]:
        """Parse radiology reports using LandingAI.

        Args:
            contents: Raw bytes of the document to parse.

        Returns:
            A single-element list holding ``(markdown_text, metadata)``, where
            every metadata value is a string. When the library returns no
            results, the text is empty and the metadata carries an ``error``
            entry.
        """
        # Create results directory
        results_dir = Path(self.results_dir)
        results_dir.mkdir(exist_ok=True)

        # Parse document with LandingAI.
        # NOTE: the bare name ``parse`` resolves to the module-level function
        # (presumably agentic-doc's ``parse`` - confirm against the imports),
        # not to this method, because method bodies do not see class scope.
        parsed_results = parse(
            contents,
            include_marginalia=True,
            extraction_model=RadiologyExtractionModel,
            include_metadata_in_markdown=True,
            result_save_dir=str(results_dir),
            config=ParseConfig(api_key=self.api_key),
        )

        if not parsed_results:
            return [("", {"source": "landingai", "error": "No parsing results"})]

        parsed_doc = parsed_results[0]
        text_content = getattr(parsed_doc, "markdown", "")

        # Create clean metadata
        metadata = {
            "source": "landingai",
            "confidence": str(getattr(parsed_doc, "confidence", 0.0)),
        }

        # Ensure string types for Pathway
        safe_text = str(text_content) if text_content else ""
        safe_metadata = {
            k: str(v) if v is not None else "" for k, v in metadata.items()
        }
        return [(safe_text, safe_metadata)]

    async def __wrapped__(self, contents: bytes, **kwargs) -> list[tuple[str, dict]]:
        """UDF entry point: delegate to :meth:`parse`, ignoring extra kwargs."""
        return await self.parse(contents)
As you can see, I am passing RadiologyExtractionModel, which I tried to define in several ways:
- Defining it in the same file
- Defining it in the same function
- Pointing to it from a variable in the YAML and then referencing that variable as a parameter to the parser
But every time I run this, parsed_doc.extraction comes out as a plain BaseModel instead of RadiologyExtractionModel. If we then call model_dump, it gives the error "Pydantic models should inherit from BaseModel, BaseModel cannot be instantiated directly", which sounds valid.
A quick workaround I found is to use extraction_schema instead, which takes a Python dictionary object and bypasses the whole Pydantic model issue directly; it works perfectly:
class LandingAIRadiologyParser(pw.UDF):
    """Radiology report parser built on the LandingAI agentic-doc library.

    Workaround variant: the extraction target is described with a JSON Schema
    dictionary (``extraction_schema``) rather than a Pydantic model class.
    """

    def __init__(
        self,
        api_key: str,
        capacity: int,
        results_dir: str,
        cache_strategy: pw.udfs.CacheStrategy = None,
        *,
        async_mode: str = "fully_async",
        **kwargs,
    ):
        """Record the settings and hand cache strategy and executor to pw.UDF.

        ``capacity`` is only stored; ``**kwargs`` is accepted and ignored.
        """
        self.api_key = api_key
        self.async_mode = async_mode
        self.results_dir = results_dir
        self.capacity = capacity

        from pathway.xpacks.llm._utils import _prepare_executor

        super().__init__(
            cache_strategy=cache_strategy,
            executor=_prepare_executor(async_mode),
        )

    async def parse(self, contents: bytes) -> List[tuple[str, dict]]:
        """Run LandingAI extraction on ``contents``.

        Returns a one-element list ``[(markdown_text, metadata)]``. The
        metadata always has ``source`` and ``confidence`` and additionally one
        entry per extracted field that came back with a truthy value; all
        values are strings.
        """
        # Make sure the output directory exists before the library writes to it.
        output_dir = Path(self.results_dir)
        output_dir.mkdir(exist_ok=True)

        # Every extracted field is a described string, so the JSON Schema
        # "properties" section is generated from a name -> description table.
        field_descriptions = {
            "patient_id": "Patient identification number or ID",
            "study_type": "Type of radiological study (CT, MRI, X-ray, Ultrasound, etc.)",
            "findings": "Key radiological findings and observations from the study",
            "impression": "Radiologist's impression, conclusion, and clinical interpretation",
            "critical_findings": "Any critical, urgent, or life-threatening findings requiring immediate attention",
        }
        extraction_schema = {
            "type": "object",
            "properties": {
                name: {"type": "string", "description": text}
                for name, text in field_descriptions.items()
            },
            "additionalProperties": False,
            "required": ["study_type", "findings", "impression"],
        }

        # The bare name ``parse`` is the module-level function, not this method.
        documents = parse(
            contents,
            include_marginalia=True,
            include_metadata_in_markdown=True,
            result_save_dir=str(output_dir),
            extraction_schema=extraction_schema,
            config=ParseConfig(api_key=self.api_key),
        )
        if not documents:
            return [("", {"source": "landingai", "error": "No parsing results"})]

        document = documents[0]

        # Keep only extraction entries shaped like {"value": <truthy>, ...}.
        raw_extraction = getattr(document, "extraction_metadata", None)
        if raw_extraction:
            extracted = {
                name: entry["value"]
                for name, entry in raw_extraction.items()
                if isinstance(entry, dict) and entry.get("value")
            }
        else:
            extracted = {}

        # Fixed keys first, then extracted fields (which may override them).
        metadata = {
            "source": "landingai",
            "confidence": str(getattr(document, "confidence", 0.0)),
        }
        metadata.update(
            (name, str(value))
            for name, value in extracted.items()
            if value is not None
        )

        # Pathway needs plain strings: coerce everything, mapping None to "".
        markdown = getattr(document, "markdown", "")
        text_out = str(markdown) if markdown else ""
        metadata_out = {
            key: ("" if value is None else str(value))
            for key, value in metadata.items()
        }
        return [(text_out, metadata_out)]

    async def __wrapped__(self, contents: bytes, **kwargs) -> list[tuple[str, dict]]:
        """Entry point invoked for the UDF; forwards ``contents`` to ``parse``."""
        result = await self.parse(contents)
        return result
Can you please investigate how or why this might be happening? I feel that some internal handling of Pathway user-defined functions might be causing this issue.
Relevant log output
Pydantic models should inherit from BaseModel, BaseModel cannot be instantiated directly
What did you expect to happen?
It should return the RadiologyExtractionModel instead
Version
0.26.1
Docker Versions (if used)
No response
OS
Mac 15.5
On which CPU architecture did you run Pathway?
Arm