Skip to content

Feature Request: Streaming API specific to Bluesky #6

@thopkins32

Description

@thopkins32

I was playing around with Tiled Streaming and I think it would be a useful feature to have a Bluesky-specific API for setting up subscriptions to Tiled.

I'm not sure if something like this exists already.

One use-case I have is that I only want to subscribe to a specific stream ("baseline") and this is how I currently do it. I'm not sure if it is "correct" to do it this way, but having this filtering sit behind an API would be very nice to have!

import traceback
import threading
from tiled.client import from_uri
from bluesky_tiled_plugins import BlueskyEventStream


def tprint(msg):
    "Thread-aware print"
    thread_name = threading.current_thread().name
    print(f"[{thread_name}] {msg}")


def on_new_child(update):
    "A new child node has been created in a container."
    try:
        child = update.child()
        # Filter: only subscribe to 'baseline' streams
        if isinstance(child, BlueskyEventStream) and child.item.get("id") != "baseline":
            tprint(f"Skipping stream '{child.item.get("id")}' (only subscribing to 'baseline')")
            return
        
        sub = child.subscribe()
        
        # Is the child also a container?
        if child.structure_family == "container":
            # Recursively subscribe to the children of this new container.
            sub.child_created.add_callback(on_new_child)
        else:
            # Subscribe to data updated (maybe appended).
            sub.new_data.add_callback(on_new_data)
        
        # Launch the subscription. Request that the server replay from
        # the very first update (sequence number 1) if we missed any
        # during the time it took to set up this subscription.
        sub.start_in_thread(1)
    except Exception:
        traceback.print_exc()


def on_new_data(update):
    "Data has been updated (maybe appended) to an array or table."
    try:
        data = update.data()
        tprint(f"Data received:\n{data}")
    except Exception:
        traceback.print_exc()


sandbox = from_uri("http://127.0.0.1:8000", api_key="secret")
catalog_sub = sandbox.subscribe()
catalog_sub.child_created.add_callback(on_new_child)
catalog_sub.start()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions