Skip to content
Merged
Show file tree
Hide file tree
Changes from 106 commits
Commits
Show all changes
121 commits
Select commit Hold shift + click to select a range
348a2ff
Added endpoint to router
vshekar Jun 27, 2025
80a9657
Refactored get_entry() to not use dependencies
vshekar Jun 25, 2025
20b8f7f
Formatting
vshekar Jun 25, 2025
1e338c8
get_entry only returns an entry
vshekar Jun 26, 2025
5029327
Refactored get_current_principal for websockets
vshekar Jul 3, 2025
277e250
Initial websocket endpoint implementation
vshekar Jul 3, 2025
79d4ecd
Added:
vshekar Apr 24, 2025
af48af8
Added:
vshekar Apr 24, 2025
1ac96ce
chore: stripping trailing whitespace
vshekar Apr 25, 2025
ab003ef
chore: black formatting
vshekar Apr 25, 2025
48ce06a
Added redis_client to CatalogAdapter
vshekar May 28, 2025
3b9e538
Refactored deserialization
vshekar May 28, 2025
0532b8f
Connect with websockets
vshekar Jul 3, 2025
1da75b8
Fixed get_entry() calls to match signature
vshekar Jul 8, 2025
d1d6291
Added code to push data to redis
vshekar Jul 8, 2025
3533338
Satisfy linter
danielballan Jul 8, 2025
34ee0fe
redis must be optional
danielballan Jul 8, 2025
10b0e51
Missing API should return 401 (not 500)
danielballan Jul 8, 2025
363c56a
Do not decode the payload.
danielballan Jul 8, 2025
249575c
Handle msgpack and JSON serialization correctly.
danielballan Jul 8, 2025
ef73d29
Fix exit logic?
danielballan Jul 9, 2025
3329b72
Added stop stream endpoint
vshekar Jul 10, 2025
9fca2ba
Changed post /stream/close to a delete verb
vshekar Jul 14, 2025
e482874
Closing stream should be idempotent
vshekar Jul 14, 2025
4c6b520
Set seq_num:node_id to expire when stream is closed
vshekar Jul 14, 2025
54aa563
Satisfy linters
danielballan Jul 14, 2025
4f90693
Namespace /stream to /stream/single
danielballan Jul 14, 2025
993b164
Remove vestigial parameter.
danielballan Jul 14, 2025
a6f57cb
Closing works and updates database.
danielballan Jul 14, 2025
892dd4f
Add search on is_streaming.
danielballan Jul 15, 2025
c7e1070
Expose DELETE /stream/close in client.
danielballan Jul 15, 2025
eccfa26
Use py39-compat usage.
danielballan Jul 15, 2025
a42c193
Validate input for envelope_format
danielballan Jul 15, 2025
6463e28
Decode *after* streaming, and give more metadata
danielballan Jul 15, 2025
e16b2aa
Implement streaming for write_block
danielballan Jul 15, 2025
fb22746
Implement streaming for PATCH array
danielballan Jul 15, 2025
0b37bd8
Clean up errors on put_data_source codepath.
danielballan Jul 15, 2025
bfadc79
No mimetype for data_source
danielballan Jul 15, 2025
2a56857
Handle data_source or payload
danielballan Jul 15, 2025
03d7a7b
Use out-of-band signaling.
danielballan Jul 15, 2025
8642f2d
Separated websocket handling and redis interaction
vshekar Jul 15, 2025
ca9e6c8
Default content-type for arrays
vshekar Jul 15, 2025
654c505
Support streaming container, composite.
danielballan Jul 15, 2025
5f000c2
Move data_source into metadata. Close explicitly.
danielballan Jul 15, 2025
71ca5e9
Rename ?seq_num to ?start and include sequence in metadata.
danielballan Jul 15, 2025
f5da0b5
Add client Subscription object based on caproto.
danielballan Jul 16, 2025
90e4ef0
Refine Subscription API
danielballan Jul 16, 2025
6e98224
Renamed encoder to envelope_formatter for consistency
vshekar Jul 16, 2025
68e4dc5
Bug fixes and variable name clarification
vshekar Jul 16, 2025
5f8bd8c
Fix ws/wss conditional
danielballan Jul 17, 2025
e42aa82
Handle weakrefs correctly
danielballan Jul 17, 2025
8fb6388
Revoke API key after use.
danielballan Jul 17, 2025
7721698
Add types and docstring to Subscription.
danielballan Jul 18, 2025
a44ec56
Added more metadata
vshekar Jul 18, 2025
e43a1e8
Changed order of metadata
vshekar Jul 18, 2025
137f5af
Added patch
vshekar Jul 18, 2025
edf95ac
Fix Patch scheme: tuple of multiple ints
danielballan Jul 21, 2025
a467271
Added uri of slices to stream
vshekar Jul 22, 2025
55c07ee
checking if i can commit
gwbischof Aug 13, 2025
8e33736
update pyproject from rebase
gwbischof Aug 13, 2025
de71846
is_streaming migration
gwbischof Aug 19, 2025
c9efa22
update test_write_array_internal_direct
gwbischof Aug 19, 2025
f86d0c8
Changed command line flag --redis to --cache
vshekar Aug 19, 2025
fbdde6e
add test_websockets
gwbischof Aug 19, 2025
7d52e82
trying to get the TestClient to connect to websocket
gwbischof Aug 19, 2025
6a9add5
touch up root_tree
gwbischof Aug 20, 2025
12ed190
websocket test touch up
gwbischof Aug 20, 2025
fde57e0
use context manager for TestClient
gwbischof Aug 20, 2025
ec7fbf0
the test is working finally
gwbischof Aug 20, 2025
5dfb3d9
touch ups websocket test
gwbischof Aug 20, 2025
9a12928
make a tiled_websocket_context fixture
gwbischof Aug 20, 2025
2a3d944
add the rest of the tests from test-redis-ws
gwbischof Aug 20, 2025
27dabe0
last test is hanging waiting for historical message
gwbischof Aug 20, 2025
a765cc7
basic websocket tests passing
gwbischof Aug 20, 2025
9852b0a
add redis for ci
gwbischof Aug 20, 2025
ab94664
skip websocket tests on windows
gwbischof Aug 21, 2025
d830699
skip the whole module on win
gwbischof Aug 21, 2025
8b0c9ed
test Subscription, need a better way to close the thread in _receive
gwbischof Aug 21, 2025
2fb5e52
test_subscription is passing
gwbischof Aug 21, 2025
d77ad91
add WebSocketWrapper
gwbischof Aug 21, 2025
de83982
tests working with wrapper
gwbischof Aug 21, 2025
3dee417
add more tests from Subscription
gwbischof Aug 21, 2025
5aa2374
add socket_timeout and socket_connect_timeout cache_settings
gwbischof Aug 22, 2025
9d3243d
test the close endpoint
gwbischof Aug 22, 2025
c578694
I think there is a minor problem with the close endpoint
gwbischof Aug 22, 2025
5f006a5
first pass at adding locust websocket tests
gwbischof Aug 22, 2025
4104279
change cache_ttl to integer from float
gwbischof Aug 22, 2025
36e5c81
update the websocket header to include 'Apikey', close endpoint retur…
gwbischof Aug 25, 2025
2a455b0
update Subscription header to be prefixed by Apikey
gwbischof Aug 25, 2025
2d41140
locust streaming tests are working, but only in headless mode
gwbischof Aug 27, 2025
0c231fb
update locust readme
gwbischof Aug 27, 2025
db83420
Fix zarr declaration
danielballan Aug 27, 2025
7d2e083
remove SpecialUser
gwbischof Aug 27, 2025
a2450da
update websocket wrapper classes
gwbischof Aug 27, 2025
505a29b
Add redis dep for pixi too
danielballan Aug 27, 2025
24ee7e5
The websockets library is a client-side dep.
danielballan Aug 27, 2025
2925521
Fix mistake introduced in rebase
danielballan Aug 27, 2025
11a704e
Missed purge of 'composite' in rebase
danielballan Aug 27, 2025
3c4580a
Disambiguate between single-user and anonymous
danielballan Aug 27, 2025
28e4e7d
rebase mistake
danielballan Aug 27, 2025
5490f94
more tests are passing
gwbischof Aug 27, 2025
ce04989
websocket tests all passing
gwbischof Aug 27, 2025
63d7a6c
remove is_streaming
gwbischof Aug 27, 2025
1578b82
update adapter streaming logic to use redis
gwbischof Aug 27, 2025
a9a42f5
add auth tests
gwbischof Aug 27, 2025
855ddc6
clear state between subcription tests
gwbischof Aug 27, 2025
0cf94f1
Apply move API key to HTTP (not WS) requests.
danielballan Aug 28, 2025
a4abf4d
fixes based on dans comments
gwbischof Aug 28, 2025
7ded59b
Merge branch 'main' into websocket-endpoint
danielballan Aug 28, 2025
0ad3fbf
load cache settings from config file
gwbischof Aug 28, 2025
27059b8
fix duplicate config key name
gwbischof Aug 28, 2025
e4b4e31
Parse streaming_cache from config.
danielballan Aug 28, 2025
0fde970
add cache_settings to in_memory
gwbischof Aug 28, 2025
4fd3fac
Supply TILED_TEST_REDIS explicitly. Remove randomness.
danielballan Aug 28, 2025
39d3769
Address pydantic deprecation warning on dict()
danielballan Aug 28, 2025
9b38d30
Remove TODO; this looks good
danielballan Aug 28, 2025
328d552
The referenced issues has been closed, types are tightened
danielballan Aug 28, 2025
382d2c4
Disable LDAP tests for now
danielballan Aug 28, 2025
874aeb8
Properly detect request scopes and access_tags.
danielballan Aug 28, 2025
d99f55c
Drop commented-out vestigial parameters.
danielballan Aug 28, 2025
c393106
Properly integrate access tags, scopes with WS endpoint
danielballan Aug 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ jobs:
shell: bash -l {0}
run: source continuous_integration/scripts/start_postgres.sh

- name: Start Redis service in container.
shell: bash -l {0}
run: source continuous_integration/scripts/start_redis.sh


- name: Ensure example data is migrated to current catalog database schema.
# The example data is expected to be kept up to date to the latest Tiled
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,6 @@ pixi.lock
# uv environments
uv.lock
.venv
# pixi environments
.pixi/*
!.pixi/config.toml
5 changes: 5 additions & 0 deletions continuous_integration/scripts/start_redis.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
set -e

docker run -d --rm --name tiled-test-redis -p 6379:6379 docker.io/redis:7-alpine
docker ps
96 changes: 84 additions & 12 deletions locust/README.md
Original file line number Diff line number Diff line change
@@ -1,35 +1,107 @@
# Tiled Load Testing with Locust

Simple load testing for Tiled using the `reader.py` file.
Load testing for Tiled using Locust. Two test files are available:
- `reader.py` - Tests HTTP read operations and search endpoints
- `streaming.py` - Tests streaming data writes and WebSocket delivery latency

## Quick Start

```bash
# Install dependencies (dev environment includes locust)
pixi install -e dev
# Install dependencies (locust should already be available in the environment)
# If not installed, add it to your requirements or install with:
# uv add locust
```

## Starting Test Server

Before running locust tests, start a Tiled server:

```bash
# Basic server (works for most tests)
uv run tiled serve catalog \
--host 0.0.0.0 \
--port 8000 \
--api-key secret \
--temp \
--init
```

For streaming tests with Redis cache (optional):
```bash
# Start Redis first
redis-server

# Start Tiled server with Redis cache
uv run tiled serve catalog \
--host 0.0.0.0 \
--port 8000 \
--api-key secret \
--cache-uri "redis://localhost:6379" \
--cache-ttl 60 \
--temp \
--init
```

This creates a temporary catalog with:
- API key authentication (key: "secret")
- Temporary writable storage (automatically cleaned up)
- Optional Redis cache for enhanced streaming performance
- Server running on http://localhost:8000

## Reading Performance Tests (`reader.py`)

Tests various HTTP endpoints for reading data, metadata, and search operations.

### Examples
Run with default localhost server (uses default API key 'secret'):
```bash
pixi run -e dev locust -f reader.py --host http://localhost:8000
uv run locust -f reader.py --headless -u 100 -r 10 -t 60s --host http://localhost:8000
```

Run with custom API key:
```bash
pixi run -e dev locust -f reader.py --host http://localhost:8000 --api-key your-api-key
uv run locust -f reader.py --headless -u 100 -r 10 -t 60s --host http://localhost:8000 --api-key your-api-key
```

Run with custom container name (defaults to locust_testing):
```bash
pixi run -e dev locust -f reader.py --host http://localhost:8000 --container-name my_test_container
uv run locust -f reader.py --headless -u 100 -r 10 -t 60s --host http://localhost:8000 --container-name my_test_container
```

## Streaming Performance Tests (`streaming.py`)

Tests streaming data writes and WebSocket delivery with end-to-end latency measurement.

**Note:** The `--node-name` parameter is required for streaming tests to avoid conflicts when multiple test runs create nodes with the same name.

### Examples
Run with required node name:
```bash
uv run locust -f streaming.py --headless -u 10 -r 2 -t 120s --host http://localhost:8000 --node-name my_test_stream
```

## Headless Mode
Run without the web interface:
Run with custom API key:
```bash
pixi run -e dev locust -f reader.py --headless -u 100 -r 10 -t 60s
uv run locust -f streaming.py --headless -u 10 -r 2 -t 120s --host http://localhost:8000 --api-key your-api-key --node-name my_test_stream
```
- `-u 100`: 100 concurrent users
- `-r 10`: Spawn 10 users per second
- `-t 60s`: Run for 60 seconds

Control user types with environment variables:
```bash
# 2 writers for every 1 streaming reader
WRITER_WEIGHT=2 STREAMING_WEIGHT=1 uv run locust -f streaming.py --headless -u 10 -r 2 -t 120s --host http://localhost:8000 --node-name my_test_stream
```

### Streaming Test Components
- **WriterUser**: Writes timestamped array data to streaming nodes
- **StreamingUser**: Connects via WebSocket to measure write-to-delivery latency

## Parameters
- `-u N`: N concurrent users
- `-r N`: Spawn N users per second
- `-t Ns`: Run for N seconds
- `--headless`: Run without web interface (required for automation)

## Notes
- All examples use `--headless` mode for reliable automation
- For streaming tests, `--node-name` is required to avoid conflicts
- Use environment variables `WRITER_WEIGHT` and `STREAMING_WEIGHT` to control user distribution
238 changes: 238 additions & 0 deletions locust/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import json
import logging
import os
import threading
import time
from urllib.parse import urlparse

import msgpack
import numpy as np
import websocket

from locust import HttpUser, between, events, task

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@events.init_command_line_parser.add_listener
def _(parser):
parser.add_argument(
"--api-key",
type=str,
default="secret",
help="API key for Tiled authentication (default: secret)",
)
parser.add_argument(
"--node-name",
type=str,
required=True,
help="Node name for streaming test (required)",
)


@events.init.add_listener
def on_locust_init(environment, **kwargs):
if environment.host is None:
raise ValueError(
"Host must be specified with --host argument, or through the web-ui."
)

# Create the streaming node once for all users
create_streaming_node(
environment.host,
environment.parsed_options.api_key,
environment.parsed_options.node_name,
)


def create_streaming_node(host, api_key, node_name):
"""Create a streaming array node using Tiled client"""
from tiled.client import from_uri

# Connect to Tiled server using client
client = from_uri(host, api_key=api_key)

# Create initial streaming array
arr = np.full(5, 0.0, dtype=np.float64) # Initial array with zeros
client.write_array(arr, key=node_name)

logger.info(f"Created streaming node: {node_name}")
client.logout()


class WriterUser(HttpUser):
"""User that writes streaming data to a Tiled node"""

wait_time = between(0.1, 0.2) # Wait 0.1-0.2 seconds between writes
weight = int(os.getenv("WRITER_WEIGHT", 1))

def on_start(self):
"""Initialize user state"""
self.node_name = self.environment.parsed_options.node_name
self.message_count = 0
self.api_key = self.environment.parsed_options.api_key

# Set authentication header
self.client.headers.update({"Authorization": f"Apikey {self.api_key}"})

@task(10) # Run 10x as often as cleanup
def write_data(self):
"""Write streaming data to the node"""
# Create data with current timestamp as all values
current_time = time.time()
data = np.full(5, current_time, dtype=np.float64)
binary_data = data.tobytes()

# Post data to the streaming endpoint
response = self.client.put(
f"/api/v1/array/full/{self.node_name}",
data=binary_data,
headers={"Content-Type": "application/octet-stream"},
)

# Log status
if response.status_code == 200:
logger.debug(f"Wrote message {self.message_count} to node {self.node_name}")
self.message_count += 1
else:
logger.error(
f"Failed to write message {self.message_count}: {response.status_code} - {response.text}"
)

@task(1)
def cleanup(self):
"""Periodically cleanup the stream"""
if self.message_count > 50:
# Close the stream
response = self.client.delete(f"/api/v1/stream/close/{self.node_name}")
if response.status_code == 200:
logger.info(f"Closed stream for node {self.node_name}")

# Reset message count (node persists for other users)
self.message_count = 0


class StreamingUser(HttpUser):
"""User that connects to websocket stream and measures latency"""

wait_time = between(1, 2)
weight = int(os.getenv("STREAMING_WEIGHT", 1))

def on_start(self):
"""Connect to the streaming endpoint"""
self.node_name = self.environment.parsed_options.node_name
self.api_key = self.environment.parsed_options.api_key
self.envelope_format = "msgpack" # Use msgpack for efficiency
self.ws = None
self.connected = False

# Set up authentication for HTTP requests
self.client.headers.update({"Authorization": f"Apikey {self.api_key}"})

self._connect_websocket()

def _connect_websocket(self):
"""Connect to the websocket stream"""
try:
# Parse host to get websocket URL
parsed = urlparse(self.host)
ws_scheme = "wss" if parsed.scheme == "https" else "ws"
host = f"{ws_scheme}://{parsed.netloc}"

ws_url = f"{host}/api/v1/stream/single/{self.node_name}?envelope_format={self.envelope_format}&start=0"

# Create websocket connection
self.ws = websocket.WebSocketApp(
ws_url,
header=[
f"Authorization: Apikey {self.api_key}"
], # Proper Apikey format for websockets
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
)

# Start websocket in background thread
self.ws_thread = threading.Thread(target=self.ws.run_forever)
self.ws_thread.daemon = True
self.ws_thread.start()

# Wait a bit for connection
time.sleep(0.5)

except Exception as e:
logger.error(f"Failed to connect websocket: {e}")

def _on_open(self, ws):
"""Websocket connection opened"""
self.connected = True
logger.info(f"WebSocket connected to {self.node_name}")

def _on_message(self, ws, message):
"""Process websocket messages and measure latency"""
try:
received_time = time.time()

if isinstance(message, bytes):
data = msgpack.unpackb(message)
else:
data = json.loads(message)

# Extract timestamp from the payload (first element of the array)
payload = data.get("payload")
if payload and len(payload) > 0:
# Convert bytes back to numpy array to get the timestamp
payload_array = np.frombuffer(payload, dtype=np.float64)
if len(payload_array) > 0:
write_time = payload_array[0]
latency_ms = (received_time - write_time) * 1000

logger.debug(
f"WS latency (sequence {data.get('sequence', 'N/A')}): {latency_ms:.1f}ms"
)

# Report to Locust
events.request.fire(
request_type="WS",
name="write_to_websocket_delivery",
response_time=latency_ms,
response_length=len(message),
exception=None,
)

except Exception as e:
logger.error(f"Error processing message: {e}")
events.request.fire(
request_type="WS",
name="write_to_websocket_delivery",
response_time=0,
response_length=0,
exception=e,
)

def _on_error(self, ws, error):
"""Websocket error occurred"""
logger.error(f"WebSocket error: {error}")
self.connected = False

def _on_close(self, ws, close_status_code, close_msg):
"""Websocket connection closed"""
logger.info(f"WebSocket closed: {close_status_code} - {close_msg}")
self.connected = False

@task
def keep_alive(self):
"""Dummy task to keep the user active while listening for messages"""
if not self.connected and self.ws:
# Try to reconnect if disconnected
logger.info("Attempting to reconnect WebSocket...")
self._connect_websocket()

def on_stop(self):
"""Clean up websocket connection"""
if self.ws:
self.ws.close()
logger.info("WebSocket connection closed")
2 changes: 2 additions & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ entrypoints = "*"
rich = "*"
stamina = "*"
watchfiles = "*"
websockets = "*"

[feature.compression.dependencies]
# python-blosc2 = "*" # not available on Python < 3.11
Expand Down Expand Up @@ -98,6 +99,7 @@ prometheus_client = "*"
python-dateutil = "*"
python-jose = "*"
python-multipart = "*"
redis-py = "*"
sqlalchemy = ">=2"
starlette = ">=0.38.0"
uvicorn = "*"
Expand Down
Loading
Loading