Skip to content

refactor(websocket): update get latest history to use ydoc v2 [FLOW-BE-135] #1162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
c4546bb
refactor(websocket): update BroadcastGroup to use oneshot channels fo…
kasugamirai Apr 14, 2025
7bd6b36
fix(broadcast): conditionally delete Redis stream in BroadcastGroup w…
kasugamirai Apr 14, 2025
d607d3a
refactor(broadcast): remove sleep duration in cleanup_empty_group met…
kasugamirai Apr 14, 2025
68242da
feat(redis): implement read_stream_data_in_batches method for efficie…
kasugamirai Apr 14, 2025
44903d1
feat(redis): enhance read_stream_data_in_batches method with read loc…
kasugamirai Apr 14, 2025
d5852d6
feat(redis): increase batch size in read_stream_data_in_batches and e…
kasugamirai Apr 14, 2025
8055966
refactor(redis): simplify return types in read_stream_data_in_batches…
kasugamirai Apr 14, 2025
30baec4
refactor(broadcast): remove unused error handling and simplify condit…
kasugamirai Apr 14, 2025
5da4d80
refactor(websocket): comment out tracing logs for WebSocket connectio…
kasugamirai Apr 14, 2025
80645f9
refactor(broadcast): adjust sleep durations in cleanup_empty_group an…
kasugamirai Apr 14, 2025
a427d5d
refactor(broadcast): remove sleep duration in cleanup_empty_group met…
kasugamirai Apr 14, 2025
46d335d
refactor(broadcast): remove unused total_updates variable and related…
kasugamirai Apr 14, 2025
fa42490
refactor(websocket): update get latest history to use ydoc v2
kasugamirai Apr 15, 2025
971d0bf
refactor(websocket): update get latest history to use ydoc v2
kasugamirai Apr 15, 2025
3c9fe50
refactor(broadcast): comment out update trimming logic in BroadcastGr…
kasugamirai Apr 16, 2025
91af3de
feat(websocket): add ydoc v2 tests
kasugamirai Apr 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 17 additions & 30 deletions server/websocket/src/broadcast/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,26 +550,12 @@ impl BroadcastGroup {
let lock_id = format!("gcs:lock:{}", self.doc_name);
let instance_id = format!("instance-{}", rand::random::<u64>());

let lock_acquired = match self
let lock_acquired = self
.redis_store
.acquire_doc_lock(&lock_id, &instance_id)
.await
{
Ok(true) => {
debug!("Acquired lock for GCS operations on {}", self.doc_name);
Some((self.redis_store.clone(), lock_id, instance_id))
}
Ok(false) => {
warn!("Could not acquire lock for GCS operations, skipping update");
None
}
Err(e) => {
warn!("Error acquiring lock for GCS operations: {}", e);
None
}
};
.await?;

if lock_acquired.is_some() {
if lock_acquired {
let awareness = self.awareness_ref.write().await;
let awareness_doc = awareness.doc();

Expand All @@ -594,8 +580,7 @@ impl BroadcastGroup {
&update_bytes,
&self.redis_store,
);
let flush_future =
self.storage.flush_doc_direct(&self.doc_name, awareness_doc);
let flush_future = self.storage.flush_doc_v2(&self.doc_name, awareness_doc);

let (update_result, flush_result) =
tokio::join!(update_future, flush_future);
Expand All @@ -608,20 +593,22 @@ impl BroadcastGroup {
warn!("Failed to update document in storage: {}", e);
}

if let Err(e) = self
.storage
.trim_updates_logarithmic(&self.doc_name, 1)
.await
{
warn!("Failed to trim updates: {}", e);
}
// if let Err(e) = self
// .storage
// .trim_updates_logarithmic(&self.doc_name, 1)
// .await
// {
// warn!("Failed to trim updates: {}", e);
// }
}
}

if let Some((redis, lock_id, instance_id)) = lock_acquired {
if let Err(e) = redis.release_doc_lock(&lock_id, &instance_id).await {
warn!("Failed to release GCS lock: {}", e);
}
if let Err(e) = self
.redis_store
.release_doc_lock(&lock_id, &instance_id)
.await
{
warn!("Failed to release GCS lock: {}", e);
}
}
}
Expand Down
200 changes: 36 additions & 164 deletions server/websocket/src/broadcast/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use rand;
use scopeguard;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, warn};
use tracing::{error, warn};
use yrs::sync::Awareness;
use yrs::updates::decoder::Decode;
use yrs::{Doc, ReadTxn, StateVector, Transact, Update};
Expand Down Expand Up @@ -72,7 +72,7 @@ impl BroadcastGroupManager {
}

let mut need_initial_save = false;
let awareness: AwarenessRef = match self.store.load_doc_direct(doc_id).await {
let awareness: AwarenessRef = match self.store.load_doc_v2(doc_id).await {
Ok(direct_doc) => Arc::new(tokio::sync::RwLock::new(Awareness::new(direct_doc))),
Err(_) => {
let doc = Doc::new();
Expand Down Expand Up @@ -260,180 +260,52 @@ impl BroadcastPool {

pub async fn flush_to_gcs(&self, doc_id: &str) -> Result<()> {
let broadcast_group = match self.manager.doc_to_id_map.get(doc_id) {
Some(group) => Some(group.clone()),
Some(group) => group.clone(),
None => {
return Ok(());
}
};

if let Some(group) = broadcast_group {
let store = self.get_store();
let doc_name = group.get_doc_name();
let lock_id = format!("gcs:lock:{}", doc_id);
let instance_id = format!("sync-{}", rand::random::<u64>());

let active_connections = match self
.manager
.redis_store
.get_active_instances(&doc_name, 60)
.await
{
Ok(count) => count,
Err(e) => {
warn!("Failed to get active instances for '{}': {}", doc_id, e);
0
}
};

if active_connections > 0 {
let temp_doc = Doc::new();
let mut temp_txn = temp_doc.transact_mut();

if let Err(e) = store.load_doc(&doc_name, &mut temp_txn).await {
warn!("Failed to load current GCS state for '{}': {}", doc_id, e);
}

let gcs_state = temp_txn.state_vector();
drop(temp_txn);

let mut start_id = "0".to_string();
let batch_size = 3000;

let mut lock_value: Option<String> = None;

let awareness = group.awareness().write().await;
let mut txn = awareness.doc().transact_mut();

loop {
match self
.manager
.redis_store
.read_stream_data_in_batches(
&doc_name,
batch_size,
&start_id,
start_id == "0",
false,
&mut lock_value,
)
.await
{
Ok((updates, last_id)) => {
if updates.is_empty() {
if start_id != "0" {
if let Err(e) = self
.manager
.redis_store
.read_stream_data_in_batches(
&doc_name,
1,
&last_id,
false,
true,
&mut lock_value,
)
.await
{
warn!("Failed to release lock in final batch: {}", e);
}
}
break;
}

for update_data in &updates {
match Update::decode_v1(update_data) {
Ok(update) => {
if let Err(e) = txn.apply_update(update) {
warn!("Failed to apply Redis update: {}", e);
}
}
Err(e) => {
warn!("Failed to decode Redis update: {}", e);
}
}
}

if last_id == start_id {
if let Err(e) = self
.manager
.redis_store
.read_stream_data_in_batches(
&doc_name,
1,
&last_id,
false,
true,
&mut lock_value,
)
.await
{
warn!("Failed to release lock in final batch: {}", e);
}
break;
}

start_id = last_id;
}
Err(e) => {
warn!(
"Failed to read updates from Redis stream for document '{}': {}",
doc_id, e
);
break;
}
}
}

drop(txn);
drop(awareness);
let lock_acquired = self
.manager
.redis_store
.acquire_doc_lock(&lock_id, &instance_id)
.await?;

let lock_id = format!("gcs:lock:{}", doc_name);
let instance_id = format!("sync-{}", rand::random::<u64>());
if lock_acquired {
let redis_store = self.manager.redis_store.clone();
let awareness = broadcast_group.awareness().read().await;
let awareness_doc = awareness.doc();

let lock_acquired = match self
.manager
.redis_store
.acquire_doc_lock(&lock_id, &instance_id)
.await
{
Ok(true) => {
debug!("Acquired lock for GCS flush operation on {}", doc_name);
Some((self.manager.redis_store.clone(), lock_id, instance_id))
}
Ok(false) => {
warn!("Could not acquire lock for GCS flush operation");
None
}
Err(e) => {
warn!("Error acquiring lock for GCS flush operation: {}", e);
None
}
};
let gcs_doc = Doc::new();
let mut gcs_txn = gcs_doc.transact_mut();

if lock_acquired.is_some() {
let awareness = group.awareness().read().await;
let awareness_doc = awareness.doc();
let awareness_txn = awareness_doc.transact();
let redis_store_clone = Arc::clone(&self.manager.redis_store);
if let Err(e) = self.manager.store.load_doc(doc_id, &mut gcs_txn).await {
warn!("Failed to load current state from GCS: {}", e);
}

let update = awareness_txn.encode_diff_v1(&gcs_state);
let gcs_state = gcs_txn.state_vector();
let awareness_txn = awareness_doc.transact();
let update = awareness_txn.encode_diff_v1(&gcs_state);

if !update.is_empty() {
let update_bytes = bytes::Bytes::from(update);
if let Err(e) = store
.push_update(&doc_name, &update_bytes, &redis_store_clone)
.await
{
error!(
"Failed to flush websocket changes to GCS for '{}': {}",
doc_id, e
);
return Err(anyhow::anyhow!("Failed to flush changes to GCS: {}", e));
}
}
if !update.is_empty() {
let update_bytes = bytes::Bytes::from(update);
self.manager
.store
.push_update(doc_id, &update_bytes, &self.manager.redis_store)
.await?;

self.manager
.store
.flush_doc_v2(doc_id, awareness_doc)
.await?;
}

if let Some((redis, lock_id, instance_id)) = lock_acquired {
redis.release_doc_lock(&lock_id, &instance_id).await?;
}
}
if let Err(e) = redis_store.release_doc_lock(&lock_id, &instance_id).await {
warn!("Failed to release GCS lock: {}", e);
}
}

Expand Down
46 changes: 37 additions & 9 deletions server/websocket/src/doc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,10 @@ impl DocumentHandler {
}

let storage = state.pool.get_store();
let doc = Doc::new();

let result = async {
let mut txn = doc.transact_mut();
let load_result = storage.load_doc(&doc_id, &mut txn).await;

match load_result {
Ok(true) => {
drop(txn);
match storage.load_doc_v2(&doc_id).await {
Ok(doc) => {
let read_txn = doc.transact();
let state = read_txn.encode_diff_v1(&StateVector::default());
drop(read_txn);
Expand All @@ -60,8 +55,41 @@ impl DocumentHandler {

Ok::<_, anyhow::Error>(document)
}
Ok(false) => Err(anyhow::anyhow!("Document not found: {}", doc_id)),
Err(e) => Err(anyhow::anyhow!("Failed to load document: {}", e)),
Err(_) => {
let doc = Doc::new();
let mut txn = doc.transact_mut();
let load_result = storage.load_doc(&doc_id, &mut txn).await;

match load_result {
Ok(true) => {
drop(txn);
let read_txn = doc.transact();
let state = read_txn.encode_diff_v1(&StateVector::default());
drop(read_txn);

let metadata = storage.get_latest_update_metadata(&doc_id).await?;

let latest_clock = metadata.map(|(clock, _)| clock).unwrap_or(0);
let timestamp = if let Some((_, ts)) = metadata {
chrono::DateTime::from_timestamp(ts.unix_timestamp(), 0)
.unwrap_or(Utc::now())
} else {
Utc::now()
};

let document = Document {
id: doc_id.clone(),
version: latest_clock as u64,
timestamp,
updates: state,
};

Ok::<_, anyhow::Error>(document)
}
Ok(false) => Err(anyhow::anyhow!("Document not found: {}", doc_id)),
Err(e) => Err(anyhow::anyhow!("Failed to load document: {}", e)),
}
}
}
}
.await;
Expand Down
7 changes: 2 additions & 5 deletions server/websocket/src/storage/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,7 @@ where
}
}

async fn load_doc_direct<K: AsRef<[u8]> + ?Sized + Sync>(
&self,
name: &K,
) -> Result<Doc, Error> {
async fn load_doc_v2<K: AsRef<[u8]> + ?Sized + Sync>(&self, name: &K) -> Result<Doc, Error> {
let doc_key = format!("direct_doc:{}", hex::encode(name.as_ref()));
let doc_key_bytes = doc_key.as_bytes();

Expand All @@ -478,7 +475,7 @@ where
}
}

async fn flush_doc_direct<K: AsRef<[u8]> + ?Sized + Sync>(
async fn flush_doc_v2<K: AsRef<[u8]> + ?Sized + Sync>(
&self,
name: &K,
doc: &Doc,
Expand Down
Loading