Consistent schema across partitions #3860
Conversation
Force-pushed from ca7a790 to 057eaea.
        self.record_keys().matches_key_query(query)
    }
}
This is currently not used (and never will be); a new draft for the envelope redesign can be checked here.
Can we wait before merging this?
@slinkydeveloper Of course, it's not urgent. Let's discuss how we're going to maintain the schema type moving forward.
LGTM. As discussed offline, we need to verify what happens with large schemas and how we want to mitigate potential issues in that direction.
crates/wal-protocol/src/lib.rs
Outdated
Command::InvocationResponse(response) => Keys::Single(response.partition_key()),
Command::NotifySignal(sig) => Keys::Single(sig.partition_key()),
Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()),
Command::UpsertSchema(_) => Keys::Single(self.partition_key()),
This should apply to the entire partition range. Imagine the partition is split: would you want only one of the sub-partitions to see this upsert, or both?
Indeed. I didn't take the sub-partitions scenario into consideration.
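For illustration, a minimal sketch of how this match arm could return the key range carried on the command instead of a single key; the field name follows the UpsertSchema struct shown further down in this review, and the exact arm shape is an assumption:

// Return the full range stored on the command so every sub-partition whose
// key range intersects it applies the upsert, not just the owner of one key.
Command::UpsertSchema(upsert) => upsert.partition_key_range.clone(),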
.map(|_| {
    let schema = Metadata::with_current(|m| m.schema().clone());
    ActionEffect::UpsertSchema(schema)
Perhaps it's better to check the version that was returned from Metadata instead of relying on the implicit assumption that the returned metadata will be >= last observed from the stream.
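For illustration, a sketch of that comparison; it assumes Schema exposes its version via a version() accessor and that the last version seen on the stream is tracked locally as last_observed_version (both names are assumptions):

let schema = Metadata::with_current(|m| m.schema().clone());
// Compare explicitly instead of assuming the metadata we just read is at
// least as new as what the stream already delivered.
if schema.version() > last_observed_version {
    last_observed_version = schema.version();
    Some(ActionEffect::UpsertSchema(schema))
} else {
    None
}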
As it stands, this cannot be merged unless writes are gated with a check for Restate version >= 1.7.
Force-pushed from ccc5178 to a8ded64.
We are very close :)
crates/wal-protocol/src/control.rs
Outdated
    pub modification_time: MillisSinceEpoch,
}

/// Consistency store schema across partition replicas.
nit: Consistently
pub struct UpsertSchema {
    pub partition_key_range: Keys,
    pub schema: Schema,
}
A nice little optimization is to break the version out of the schema, and only deserialize the schema if the version is eligible for the update.
struct UpsertSchema {
pub partition_key_range: Keys,
pub version: Version,
pub schema_raw: Bytes,
}
That's a great idea
Agreed to streamline this optimisation as part of Envelope v2
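For illustration, a sketch of the apply path that splitting version from schema_raw enables; decode_schema is a hypothetical stand-in for whatever codec the envelope actually uses:

fn maybe_apply(current: Version, upsert: &UpsertSchema) -> Option<Schema> {
    // Cheap check first: skip stale or duplicate updates without touching the bytes.
    if upsert.version <= current {
        return None;
    }
    // Only pay the deserialization cost for updates that will actually be applied.
    decode_schema(&upsert.schema_raw).ok()
}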
// todo: enable on version v1.7.0
//
// self.self_proposer
//     .propose(
//         *self.partition_key_range.start(),
//         Command::UpsertSchema(UpsertSchema {
//             partition_key_range: Keys::RangeInclusive(
//                 self.partition_key_range.clone(),
//             ),
//             schema,
//         }),
//     )
//     .await?;
tip: You can potentially gate this based on RestateVersion::current()
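For illustration, a naive sketch of such a gate; only RestateVersion::current() comes from the tip above, and the string-based comparison helper below is purely hypothetical:

// Hypothetical helper: naive "major.minor.patch" comparison, for the sketch only.
fn supports_upsert_schema(current_version: &str) -> bool {
    let parse = |v: &str| -> Vec<u64> {
        v.trim_start_matches('v')
            .split('.')
            .filter_map(|part| part.parse().ok())
            .collect()
    };
    parse(current_version) >= parse("1.7.0")
}

The commented-out proposal above would then only run when this check passes.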
.unwrap_or_else(Version::invalid);

std::future::ready(
    Some(Metadata::with_current(|m| m.schema().clone()))
Perhaps check the version in the closure to avoid cloning if we are not interested.
Good catch :)
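For illustration, folding the check into the closure so the clone only happens when the schema is actually newer; a version() accessor and a locally tracked last_observed_version are assumed:

std::future::ready(Metadata::with_current(|m| {
    let schema = m.schema();
    // Clone only if this schema is newer than the one we last observed.
    (schema.version() > last_observed_version).then(|| schema.clone())
}))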
Summary: This PR makes sure that partition processors (PPs) have a consistent service schema by writing an upsert-schema record to Bifrost. The leader processor writes this record to Bifrost when schema changes are detected.