Skip to content

Websocket endpoint#999

Merged
danielballan merged 121 commits intobluesky:mainfrom
vshekar:websocket-endpoint
Aug 28, 2025
Merged

Websocket endpoint#999
danielballan merged 121 commits intobluesky:mainfrom
vshekar:websocket-endpoint

Conversation

@vshekar
Copy link
Contributor

@vshekar vshekar commented Jul 3, 2025

Checklist

  • Add a Changelog entry
  • Add the ticket number which this PR closes to the comment section

@danielballan
Copy link
Member

Notes on schema:

register (CatalogNodeAdapter.put_data_source):

hset:

  • datasource (JSON)
  • metadata (JSON)
    • timestamp

upload (CatalogArrayAdapter.write):
hset:

  • payload (binary)
  • metadata (JSON)
    • timestamp
    • mimetype
    • structure-specific details (JSON)
      • for array: {"block": (0, 0)} or {"block": null}
      • for table: {} ??
      • ...

@danielballan danielballan force-pushed the websocket-endpoint branch 2 times, most recently from 171304f to ed9f470 Compare July 9, 2025 00:24
@danielballan
Copy link
Member

danielballan commented Jul 9, 2025

Start server, with optional --redis-uri argument. (We will rename this to something more general, like --cache to accommodate other implementations like NATS.)

tiled serve catalog --temp --api-key secret --redis-uri redis://localhost:6379

Connect with the Python client, and create an array.

c = from_uri('http://localhost:8000', api_key='secret')
x = c.write_array([1,2,3], key='x')

Subscribe for updates on this array.

websocat 'ws://localhost:8000/api/v1/stream/x' -H 'Authorization:secret'

Back in the Python client, alter (overwrite, in this case) the array.

x.write(np.array([4,5,7]))

Websocket gets an update!

websocat 'ws://localhost:8000/api/v1/stream/x' -H 'Authorization:secret'
{"sequence":1,"timestamp":"2025-07-08T20:19:32.361533","payload":[4,5,7]}

Above we get JSON but msgpack can be requested:

websocat 'ws://localhost:8000/api/v1/stream/x?envelope_format=msgpack' -H 'Authorization: Apikey'

A whole lot is missing—too much to bother enumerating here—but this shows the basic pieces communicating.

@vshekar
Copy link
Contributor Author

vshekar commented Jul 10, 2025

Couple of considerations to make for the /stream/stop endpoint
As it is currently implemented, the data indicating the stream is done, is in-band
This would pose 2 problems:

  1. Do we allow the producer from updating the node after the stream is closed?
  2. If we only have in-band encoding of the end-of-stream, data will be lost after redis ttl so a new client will be connected indefinitely
    Out of band encoding of this info should solve both issues. Question is where will it go?

@danielballan
Copy link
Member

Agreed. We added a column to the nodes table which I think is the right out-of-band encoded to address these questions.

That will also facilitate (in a future PR...) search queries like, "List the nodes that are streaming."

@danielballan
Copy link
Member

danielballan commented Aug 8, 2025

TO DO:

  • Tests
  • At least a start at docs
  • Rename interfaces to avoid hard-coding redis in the public API (paving the way for NATS etc.)

@danielballan danielballan added this to the v0.1.0 release milestone Aug 8, 2025
@danielballan
Copy link
Member

A heartbeat for @kivel: We have removed "redis" from public APIs. The next step toward generalization would be some polymorphic adapter ("adapter" in the general sense, not a "Tiled Adapter") that will enable a common interface to Redis-backed and NATS-backed (etc.) implementations. That might happen in a later PR, but the track is laid.

@danielballan
Copy link
Member

Skipping these tests on Windows, for now, would be acceptable IMO.

yield context


@pytest.mark.skipif(sys.platform == "win32", reason="Requires Redis service")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done at the module level, I believe. Less work for Future Us when we add more tests or if we add Windows support.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, updated

