Skip to content

MultiSink Support for rr.BinaryStream #11441

@pablovela5620

Description

@pablovela5620

Is your feature request related to a problem? Please describe.

When building an annotation app using Gradio and gradio-rerun-viewer, I'd like the ability to use the set_sink functionality so that I can both view and save the logged data. This is based on a conversation @jprochazk and I had a while back: https://rerunio.slack.com/archives/C04UBTMMGJK/p1749483644307869?thread_ts=1749216143.679839&cid=C04UBTMMGJK

Describe the solution you'd like

Right now rr.BinaryStream does not work with the CallbackSink setup from Python. Making it work would mean that only the RecordingStream setup needs to be modified to allow viewing and saving at the same time.

Describe alternatives you've considered

I have tried two things

  1. Manually holding the state of the things I want logged in a custom Python object and calling rr.log twice, once for the viewer and once for saving. This is not ideal: it is error-prone if I miss something I needed to log, and calling rr.log twice is unwieldy.

  2. Keeping a per-user byte buffer for each session in the session state:

@dataclass(slots=True)
class SimpleAppState:
    """Session-local recording metadata and buffered stream bytes."""

    recording_id: uuid.UUID
    """Recording identifier reused for incremental logging."""
    rrd_buffer: bytearray
    """In-memory copy of every chunk streamed to the viewer."""

def _log_img_state_buffer(
    state: SimpleAppState, image: UInt8[ndarray, "height width 3"]
) -> Generator[tuple[SimpleAppState, bytes], None, None]:
    """Stream a progressive blur to the viewer while buffering the bytes for saving.

    Logs ``image`` once under ``image/original`` and then 50 successively
    blurred copies under ``image/blurred``. After each log call, any bytes
    returned by ``stream.read()`` are appended to ``state.rrd_buffer`` and
    yielded together with ``state``.

    When the generator finishes, is closed early, or raises, the accumulated
    buffer (if non-empty) is written to
    ``RRD_SAVE_DIR / f"{state.recording_id}{RRD_SUFFIX}"``.

    Args:
        state: Session state; its ``rrd_buffer`` is extended in place.
        image: Image to log and blur.

    Yields:
        ``(state, chunk)`` for every non-``None`` chunk read from the stream.
    """
    recording: rr.RecordingStream = get_recording(recording_id=state.recording_id)
    stream: rr.BinaryStream = recording.binary_stream()
    RRD_SAVE_DIR.mkdir(parents=True, exist_ok=True)

    blueprint = rrb.Blueprint(
        rrb.Horizontal(
            rrb.Spatial2DView(origin="image/original"),
            rrb.Spatial2DView(origin="image/blurred"),
        ),
        collapse_panels=True,
    )

    try:
        recording.send_blueprint(blueprint)
        recording.set_time("iteration", sequence=0)
        recording.log("image/original", rr.Image(image))

        if (chunk := stream.read()) is not None:
            state.rrd_buffer.extend(chunk)
            yield state, chunk

        blur: UInt8[ndarray, "height width 3"] = image.copy()
        for i in range(50):
            recording.set_time("iteration", sequence=i)
            time.sleep(0.1)
            blur = cv2.GaussianBlur(blur, (5, 5), 0)
            recording.log("image/blurred", rr.Image(blur))

            if (chunk := stream.read()) is not None:
                state.rrd_buffer.extend(chunk)
                yield state, chunk

        # Final drain after the loop, mirroring _log_img, so that anything
        # still pending in the stream reaches both the viewer and the buffer.
        if (chunk := stream.read()) is not None:
            state.rrd_buffer.extend(chunk)
            yield state, chunk
    finally:
        # Persist in `finally` so the bytes already streamed are saved even if
        # the consumer stops iterating early (generator closed) or logging raises.
        if state.rrd_buffer:
            rrd_path: Path = RRD_SAVE_DIR / f"{state.recording_id}{RRD_SUFFIX}"
            rrd_path.write_bytes(bytes(state.rrd_buffer))

Unfortunately, this ends up only logging the first image and none of the subsequent iterations

What it looks like in the gradio viewer

Image

What it looks like in the saved rrd file

Image

Additional context

This is what the simple barebones gradio app looks like

import time
import uuid
from collections.abc import Generator
from dataclasses import dataclass, field
from pathlib import Path
from typing import Final

import cv2
import gradio as gr
import rerun as rr
import rerun.blueprint as rrb
from gradio_rerun import Rerun
from jaxtyping import UInt8
from numpy import ndarray

# Directory that the logging callbacks create (if missing) before streaming;
# the buffered recording is written here.
RRD_SAVE_DIR: Final[Path] = Path("data") / "rrd-gradio-saves"
# Suffix appended to the recording id to form the saved file name.
RRD_SUFFIX: Final[str] = ".rrd"


def get_recording(recording_id: uuid.UUID | None, application_id: str = "Application ID") -> rr.RecordingStream:
    """Create a ``rr.RecordingStream`` for the given recording and application id.

    Args:
        recording_id: Identifier passed through as the stream's ``recording_id``.
        application_id: Identifier passed through as the stream's ``application_id``.

    Returns:
        The newly constructed recording stream.
    """
    recording_stream = rr.RecordingStream(
        application_id=application_id,
        recording_id=recording_id,
    )
    return recording_stream


@dataclass(slots=True)
class SimpleAppState:
    """Session-local recording metadata and buffered stream bytes."""

    recording_id: uuid.UUID
    """Recording identifier reused for incremental logging."""
    rrd_buffer: bytearray = field(default_factory=bytearray)
    """In-memory copy of every chunk streamed to the viewer."""



def _log_img(
    state: SimpleAppState, image: UInt8[ndarray, "height width 3"]
) -> Generator[tuple[SimpleAppState, bytes], None, None]:
    """Stream a progressive blur of ``image`` to the viewer.

    Logs ``image`` under ``image/original``, then 50 successively blurred
    copies under ``image/blurred``. After each log call, and once more after
    the loop, any non-``None`` result of ``stream.read()`` is yielded together
    with the unchanged ``state``.

    Args:
        state: Session state; only ``recording_id`` is read here.
        image: Image to log and blur.

    Yields:
        ``(state, chunk)`` for every non-``None`` chunk read from the stream.
    """
    rec = get_recording(recording_id=state.recording_id)
    binary_stream = rec.binary_stream()

    RRD_SAVE_DIR.mkdir(parents=True, exist_ok=True)

    # Two side-by-side 2D views: the untouched input and the running blur.
    views = rrb.Horizontal(
        rrb.Spatial2DView(origin="image/original"),
        rrb.Spatial2DView(origin="image/blurred"),
    )
    rec.send_blueprint(rrb.Blueprint(views, collapse_panels=True))

    rec.set_time("iteration", sequence=0)
    rec.log("image/original", rr.Image(image))

    if (data := binary_stream.read()) is not None:
        yield state, data

    blurred = image.copy()
    for step in range(50):
        rec.set_time("iteration", sequence=step)

        # Artificial delay so the incremental streaming is visible.
        time.sleep(0.1)
        blurred = cv2.GaussianBlur(blurred, (5, 5), 0)
        rec.log("image/blurred", rr.Image(blurred))

        # Every yield hands the newly read bytes back to Gradio, which
        # forwards them to the viewer; yield whenever progress should show.
        if (data := binary_stream.read()) is not None:
            yield state, data

    # One last read after the loop for anything still pending.
    if (data := binary_stream.read()) is not None:
        yield state, data


# When exposing Gradio callbacks that stream results to the Rerun viewer,
# leave the callback parameters unannotated to avoid Beartype issues.
def log_img(state, image):
    """Gradio callback: validate the input image and delegate to the streaming logger.

    Parameters are deliberately left unannotated.

    Raises:
        gr.Error: If ``image`` is ``None``.

    Yields:
        Whatever ``_log_img_state_buffer(state, image)`` yields.
    """
    if image is None:
        raise gr.Error("Must provide an image to blur.")

    yield from _log_img_state_buffer(state, image)


def build() -> gr.Blocks:
    """Assemble the demo UI.

    The layout is an image input with examples and a "Blur Image" button on
    the left, and a streaming Rerun viewer on the right. Clicking the button
    runs ``log_img`` with the session state and the input image, and routes
    its output to the session state and the viewer.

    Returns:
        The constructed ``gr.Blocks`` app.
    """
    with gr.Blocks() as demo:
        # Create the state inside the Blocks context so it is registered with
        # `demo`; it is used as both an input and an output of the click event.
        # NOTE(review): uuid.uuid4() is evaluated once, when build() runs, so
        # the initial recording id is the same for every session. Confirm
        # whether each session should get its own id.
        session_state: gr.State = gr.State(SimpleAppState(recording_id=uuid.uuid4()))

        with gr.Row():
            with gr.Column(scale=2):
                input_image: gr.Image = gr.Image(
                    label="Input Image",
                    type="numpy",
                    image_mode="RGB",
                )
                gr.Examples(
                    examples=[
                        ["https://huggingface.co/datasets/pablovela5620/minicoco/resolve/main/000000000785.jpg"],
                        ["https://huggingface.co/datasets/pablovela5620/minicoco/resolve/main/000000001296.jpg"],
                    ],
                    inputs=[input_image],
                    cache_examples=False,
                )
                blur_button: gr.Button = gr.Button("Blur Image")
            with gr.Column(scale=5):
                rerun_viewer: Rerun = Rerun(
                    streaming=True,
                    panel_states={
                        "time": "collapsed",
                        "blueprint": "collapsed",
                        "selection": "collapsed",
                    },
                )

        blur_button.click(
            log_img,
            inputs=[session_state, input_image],
            outputs=[session_state, rerun_viewer],
        )

    return demo

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request👀 needs triageThis issue needs to be triaged by the Rerun team

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions