ENH: Add support for executing UDFs using Bodo as the engine #11
Description
Feature Type
- Adding new functionality to pandas
- Changing existing functionality in pandas
- Removing existing functionality in pandas
Problem Description
Applying User Defined Functions (UDFs) to a DataFrame can be very slow when evaluated using the default Python engine. Passing engine="numba" leverages Numba's Just-in-Time (JIT) compiler to transform the UDF application into an optimized binary, which can improve performance. However, the Numba UDF engine has several limitations, including:
- Limited set of dtypes supported (only supports numpy dtypes, does not support ExtensionDtypes)
- Parallel execution not supported (unless raw=True)
- Difficulty troubleshooting issues due to lengthy stack traces and hard-to-read error messages.
Adding support for the Bodo engine would solve the above issues and provide a good complement to the capabilities of the currently supported engines (Python and Numba).
Bodo uses an auto-parallelizing JIT compiler to transform Python code into highly optimized, parallel binaries with an MPI backend, allowing it to scale to very large data sizes with minimal extra work required from the user (large speedups on both laptops and clusters). Bodo is also built for Pandas and supports DataFrame, Series and Array Extension types natively.
Feature Description
Allow passing the value "bodo" to the engine parameter in DataFrame.apply, and add an apply_bodo method that accepts the user-defined function, creates a JIT function to perform the apply, and calls it. For example:
In pandas/core/apply.py
class FrameApply(NDFrameApply):
    ...

    def apply_series_bodo(self) -> DataFrame | Series:
        bodo = import_optional_dependency("bodo")
        engine_kwargs = bodo_get_jit_arguments(self.engine_kwargs)

        @bodo.jit(**engine_kwargs)
        def do_apply(obj, func, axis):
            return obj.apply(func, axis)

        result = do_apply(self.obj, self.func, self.axis)
        return result
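The dispatch step implied above (check the engine name, then confirm the optional dependency is present) could be sketched as follows. This is a minimal illustration, not pandas' actual implementation: `resolve_engine` and `SUPPORTED_ENGINES` are hypothetical names, and the assumption that each engine name matches its importable package name is made only for this sketch.

```python
import importlib.util

# Hypothetical list for illustration; "bodo" is the addition proposed in this issue.
SUPPORTED_ENGINES = ("python", "numba", "bodo")


def resolve_engine(engine="python"):
    """Validate an engine name and confirm its optional dependency is importable."""
    if engine not in SUPPORTED_ENGINES:
        raise ValueError(
            f"Unknown engine {engine!r}; expected one of {SUPPORTED_ENGINES}"
        )
    # Assumption: the package name equals the engine name ("numba", "bodo").
    if engine != "python" and importlib.util.find_spec(engine) is None:
        raise ImportError(
            f"engine={engine!r} requires the optional dependency {engine!r}"
        )
    return engine


print(resolve_engine("python"))
```

Failing early with a clear message keeps the user-facing error short, which matters here because hard-to-read errors are one of the stated pain points.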
This approach could also be applied to other APIs that accept a UDF and an engine argument.
Alternative Solutions
Users could execute their UDF using a Bodo JIT'd function. For example:
import bodo
import pandas as pd

def f(x):
    return x.A // x.B if x.B != 0 else 0

@bodo.jit
def apply_udf(df, func):
    return df.apply(func, axis=1)

df = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": [0, 1, 2, 2, 2]})
result = apply_udf(df, f)
While this approach works, it has its downsides: it requires a larger code rewrite, which makes it harder to quickly experiment with different engines.
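One way a user might soften that rewrite cost today is to make the JIT wrapping conditional, so the same script runs with or without Bodo installed. The following is only a sketch under that assumption: it reuses `f` and `apply_udf` from the example above and applies `bodo.jit` as a plain function call instead of a decorator.

```python
import importlib.util

import pandas as pd


def f(x):
    # Same UDF as above: integer division guarded against a zero divisor.
    return x.A // x.B if x.B != 0 else 0


def apply_udf(df, func):
    return df.apply(func, axis=1)


# Assumption: compile with Bodo only when it is importable; otherwise the
# plain-pandas function is used unchanged.
if importlib.util.find_spec("bodo") is not None:
    import bodo

    apply_udf = bodo.jit(apply_udf)

df = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": [0, 1, 2, 2, 2]})
result = apply_udf(df, f)
print(result.tolist())
```

This still leaves the wrapper function in user code, which is the boilerplate an engine argument would remove.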
Additional examples:
To demonstrate the value that the Bodo engine would bring to Pandas users, consider the following examples which highlight some of the limitations of the current engines (Python and Numba):
Numba does not support non-numeric column types such as strings and has limited support for Pandas APIs inside the UDF. The following example would not compile with the numba engine:
df = pd.DataFrame({"A": ["Hello", "Hi"] * 5, "B": [1, 2, 3, 4, 5] * 2, "C": [5, 4, 3, 2, 1] * 2})

def g(x):
    last_cols = pd.Series([x.B, x.C], index=["red", "blue"])
    return x.A + " " + str(last_cols.idxmin())

print(df.apply(g, axis=1, engine="bodo"))
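Since engine="bodo" is only a proposal here, a reference result for this UDF can be produced with the default Python engine and later compared against any other engine's output. A minimal sketch, using the same data and function as above; the name `reference` is introduced only for illustration.

```python
import pandas as pd

df = pd.DataFrame(
    {"A": ["Hello", "Hi"] * 5, "B": [1, 2, 3, 4, 5] * 2, "C": [5, 4, 3, 2, 1] * 2}
)


def g(x):
    # Label the smaller of the two numeric columns; ties resolve to the first label.
    last_cols = pd.Series([x.B, x.C], index=["red", "blue"])
    return x.A + " " + str(last_cols.idxmin())


# Default (Python) engine: slow on large inputs, but it defines the expected values.
reference = df.apply(g, axis=1)
print(reference.head())
```

Comparing a new engine's output against such a baseline is a cheap way to catch semantic differences before looking at performance.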
Bodo also natively supports Timestamp (with timezone) and DateOffset types inside UDFs whereas numba only supports datetime64/timedelta64:
tz = "US/Pacific"
df = pd.DataFrame(
    {
        "order_date": pd.array([
            pd.Timestamp('2017-01-06T12', tz=tz),
            pd.Timestamp('2018-11-23T12', tz=tz),
            pd.Timestamp('2017-10-02T12', tz=tz),
            pd.Timestamp('2025-01-28T12', tz=tz),
        ])
    }
)

def f(x):
    dayofweek = x.order_date.dayofweek
    if dayofweek in [0, 1, 2, 3]:
        return "weekday"
    elif dayofweek == 4:
        return "friday"
    else:
        return "weekend"

df["time_bucket"] = df.apply(f, axis=1, engine="bodo")

tz = "US/Pacific"
df = pd.DataFrame(
    {
        "order_date": pd.array([
            pd.Timestamp('2018-01-06T12', tz=tz),
            pd.Timestamp('2018-02-23T12', tz=tz),
            pd.Timestamp('2018-11-02T12', tz=tz),
            pd.Timestamp('2018-12-28T12', tz=tz),
        ])
    }
)

def f(x):
    month = x.order_date.month
    if month == 12:
        return None
    return x.order_date + pd.DateOffset(months=1)

df["next_order_this_year"] = df.apply(f, axis=1, engine="bodo")
Lastly, ExtensionDtypes are not supported by the Numba engine, so this example with the pyarrow.decimal128 type would not compile:
from decimal import Decimal
import pandas as pd
import pyarrow as pa

df = pd.DataFrame(
    {
        "A": pd.array(
            [Decimal("0.000000000000001"), Decimal("5"), Decimal("0.1")],
            dtype=pd.ArrowDtype(pa.decimal128(32, 18)),
        )
    }
)

def f(x):
    if x.A < Decimal("0.00000002"):
        return "low"
    elif x.A < Decimal("0.5"):
        return "med"
    else:
        return "high"

df["bucket"] = df.apply(f, engine="numba", raw=True, axis=1)
For large DataFrames, the Bodo engine can offer performance benefits over the Python-based engine because it can translate functions into optimized machine code that executes in parallel. For example, consider this UDF, which preprocesses text data by taking sections from two passages and joining them to form a new passage:
import time
import pandas as pd

rand_df = pd.read_parquet("random_strings.pq")

def combine_str(x):
    midA = len(x.A) // 2
    midB = len(x.B) // 2
    return x.A[midA:] + x.B[:midB]

for engine in ("bodo", "python"):
    start = time.time()
    res = rand_df.apply(combine_str, axis=1, engine=engine)
    end = time.time()
    print(f"total execution time of {engine} engine:", end-start)
For this example, we can randomly generate the string data to use:
import string
import random
import pandas as pd
import pyarrow as pa

NUM_STRINGS = 1_000_000

def get_random_string():
    length = random.randint(80, 120)
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

random_string_A = [get_random_string() for _ in range(NUM_STRINGS)]
random_string_B = [get_random_string() for _ in range(NUM_STRINGS)]
string_dtype = pd.ArrowDtype(pa.large_string())
rand_df = pd.DataFrame(
    {
        "A": pd.array(random_string_A, dtype=string_dtype),
        "B": pd.array(random_string_B, dtype=string_dtype)
    }
)
print(rand_df.head())
rand_df.to_parquet("random_strings.pq")
Running on my laptop I saw:
total execution time of bodo engine: 8.819313049316406
total execution time of python engine: 45.07858085632324
This represents roughly a 5x speedup for 1 million strings.
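The speedup figure follows directly from the two timings reported above. The sketch below recomputes it and adds a small timing helper that could make such comparisons less noisy; `time_call` is a hypothetical helper, and taking the best of several runs is an assumption about methodology, not how the numbers above were measured.

```python
import time


def time_call(fn, *args, repeats=3):
    """Return the best wall-clock time in seconds over `repeats` runs of fn(*args)."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn(*args)
        best = min(best, time.perf_counter() - start)
    return best


# Timings reported above, in seconds.
bodo_seconds = 8.819313049316406
python_seconds = 45.07858085632324

speedup = python_seconds / bodo_seconds
print(f"speedup: {speedup:.2f}x")  # prints "speedup: 5.11x"
```

With a JIT engine, the first call generally includes compilation time, so repeated runs can help separate compile cost from steady-state execution.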
Additional Context
Relevant links:
Bodo's documentation
Bodo's github repo
Proof-of-concept PR that adds support for engine="bodo" in df.apply.