-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
InstantIndexedDataWriter. #8265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Reviewer's GuideThis PR refactors InstantIndexedDataWriter to use async-native locking, restructures the periodic indexing task for better concurrency and error isolation, enhances trait signatures for robust error handling, and cleans up public APIs and logging for clarity and performance. Sequence diagram for periodic indexing task in InstantIndexedDataWritersequenceDiagram
participant Provider as InstantIndexedDataWriter
participant CollabMap as collab_by_object
participant Consumers as consumers
participant Collab as CollabIndexedData
loop every 30 seconds
Provider->>CollabMap: read object snapshots
Provider->>Consumers: read consumer list
alt for each object
CollabMap->>Collab: upgrade Weak reference
alt Collab is alive
Collab->>Provider: get_unindexed_data(collab_type)
Provider->>Consumers: consume_collab(workspace_id, data, object_id, collab_type)
Consumers-->>Provider: Result<bool, FlowyError>
else Collab dropped
Provider->>CollabMap: mark for removal
end
end
alt stale entries exist
Provider->>CollabMap: remove stale entries
end
end
Class diagram for refactored InstantIndexedDataWriter and related traitsclassDiagram
class InstantIndexedDataWriter {
+collab_by_object: Arc<RwLock<HashMap<String, WriteObject>>>
+consumers: Arc<RwLock<Vec<Box<dyn InstantIndexedDataConsumer>>>>
+new() InstantIndexedDataWriter
+num_consumers() usize
+clear_consumers()
+register_consumer(consumer: Box<dyn InstantIndexedDataConsumer>)
+spawn_instant_indexed_provider(runtime: &Runtime) FlowyResult<()>
+support_collab_type(t: &CollabType) bool
+index_encoded_collab(workspace_id: Uuid, object_id: Uuid, data: EncodedCollab, collab_type: CollabType) FlowyResult<()>
+index_unindexed_collab(data: UnindexedCollab) FlowyResult<()>
+queue_collab_embed(collab_object: CollabObject, collab: Weak<dyn CollabIndexedData>)
}
class WriteObject {
+collab_object: CollabObject
+collab: Weak<dyn CollabIndexedData>
}
class CollabIndexedData {
<<interface>>
+get_unindexed_data(collab_type: &CollabType) FlowyResult<Option<UnindexedData>>
}
class InstantIndexedDataConsumer {
<<interface>>
+consumer_id() String
+consume_collab(workspace_id: &Uuid, data: Option<UnindexedData>, object_id: &Uuid, collab_type: CollabType) Result<bool, FlowyError>
+did_delete_collab(workspace_id: &Uuid, object_id: &Uuid) Result<(), FlowyError>
}
InstantIndexedDataWriter --> WriteObject
WriteObject --> CollabObject
WriteObject --> "collab: Weak" CollabIndexedData
InstantIndexedDataWriter --> "consumers: Vec<Box>" InstantIndexedDataConsumer
CollabIndexedData <|.. CollabRwLock
InstantIndexedDataConsumer <|.. SomeConsumerImpl
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
updated |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs:260` </location>
<code_context>
- None => Err(FlowyError::internal().with_context("Failed to create unindexed collab")),
- Some(data) => {
- self.index_unindexed_collab(data).await?;
+ pub async fn index_unindexed_collab(&self, data: UnindexedCollab) -> FlowyResult<()> {
+ let consumers_guard = self.consumers.read().await;
+ for consumer in consumers_guard.iter() {
</code_context>
<issue_to_address>
**nitpick:** Error handling for consumer failures is inconsistent with periodic provider.
Use the same log level for consumer errors as in the periodic provider to maintain consistency.
</issue_to_address>
### Comment 2
<location> `frontend/rust-lib/collab-integrate/src/instant_indexed_data_provider.rs:69` </location>
<code_context>
- guard.push(consumer);
- }
-
- pub async fn spawn_instant_indexed_provider(&self, runtime: &Runtime) -> FlowyResult<()> {
- let weak_collab_by_object = Arc::downgrade(&self.collab_by_object);
- let consumers_weak = Arc::downgrade(&self.consumers);
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the snapshot and cleanup logic into helper functions to simplify and clarify the core spawn loop.
Consider moving the in‐loop “snapshot” and “cleanup” bits into small helpers to keep the spawn loop focused. For example, after you upgrade the Arcs you could:
```rust
// new helper at impl InstantIndexedDataWriter
async fn snapshot_entries(
collab_by_object: &RwLock<HashMap<String, WriteObject>>,
) -> Vec<(String, CollabObject, Weak<dyn CollabIndexedData>)> {
let guard = collab_by_object.read().await;
guard
.iter()
.map(|(id, wo)| (id.clone(), wo.collab_object.clone(), wo.collab.clone()))
.collect()
}
async fn remove_stale(
collab_by_object: &RwLock<HashMap<String, WriteObject>>,
to_remove: &[String],
) {
if to_remove.is_empty() {
return;
}
let mut guard = collab_by_object.write().await;
guard.retain(|k, _| !to_remove.contains(k));
}
```
Then your spawn loop reduces to:
```rust
// inside runtime.spawn(async move { … })
let object_snapshots = Self::snapshot_entries(&collab_by_object).await;
let consumers = consumers.read().await;
if consumers.is_empty() { continue; }
let mut to_remove = Vec::new();
for (id, collab_object, weak_rc) in object_snapshots {
if let Some(rc) = weak_rc.upgrade() {
if let Err(e) =
Self::process_single_collab(id.clone(), collab_object, rc, &consumers).await
{
error!("[Indexing] process failed {}: {}", id, e);
}
} else {
to_remove.push(id);
}
}
Self::remove_stale(&collab_by_object, &to_remove).await;
```
This trims down the body of your core loop by ~40 lines, isolates snapshot/cleanup logic for easier testing, and keeps all functionality intact.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
None => Err(FlowyError::internal().with_context("Failed to create unindexed collab")), | ||
Some(data) => { | ||
self.index_unindexed_collab(data).await?; | ||
pub async fn index_unindexed_collab(&self, data: UnindexedCollab) -> FlowyResult<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Error handling for consumer failures is inconsistent with periodic provider.
Use the same log level for consumer errors as in the periodic provider to maintain consistency.
Feature Preview
PR Checklist
Summary by Sourcery
Introduce async-native locking and trait abstractions to improve the instant indexed data pipeline and refactor the periodic provider loop for clarity, robustness, and observability.
New Features:
Enhancements: