
refactor(websocket): update get latest history to use ydoc v2 [FLOW-BE-135] #1162


Open · wants to merge 16 commits into base: main

Conversation

kasugamirai
Member

@kasugamirai kasugamirai commented Apr 15, 2025

Overview

What I've done

What I haven't done

How I tested

Screenshot

Which point I want you to review particularly

Memo

Summary by CodeRabbit

  • Bug Fixes

    • Improved reliability and error handling when loading, saving, and synchronizing documents, reducing the likelihood of data loss or inconsistencies.
    • Enhanced cleanup logic to better manage active connections and release resources.
  • Refactor

    • Updated document storage methods to use new versioned encoding and decoding for improved efficiency and consistency.
    • Streamlined internal logic for document loading, saving, and distributed locking to increase robustness and maintainability.
  • Chores

    • Improved logging for better system observability and troubleshooting.
  • Tests

    • Added comprehensive tests for document operations with mock key-value storage to ensure correct integration and data consistency.

…nt batch processing of Redis updates in BroadcastGroup and BroadcastPool
…k management and conditional stream deletion for improved concurrency handling
…nhance lock management for improved concurrency and error handling
… and related methods by removing lock value handling for improved clarity
…ions in Redis update processing for improved clarity
…n events to reduce verbosity and improve log clarity
…d BroadcastGroup methods for improved timing consistency
… logging for improved clarity in Redis update processing
@kasugamirai kasugamirai requested a review from pyshx as a code owner April 15, 2025 15:48
Contributor

coderabbitai bot commented Apr 15, 2025

Walkthrough

This set of changes refactors the document loading, flushing, and group management logic in the WebSocket server. The updates introduce new "v2" methods for document load and flush operations, replace previous direct methods with these new versions, and enhance distributed locking around critical sections. The group creation process now includes retry and batch processing of updates from Redis streams. Cleanup and shutdown logic is simplified and made more robust, with improved error handling and logging. The trait DocOps is updated to use the new v2 method names, and fallback logic is added for document loading in case the primary v2 method fails. Additionally, a new comprehensive test module for Yrs document operations with a mock key-value store is added.

Changes

File(s) Change Summary
server/websocket/src/broadcast/group.rs Simplified shutdown logic: direct boolean lock acquisition, switched to flush_doc_v2, commented out update trimming, and streamlined lock release with improved error logging.
server/websocket/src/broadcast/pool.rs Refactored group creation with retry and batch Redis stream update processing, switched to load_doc_v2 and flush_doc_v2, added distributed locking in flush, improved error handling and logging, and added early return in cleanup for active groups.
server/websocket/src/doc/handler.rs Updated get_latest to use load_doc_v2 with fallback to legacy method, removing upfront doc creation and restructuring error handling and response construction.
server/websocket/src/storage/kv/mod.rs Renamed trait methods: load_doc_direct → load_doc_v2, flush_doc_direct → flush_doc_v2; updated implementations to use v2 encoding/decoding; no logic changes beyond renaming and updated method calls.
server/websocket/tests/ydoc.rs Added a new test module with mock key-value store implementations and async tests validating Yrs document insertion, update pushing, flushing, and loading workflows using the new v2 document storage methods.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant BroadcastPool
    participant Manager as BroadcastGroupManager
    participant Storage as Storage (DocOps)
    participant Redis

    Client->>BroadcastPool: Request group creation
    BroadcastPool->>Manager: create_group(doc_id)
    Manager->>Storage: load_doc_v2(doc_id)
    alt load_doc_v2 fails
        Manager->>Storage: load_doc (fallback)
    end
    Manager->>Redis: Read Redis stream in batches
    Manager->>BroadcastPool: Return group (Arc)
    BroadcastPool->>Client: Respond with group

    Client->>BroadcastPool: Request flush_to_gcs(doc_id)
    BroadcastPool->>Redis: Acquire distributed lock
    alt Lock acquired
        BroadcastPool->>Storage: load_doc_v2(doc_id)
        BroadcastPool->>Storage: flush_doc_v2(doc_id)
        BroadcastPool->>Redis: Release distributed lock
    else Lock not acquired
        BroadcastPool->>Client: Return without flush
    end

Poem

In the warren of code, a new tunnel we find,
With v2 methods burrowed, and old ones behind.
Locks held more tightly, updates now flow,
Through Redis and storage, in batches they go.
The doc's latest state, with care is retrieved—
A carrot of progress, this bunny's achieved!
🥕✨

netlify bot commented Apr 15, 2025

Deploy Preview for reearth-flow canceled.

🔨 Latest commit: 91af3de
🔍 Latest deploy log: https://app.netlify.com/sites/reearth-flow/deploys/67fffff92f20f00009f8595e

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (7)
server/websocket/src/storage/kv/mod.rs (1)

457-476: Handle partial decode failures more explicitly.

Currently, if Update::decode_v2 fails, the code simply proceeds without differentiating a corrupted or incomplete doc from a truly missing doc. This can make debugging more difficult. Consider returning a specialized error for decode failures to distinguish them from "document not found" cases.
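A minimal, std-only sketch of what such a split could look like. The `LoadError` enum and the `decode_v2` helper here are hypothetical stand-ins (the real code uses yrs's `Update::decode_v2`), shown only to illustrate surfacing decode failures as a distinct error variant:

```rust
// Hypothetical error type distinguishing a corrupted doc from a missing one.
#[derive(Debug, PartialEq)]
enum LoadError {
    NotFound,
    DecodeFailed(String),
}

// Stand-in for yrs's Update::decode_v2: here we just require a magic prefix.
fn decode_v2(bytes: &[u8]) -> Result<Vec<u8>, String> {
    if bytes.starts_with(&[0x00, 0x02]) {
        Ok(bytes[2..].to_vec())
    } else {
        Err("invalid v2 header".to_string())
    }
}

// Sketch of the load path: a missing key yields NotFound, while bytes that
// fail to decode yield DecodeFailed instead of being silently ignored.
fn load_doc_v2(raw: Option<&[u8]>) -> Result<Vec<u8>, LoadError> {
    match raw {
        None => Err(LoadError::NotFound),
        Some(bytes) => decode_v2(bytes).map_err(LoadError::DecodeFailed),
    }
}
```

With this shape, callers (and log lines) can tell "doc never existed" apart from "doc exists but is corrupt", which is the debugging gap the comment points at.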

server/websocket/src/storage/redis/mod.rs (4)

8-9: Use uuid::Uuid directly for clarity.
While importing the entire uuid crate works, referencing the type (uuid::Uuid) is clearer and helps avoid confusion with other items that might be in uuid scope.

-use uuid;
+use uuid::Uuid;

418-434: Consider unifying the read-lock check to the Lua script.
You already check the read lock in Rust before executing the script and then check it again in the script. Although the double check is safe, consider removing the Rust-side check to minimize potential race conditions and keep the logic centralized in the script.


435-495: Parse Lua script return values for better debugging.
Your script returns structured information ({acquired=..., deleted=..., reason=...}), but you currently discard it. Parsing and logging the data (especially the reason) could help diagnose unexpected deletion failures or concurrency issues.
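As a sketch of the parsing side, assuming the Lua table is flattened into alternating key/value strings when it crosses the Redis reply boundary (the exact reply shape depends on how the script returns the table; `parse_script_reply` is a hypothetical helper):

```rust
use std::collections::HashMap;

// Parse a flattened Lua reply like ["acquired","0","reason","lock_held"]
// into a key/value map so fields such as `reason` can be logged.
fn parse_script_reply(reply: &[String]) -> HashMap<String, String> {
    reply
        .chunks(2)
        .filter_map(|pair| match pair {
            [k, v] => Some((k.clone(), v.clone())),
            _ => None, // odd trailing element: ignore rather than panic
        })
        .collect()
}
```

Logging `parsed.get("reason")` on the failure path would make unexpected deletion failures much easier to diagnose.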


546-635: Add documentation for read_stream_data_in_batches.
This new method is quite important for streaming data in batches and managing locks. Consider adding doc comments explaining its parameters (is_first_batch, is_final_batch, lock_value) and return values, so future contributors can use it more easily.

server/websocket/src/broadcast/group.rs (2)

95-96: Error handling on broadcast channel send.
Logging an error when the channel is closed helps diagnose mysterious message drops. Consider adding context about the root cause if feasible.


630-662: Centralized shutdown logic in Drop.
Collectively closing tasks by sending shutdown signals is a solid pattern. If sending fails, aborting the tasks is a suitable fallback.

You could factor out the repeated shutdown-signal-then-task-abort pattern into a helper method or macro to reduce duplication and potential for error.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 183a27e and fa42490.

📒 Files selected for processing (6)
  • server/websocket/src/broadcast/group.rs (12 hunks)
  • server/websocket/src/broadcast/pool.rs (7 hunks)
  • server/websocket/src/broadcast/publish.rs (6 hunks)
  • server/websocket/src/doc/handler.rs (2 hunks)
  • server/websocket/src/storage/kv/mod.rs (2 hunks)
  • server/websocket/src/storage/redis/mod.rs (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
server/websocket/src/doc/handler.rs (1)
server/api/pkg/websocket/websocket.go (1)
  • Document (5-10)
server/websocket/src/storage/redis/mod.rs (1)
server/websocket/src/broadcast/group.rs (1)
  • new (76-312)
🔇 Additional comments (23)
server/websocket/src/storage/kv/mod.rs (1)

478-491: Ensure document state consistency before flush.

While flush_doc_v2 saves the entire state as a single update, there may be edge cases if partial updates exist. Before upserting, consider verifying that all partial updates are integrated, or document that flush_doc_v2 expects a fully integrated Doc.

Do you want a script that traces calls to flush_doc_v2 and ensures any preceding partial-update paths are completed first? Let me know.

server/websocket/src/broadcast/publish.rs (7)

7-7: Good addition for graceful shutdown.

Importing oneshot is essential for single-use shutdown signals; this supports cleaner task termination.


18-18: Optional oneshot sender clarifies ownership.

Storing shutdown_tx in an Option ensures that once the sender is taken (in Drop), it cannot be reused. This guards against accidental reuse of an already triggered shutdown channel.


38-38: Graceful shutdown channel creation.

Creating a new oneshot channel for shutdown is a cleaner approach compared to multi-producer channels. This is aligned with the single-sender, single-consumer model of a oneshot channel.


41-41: Slight timer interval increase.

Changing from 50ms to 55ms might reduce overhead. Confirm that this does not negatively affect flush latency requirements.


45-47: Clean exit on shutdown signal.

Breaking the loop upon receiving the shutdown signal is a robust approach, preventing abrupt aborts and improving cleanup reliability.


114-114: Flush threshold raised from 5 to 6.

This minor increase in threshold will slightly delay flush calls, but also reduce overhead. Verify that potential data loss or concurrency concerns do not arise from this higher threshold.


125-131: Graceful Drop with fallback abort.

Sending the shutdown signal in Drop is a good approach. If it fails, aborting the task is a robust fallback. This ensures resources are released properly without leaving orphan tasks.

server/websocket/src/doc/handler.rs (2)

33-57: Primary document load now uses load_doc_v2.

Adopting load_doc_v2 first aligns with the new ydoc v2 approach. The code is straightforward, returning a fully constructed document. The fallback error path is also clearly handled.


58-92: Fallback to older load logic.

Falling back to load_doc seamlessly preserves backward compatibility. Ensure that stale documents do not cause confusion when partial updates exist in v2 but not integrated in the old format.

server/websocket/src/broadcast/pool.rs (6)

13-13: Imports look good.
The addition of tracing::{error, warn} for improved logging is beneficial. No issues noticed.


54-65: Validate the sleep-and-recheck pattern for existence checks.
Sleeping 500ms and re-checking stream existence can handle transient Redis delays. Ensure you’ve considered timeouts or user cancellations if this is on a critical path, to avoid indefinite waiting in the worst case.
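One way to bound the wait, sketched std-only with blocking sleep (the real code would use `tokio::time`; `exists` stands in for the Redis stream-existence check):

```rust
use std::time::{Duration, Instant};

// Poll `exists` until it returns true or the deadline passes, so a stream
// that never appears cannot stall the caller indefinitely.
fn wait_for_stream(mut exists: impl FnMut() -> bool, timeout: Duration, poll: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while !exists() {
        if Instant::now() >= deadline {
            return false; // give up: caller can surface a timeout error
        }
        std::thread::sleep(poll);
    }
    true
}
```

A deadline like this also gives user cancellation a natural upper bound on how long group creation can block.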


75-177: Good use of load_doc_v2 followed by batch updates.
Switching to the updated load_doc_v2 method and applying Redis updates in batches prevents large single fetches and locks, improving scalability. The retry logic for lock release (final batch) is also clear.


253-253: Informative logging statement.
Logging when a group is found helps with debugging group retrieval. This addition looks useful.


269-305: Graceful locking for GCS flush.
Acquiring a Redis-based doc lock before flushing to GCS ensures other instances don't interfere with the flush process. The fallback to release the lock upon errors is a solid practice.
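The control flow can be sketched abstractly like this, with closures standing in for the Redis lock and GCS flush calls (a hypothetical shape, not the PR's actual signatures):

```rust
// Acquire the lock, flush, and always release, even when the flush errors.
// Returns Ok(false) when another instance holds the lock and the flush is
// skipped, Ok(true) on a completed flush.
fn flush_with_lock(
    acquire: impl FnOnce() -> bool,
    flush: impl FnOnce() -> Result<(), String>,
    release: impl FnOnce(),
) -> Result<bool, String> {
    if !acquire() {
        return Ok(false); // lock held elsewhere: skip without error
    }
    let result = flush();
    release(); // released on both the success and the error path
    result.map(|_| true)
}
```

The key property is that `release` runs unconditionally once the lock is held, which is exactly the fallback-release practice the comment calls solid.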


316-320: Early return on active connections.
Preventing cleanup when the group still has connections is correct and avoids prematurely removing an active group.

server/websocket/src/broadcast/group.rs (7)

41-48: Optional fields for task handles and shutdown senders add flexibility.
Allowing these to be Option ensures you can gracefully handle or skip tasks if they’re not initialized. No issues noticed.


83-83: Channel initialization for buffered broadcasts.
Creating a channel with buffer_capacity is a good approach to throttle messages and avoid backpressure issues.


104-130: Clean oneshot shutdown pattern for awareness updater.
Using tokio::sync::oneshot::channel and selecting on its receiver is a straightforward way to shut down the loop gracefully. This block is concise and easy to follow.


164-185: Heartbeat task with graceful shutdown.
The heartbeat interval and oneshot cancellation logic make sense. Logging errors on heartbeat failures ensures troubleshooting is easier.


193-211: Dedicated channel for Redis subscriber shutdown.
This approach cleanly terminates the subscriber loop, preventing indefinite blocking on XREAD. The error handling with sleeps is also a robust fallback.


264-290: Periodic sync task with read locks.
Sending periodic sync messages can keep the doc states fresh. The oneshot channel for shutting it down is consistent with the rest of the design.


303-311: Consistent initialization of optional tasks.
All optional tasks are neatly stored in Some(...), aligning with your new flexible shutdown design.

@kasugamirai kasugamirai changed the title refactor(websocket): update get latest history to use ydoc v2 refactor(websocket): update get latest history to use ydoc v2 [FLOW-BE-135] Apr 15, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
server/websocket/tests/ydoc.rs (3)

57-63: Consider removing or implementing commented-out methods in MockRedisStore

The MockRedisStore class contains commented-out methods for lock acquisition and release that are not currently used. Either implement these methods if they're needed for future tests or remove them to keep the code clean and avoid confusion.

-        // async fn acquire_oid_lock(&self, _timeout: u64) -> Result<String, error::Error> {
-        //     Ok("mock_lock".to_string())
-        // }
-
-        // async fn release_oid_lock(&self, _lock_value: &str) -> Result<(), error::Error> {
-        //     Ok(())
-        // }

166-167: Document unused _redis parameter

The _redis parameter is not used in the test_insert_doc and test_push_update functions. Consider adding a comment explaining why it's needed or remove it if it's unnecessary.

- async fn test_insert_doc(
-     store: &MockStore,
-     name: &[u8],
-     txn: &impl ReadTxn,
-     _redis: &MockRedisStore,
- ) -> Result<(), error::Error> {
+ async fn test_insert_doc(
+     store: &MockStore,
+     name: &[u8],
+     txn: &impl ReadTxn,
+     _redis: &MockRedisStore, // Kept for API compatibility, unused for now
+ ) -> Result<(), error::Error> {

Similarly, add a comment for the parameter in test_push_update.

Also applies to: 210-211


229-230: Simplify recursive call to avoid Box::pin

The recursive call uses Box::pin to handle the future, which is a bit unusual for this case. Consider simplifying the code to use await directly without boxing, which is more idiomatic in Rust async code.

- test_insert_doc(store, name, &Doc::new().transact(), _redis).await?;
- Box::pin(test_push_update(store, name, update, _redis)).await
+ test_insert_doc(store, name, &Doc::new().transact(), _redis).await?;
+ test_push_update(store, name, update, _redis).await
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3c9fe50 and 91af3de.

📒 Files selected for processing (1)
  • server/websocket/tests/ydoc.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-websocket / ci
🔇 Additional comments (4)
server/websocket/tests/ydoc.rs (4)

177-180: Add error handling for byte conversion

The code assumes that the last_oid_data byte slice is at least 4 bytes long and can be converted to a [u8; 4] array. While the condition checks if the length is at least 4, the try_into().unwrap() call will panic if the conversion fails. Consider adding proper error handling for this case.

- if last_oid_data.len() >= 4 {
-     let bytes: [u8; 4] = last_oid_data[..4].try_into().unwrap();
-     let last_oid = OID::from_be_bytes(bytes);
-     last_oid + 1
+ if last_oid_data.len() >= 4 {
+     match last_oid_data[..4].try_into() {
+         Ok(bytes) => {
+             let last_oid = OID::from_be_bytes(bytes);
+             last_oid + 1
+         },
+         Err(_) => {
+             // Log an error or handle the case where conversion fails
+             1
+         }
+     }

217-220: Improve error handling for clock value extraction

The code extracts the clock value from the last 5 bytes of the key and assumes a specific format. Consider adding more robust error handling for cases where the key format is different or the slice-to-array conversion fails.

- let len = last_key.len();
- let last_clock = &last_key[(len - 5)..(len - 1)];
- u32::from_be_bytes(last_clock.try_into().unwrap())
+ let len = last_key.len();
+ if len >= 5 {
+     let last_clock = &last_key[(len - 5)..(len - 1)];
+     match last_clock.try_into() {
+         Ok(bytes) => u32::from_be_bytes(bytes),
+         Err(_) => {
+             // Log an error or handle the case where conversion fails
+             0
+         }
+     }
+ } else {
+     // Handle the case where the key is too short
+     0
+ }

234-262: Tests look comprehensive and well-structured

The test cases are well-designed and cover the essential functionality:

  1. test_insert_and_load_doc - Tests basic document insertion and loading
  2. test_push_update_and_flush - Tests update pushing and document flushing
  3. test_direct_v2_doc_storage - Tests the new v2 document storage methods

Each test follows a clear arrange-act-assert pattern and includes appropriate assertions to verify the results. The tests focus specifically on the v2 functionality mentioned in the PR description.

Also applies to: 264-299, 301-321


312-313: Good implementation of v2 document operations

The test directly verifies the new v2 document operations (flush_doc_v2 and load_doc_v2), which aligns well with the PR objective to "update get latest history to use ydoc v2". This provides confidence that the new methods work correctly and handle document serialization/deserialization as expected.

Also applies to: 315-316