args:
uri: "catalog.db"
```
redis:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
redis:
cache:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this config respected? I suspect it might be ignored because I don't see any changes in tiled.config that would parse it and pass it along into the settings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed up this file. But it looks like this file is just for validating config files not something that gets loaded into settings?

@gwbischof gwbischof changed the title [WIP] Websocket endpoint Websocket endpoint Aug 28, 2025
@danielballan
Copy link
Member

The low coverage number comes in large from some misconfiguration. Merging now, with a fix for that to follow, to hit our pre-announced outage window.

@danielballan danielballan merged commit 1939851 into bluesky:main Aug 28, 2025
9 of 11 checks passed
ZohebShaikh pushed a commit that referenced this pull request Feb 21, 2026
* Added endpoint to router

* Refactored get_entry() to not use dependencies

* Formatting

* get_entry only returns an entry

* Refactored get_current_principal for websockets

* Initial websocket endpoint implementation

* Added:
- config schema def for redis client
- CLI
- redis client

* Added:
- redis client init
- Removed client config from service config schema

* chore: stripping trailing whitespace

* chore: black formatting

* Added redis_client to CatalogAdapter

* Refactored deserialization

* Connect with websockets

* Fixed get_entry() calls to match signature

* Added code to push data to redis

* Satisfy linter

* redis must be optional

* Missing API should return 401 (not 500)

* Do not decode the payload.

* Handle msgpack and JSON serialization correctly.

* Fix exit logic?

* Added stop stream endpoint

* Changed post /stream/close to a delete verb

* Closing stream should be idempotent

* Set seq_num:node_id to expire when stream is closed

* Satisfy linters

* Namespace /stream to /stream/single

* Remove vestigial parameter.

* Closing works and updates database.

* Add search on is_streaming.

* Expose DELETE /stream/close in client.

* Use py39-compat usage.

* Validate input for envelope_format

* Decode *after* streaming, and give more metadata

* Implement streaming for write_block

* Implement streaming for PATCH array

* Clean up errors on put_data_source codepath.

* No mimetype for data_source

* Handle data_source or payload

* Use out-of-band signaling.

* Separated websocket handling and redis interaction

* Default content-type for arrays

* Support streaming container, composite.

* Move data_source into metadata. Close explicitly.

* Rename ?seq_num to ?start and include sequence in metadata.

* Add client Subscription object based on caproto.

* Refine Subscription API

* Renamed encoder to envelope_formatter for consistency

* Bug fixes and variable name clarification

* Fix ws/wss conditional

* Handle weakrefs correctly

* Revoke API key after use.

* Add types and docstring to Subscription.

* Added more metadata

* Changed order of metadata

* Added patch

* Fix Patch scheme: tuple of multiple ints

* Added uri of slices to stream

* checking if i can commit

* update pyproject from rebase

* is_streaming migration

* update test_write_array_internal_direct

* Changed command line flag --redis to --cache

* add test_websockets

* trying to get the TestClient to connect to websocket

* touch up root_tree

* websocket test touch up

* use context manager for TestClient

* the test is working finally

* touch ups websocket test

* make a tiled_websocket_context fixture

* add the rest of the tests from test-redis-ws

* last test is hanging waiting for historical message

* basic websocket tests passing

* add redis for ci

* skip websocket tests on windows

* skip the whole module on win

* test Subscription, need a better way to close the thread in _receive

* test_subscription is passing

* add WebSocketWrapper

* tests working with wrapper

* add more tests from Subscription

* add socket_timeout and socket_connect_timeout cache_settings

* test the close endpoint

* I think there is a minor problem with the close endpoint

* first pass at adding locust websocket tests

* change cache_ttl to integer from float

* update the websocket header to include 'Apikey', close endpoint returns 404 if node doesn't exist or is already closed

* update Subscription header to be prefixed by Apikey

* locust streaming tests are working, but only in headless mode

* update locust readme

* Fix zarr declaration

* remove SpecialUser

* update websocket wrapper classes

* Add redis dep for pixi too

* The websockets library is a client-side dep.

* Fix mistake introduced in rebase

* Missed purge of 'composite' in rebase

* Disambiguate between single-user and anonymous

* rebase mistake

* more tests are passing

* websocket tests all passing

* remove is_streaming

* update adapter streaming logic to use redis

* add auth tests

* clear state between subcription tests

* Apply move API key to HTTP (not WS) requests.

* fixes based on dans comments

* load cache settings from config file

* fix duplicate config key name

* Parse streaming_cache from config.

* add cache_settings to in_memory

* Supply TILED_TEST_REDIS explicitly. Remove randomness.

* Address pydantic deprecation warning on dict()

* Remove TODO; this looks good

* The referenced issues has been closed, types are tightened

* Disable LDAP tests for now

#1109

* Properly detect request scopes and access_tags.

* Drop commented-out vestigial parameters.

* Properly integrate access tags, scopes with WS endpoint

---------

Co-authored-by: Dan Allan <dallan@bnl.gov>
Co-authored-by: gbischof <bisc8233@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants