Skip to content

Support for Polars Dataframes #17

@ldacey

Description

@ldacey

pandantic seemed to be such a nice and simple implementation that I decided to edit your model for use with Polars DataFrames, and figured I would share the results.

I only recently began using polars so there might be more efficient ways, but here were the changes I had to make to your model:

  1. There is no index, so I replaced it with with_row_count() to get the row number for error reporting
  2. Chunk logic can be handled by iter_slices(), where n_rows is the total row count divided by n_jobs (n_jobs falls back to the CPU count when it is negative)
  3. Instead of to_dict(), we use iter_rows(named=True) to pass each row into the validator
  4. We use filter() to exclude the error rows if errors is set to "filter"
from multiprocess import Process, Queue, cpu_count
import polars as pl
import math
import os
from pydantic import BaseModel
import logging

class PolarsModel(BaseModel):
    """Pydantic model that can validate a Polars DataFrame row by row.

    Subclass it, declare the expected columns as model fields, and call
    ``parse_df`` to validate every row of a dataframe against the model.
    """

    @classmethod
    def parse_df(
        cls,
        dataframe: pl.DataFrame,
        errors: str = "raise",
        context: dict[str, object] | None = None,
        n_jobs: int = 1,
        verbose: bool = True,
    ) -> pl.DataFrame:
        """Validate every row of ``dataframe`` against this model.

        Args:
            dataframe: Frame to validate. It is not modified; a temporary
                ``row_nr`` column is added to a copy and dropped again
                before returning.
            errors: ``"raise"`` raises if any row is invalid; ``"filter"``
                returns only the valid rows. Any other value returns all
                rows unchanged.
            context: Passed through to ``model_validate`` for each row.
            n_jobs: ``1`` validates in the current process. Any other
                positive value splits the frame into roughly that many
                chunks, one worker process per chunk. A negative value
                uses ``cpu_count()`` workers.
            verbose: When true, log each validation error together with
                its row number.

        Returns:
            The dataframe without the temporary ``row_nr`` column, minus
            the invalid rows when ``errors == "filter"``.

        Raises:
            ValueError: If ``n_jobs`` is 0, or if ``errors == "raise"``
                and at least one row fails validation.
        """
        # 0 workers is meaningless and would otherwise surface as a bare
        # ZeroDivisionError when the chunk size is computed.
        if n_jobs == 0:
            raise ValueError(
                "n_jobs must be a non-zero integer (use a negative value for all cores)."
            )

        errors_index = []
        dataframe = dataframe.clone().with_row_count()

        logging.info(f"Validating {dataframe.height} rows")
        logging.debug(f"Amount of available cores: {cpu_count()}")

        if n_jobs != 1:
            if n_jobs < 0:
                n_jobs = cpu_count()

            # Clamp to 1 so an empty dataframe does not produce a slice
            # size of 0; with no rows, iter_slices then yields no chunks.
            chunk_size = max(1, math.ceil(dataframe.height / n_jobs))
            chunks = list(dataframe.iter_slices(n_rows=chunk_size))
            total_chunks = len(chunks)

            logging.info(f"Split the dataframe into {total_chunks} chunks to process {chunk_size} rows per chunk.")

            processes = []
            q = Queue()

            for chunk in chunks:
                p = Process(target=cls._validate_row, args=(chunk, q, context, verbose), daemon=True)
                p.start()
                processes.append(p)

            # Each worker puts one None on the queue when it is done, so
            # drain until a sentinel has arrived for every chunk.
            num_stops = 0
            while num_stops < total_chunks:
                index = q.get()
                if index is None:
                    num_stops += 1
                else:
                    errors_index.append(index)

            # Join only after draining the queue.
            for p in processes:
                p.join()

        else:
            errors_index.extend(cls._invalid_row_numbers(dataframe, context, verbose))

        logging.info(f"# invalid rows: {len(errors_index)}")

        if errors_index and errors == "raise":
            raise ValueError(f"{len(errors_index)} validation errors found in dataframe.")

        if errors_index and errors == "filter":
            dataframe = dataframe.filter(~pl.col("row_nr").is_in(errors_index))

        # Positional form instead of the ``columns=`` keyword.
        return dataframe.drop("row_nr")

    @classmethod
    def _invalid_row_numbers(cls, chunk: pl.DataFrame, context=None, verbose=True):
        """Yield the ``row_nr`` of every row in ``chunk`` that fails validation."""
        for row in chunk.iter_rows(named=True):
            try:
                cls.model_validate(obj=row, context=context)
            except Exception as exc:
                if verbose:
                    logging.info(f"Validation error found at row {row['row_nr']}\n{exc}")
                yield row["row_nr"]

    @classmethod
    def _validate_row(cls, chunk: pl.DataFrame, q: Queue, context=None, verbose=True) -> None:
        """Worker entry point: validate ``chunk`` and report invalid rows on ``q``.

        Puts the ``row_nr`` of each invalid row on the queue, followed by a
        single ``None`` to signal that this chunk is finished.
        """
        try:
            for row_nr in cls._invalid_row_numbers(chunk, context, verbose):
                q.put(row_nr)
        finally:
            # Always send the sentinel, even on an unexpected error, so the
            # parent's drain loop cannot block forever waiting for it.
            q.put(None)

I tested this on a dataframe which I duplicated a bunch of times until the row count was > 1 million rows to check if n_jobs was functioning correctly.

With n_jobs = 1:
image

With n_jobs = 4 (twice as fast):
image

Example validation error if verbose=True:
image

Example with errors="filter", the resulting dataframe has the expected rows:

image

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions