Skip to content

Commit 6484e5a

Browse files
authored
Implement segment url udf in Rust
### Related - tiny part of https://linear.app/rerun/issue/RR-3568/udf-for-creating-links-is-incomplete ### What This refactors the existing `segment_url()` stuff so that the UDF is implement in rust side, because: - the python implementation was really broken - paves the way to add more features Source-Ref: cddfb14e9a78cff092eb1cf9131fe060e51d87ab
1 parent 0f9f374 commit 6484e5a

File tree

9 files changed

+543
-74
lines changed

9 files changed

+543
-74
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10955,6 +10955,7 @@ dependencies = [
1095510955
"re_server",
1095610956
"re_sorbet",
1095710957
"re_tuid",
10958+
"re_types_core",
1095810959
"re_uri",
1095910960
"re_video",
1096010961
"re_web_viewer_server",

docs/content/reference/migration/migration-0-30.md

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,37 @@ order: 980
55

66
## 🐍 Python API
77

8-
### Deprecated UDF have been removed
8+
### `segment_url_udf` and `segment_url_with_timeref_udf` have been removed
99

10-
The deprecated `partition_url`, `partition_url_udf`, and `partition_url_with_timeref_udf` functions in
11-
`rerun.utilities.datafusion.functions.url_generation` have been removed. Use the `segment_url` equivalents instead:
10+
The `segment_url_udf()` and `segment_url_with_timeref_udf()` functions in
11+
`rerun.utilities.datafusion.functions.url_generation` have been removed. Use `segment_url()` instead,
12+
which covers both use cases:
1213

13-
| Removed | Replacement |
14-
|------------------------------------|----------------------------------|
15-
| `partition_url()` | `segment_url()` |
16-
| `partition_url_udf()` | `segment_url_udf()` |
17-
| `partition_url_with_timeref_udf()` | `segment_url_with_timeref_udf()` |
14+
Before:
15+
16+
```python
17+
from rerun.utilities.datafusion.functions.url_generation import segment_url_udf, segment_url_with_timeref_udf
18+
19+
# Without timestamp
20+
udf = segment_url_udf(dataset)
21+
df.with_column("url", udf(col("rerun_segment_id")))
22+
23+
# With timestamp
24+
udf = segment_url_with_timeref_udf(dataset, "my_timeline")
25+
df.with_column("url", udf(col("rerun_segment_id"), col("ts"), lit("my_timeline")))
26+
```
27+
28+
After:
29+
30+
```python
31+
from rerun.utilities.datafusion.functions.url_generation import segment_url
32+
33+
# Without timestamp
34+
df.with_column("url", segment_url(dataset))
35+
36+
# With timestamp
37+
df.with_column("url", segment_url(dataset, timestamp_col="ts", timeline_name="my_timeline"))
38+
```
39+
40+
Also, the previously deprecated `partition_url()`, `partition_url_udf()`, and `partition_url_with_timeref_udf()`
41+
function have been removed.

rerun_py/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ re_memory.workspace = true
7171
re_sdk = { workspace = true, features = ["data_loaders"] }
7272
re_sorbet.workspace = true
7373
re_tuid.workspace = true
74+
re_types_core.workspace = true
7475
re_uri.workspace = true
7576
re_video.workspace = true
7677
re_web_viewer_server = { workspace = true, optional = true }

rerun_py/rerun_bindings/rerun_bindings.pyi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,9 @@ class EntryId:
926926
def __str__(self) -> str:
927927
"""Return str(self)."""
928928

929+
def as_bytes(self) -> bytes:
930+
"""Return the raw 16-byte representation."""
931+
929932
class EntryKind:
930933
"""The kinds of entries that can be stored in the catalog."""
931934

@@ -1057,6 +1060,12 @@ class DatasetEntryInternal:
10571060
unsafe_allow_recent_cleanup: bool = False,
10581061
) -> None: ...
10591062

1063+
class SegmentUrlUdfInternal:
1064+
"""Rust-backed ScalarUDF for building segment URLs."""
1065+
1066+
def __datafusion_scalar_udf__(self) -> Any:
1067+
"""Scalar UDF pycapsule."""
1068+
10601069
class DatasetViewInternal:
10611070
"""Internal Rust implementation of DatasetView."""
10621071

rerun_py/rerun_sdk/rerun/utilities/datafusion/functions/url_generation.py

Lines changed: 13 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from typing import TYPE_CHECKING
44

55
import pyarrow as pa
6-
import pyarrow.compute
76

87
from rerun.error_utils import RerunMissingDependencyError
98

@@ -12,7 +11,7 @@
1211

1312
HAS_DATAFUSION = True
1413
try:
15-
from datafusion import Expr, ScalarUDF, col, udf
14+
from datafusion import Expr, ScalarUDF, col, lit
1615
except ModuleNotFoundError:
1716
HAS_DATAFUSION = False
1817

@@ -51,73 +50,35 @@ def segment_url(
5150
"""
5251
if not HAS_DATAFUSION:
5352
raise RerunMissingDependencyError("datafusion", "datafusion")
53+
5454
if segment_id_col is None:
5555
segment_id_col = col("rerun_segment_id")
5656
if isinstance(segment_id_col, str):
5757
segment_id_col = col(segment_id_col)
5858

59+
rust_udf = _make_rust_udf()
60+
61+
origin_expr = lit(dataset.catalog.url)
62+
entry_id_expr = lit(pa.scalar(dataset.id.as_bytes(), type=pa.binary(16)))
63+
5964
if timestamp_col is not None:
6065
if timeline_name is None:
6166
timeline_name = str(timestamp_col)
6267

6368
if isinstance(timestamp_col, str):
6469
timestamp_col = col(timestamp_col)
6570

66-
inner_udf = segment_url_with_timeref_udf(dataset, timeline_name)
67-
return inner_udf(segment_id_col, timestamp_col).alias("segment_url_with_timestamp")
68-
69-
inner_udf = segment_url_udf(dataset)
70-
return inner_udf(segment_id_col).alias("segment_url")
71-
72-
73-
def segment_url_udf(dataset: DatasetEntry) -> ScalarUDF:
74-
"""
75-
Create a UDF to the URL for a segment within a Dataset.
76-
77-
This function will generate a UDF that expects one column of input,
78-
a string containing the segment ID.
79-
"""
80-
if not HAS_DATAFUSION:
81-
raise RerunMissingDependencyError("datafusion", "datafusion")
82-
83-
def inner_udf(segment_id_arr: pa.Array) -> pa.Array:
84-
return pa.compute.binary_join_element_wise(
85-
dataset.segment_url(""),
86-
segment_id_arr,
87-
"", # Required for join
71+
return rust_udf(origin_expr, entry_id_expr, segment_id_col, timestamp_col, lit(timeline_name)).alias(
72+
"segment_url"
8873
)
8974

90-
return udf(inner_udf, [pa.string()], pa.string(), "stable")
75+
return rust_udf(origin_expr, entry_id_expr, segment_id_col, lit(None), lit(None)).alias("segment_url")
9176

9277

93-
def segment_url_with_timeref_udf(dataset: DatasetEntry, timeline_name: str) -> ScalarUDF:
94-
"""
95-
Create a UDF to the URL for a segment within a Dataset with timestamp.
96-
97-
This function will generate a UDF that expects two columns of input,
98-
a string containing the segment ID and the timestamp in nanoseconds.
99-
"""
78+
def _make_rust_udf() -> ScalarUDF:
10079
if not HAS_DATAFUSION:
10180
raise RerunMissingDependencyError("datafusion", "datafusion")
10281

103-
def inner_udf(segment_id_arr: pa.Array, timestamp_arr: pa.Array) -> pa.Array:
104-
# The choice of `ceil_temporal` is important since this timestamp drives a cursor
105-
# selection. Due to Rerun latest-at semantics, in order for data from the provided
106-
# timestamp to be visible, the cursor must be set to a point in time which is
107-
# greater than or equal to the target.
108-
timestamp_us = pa.compute.ceil_temporal(timestamp_arr, unit="microsecond")
109-
110-
timestamp_us = pa.compute.strftime(
111-
timestamp_us,
112-
"%Y-%m-%dT%H:%M:%SZ",
113-
)
114-
115-
return pa.compute.binary_join_element_wise(
116-
dataset.segment_url(""),
117-
segment_id_arr,
118-
f"#when={timeline_name}@",
119-
timestamp_us,
120-
"", # Required for join
121-
)
82+
from rerun_bindings import SegmentUrlUdfInternal # type: ignore[attr-defined]
12283

123-
return udf(inner_udf, [pa.string(), pa.timestamp("ns")], pa.string(), "stable")
84+
return ScalarUDF.from_pycapsule(SegmentUrlUdfInternal())

rerun_py/src/catalog/entry.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ impl PyEntryId {
3232
pub fn __str__(&self) -> String {
3333
self.id.to_string()
3434
}
35+
36+
/// Return the raw 16-byte representation.
37+
pub fn as_bytes<'py>(&self, py: Python<'py>) -> pyo3::Bound<'py, pyo3::types::PyBytes> {
38+
pyo3::types::PyBytes::new(py, &self.id.id.as_bytes())
39+
}
3540
}
3641

3742
impl From<EntryId> for PyEntryId {

rerun_py/src/catalog/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod index_columns;
1313
mod indexes;
1414
mod registration_handle;
1515
mod schema;
16+
mod segment_url_udf;
1617
mod table_entry;
1718
mod table_provider_adapter;
1819
mod trace_context;
@@ -37,6 +38,7 @@ pub use self::indexes::{
3738
};
3839
pub use self::registration_handle::{PyRegistrationHandleInternal, PyRegistrationIterator};
3940
pub use self::schema::PySchemaInternal;
41+
pub use self::segment_url_udf::PySegmentUrlUdfInternal;
4042
pub use self::table_entry::{PyTableEntryInternal, PyTableInsertModeInternal};
4143
pub use self::table_provider_adapter::PyTableProviderAdapterInternal;
4244
pub use self::type_aliases::{AnyComponentColumn, IndexValuesLike, PyIndexValuesLikeInternal};
@@ -54,6 +56,7 @@ pub(crate) fn register(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()>
5456
m.add_class::<PyRegistrationHandleInternal>()?;
5557
m.add_class::<PyRegistrationIterator>()?;
5658
m.add_class::<PyTableProviderAdapterInternal>()?;
59+
m.add_class::<PySegmentUrlUdfInternal>()?;
5760
m.add_class::<PyDatasetViewInternal>()?;
5861
m.add_class::<PyRerunHtmlTable>()?;
5962

0 commit comments

Comments
 (0)