Skip to content

Commit 28d0321

Browse files
committed
More serde exchangeable part
It's currently possible, but a bit odd, to have an exchangeable part that doesn't have a lease attached. This switches us to using the "exchangeable" part directly, instead of inventing and immediately dropping the lease.
1 parent 2dfaa2e commit 28d0321

File tree

7 files changed

+74
-185
lines changed

7 files changed

+74
-185
lines changed

src/persist-client/src/batch.proto

-34
Original file line numberDiff line numberDiff line change
@@ -25,37 +25,3 @@ message ProtoBatch {
2525

2626
reserved 4;
2727
}
28-
29-
// This is only to impl ExchangeData, and so used between processes running the
30-
// same version of code. It's not durably written down anywhere.
31-
message ProtoLeasedBatchPart {
32-
string shard_id = 1;
33-
ProtoFetchBatchFilter filter = 2;
34-
mz_persist_client.internal.state.ProtoU64Description desc = 3;
35-
mz_persist_client.internal.state.ProtoHollowBatchPart part = 4;
36-
ProtoLease lease = 5;
37-
bool filter_pushdown_audit = 6;
38-
}
39-
40-
message ProtoFetchBatchFilter {
41-
oneof kind {
42-
// Apply snapshot-style semantics to the fetched batch part.
43-
//
44-
// Return all values with time leq `as_of`.
45-
mz_persist_client.internal.state.ProtoU64Antichain snapshot = 1;
46-
// Apply listen-style semantics to the fetched batch part.
47-
ProtoFetchBatchFilterListen listen = 2;
48-
}
49-
}
50-
51-
message ProtoFetchBatchFilterListen {
52-
// Return all values with time in advance of `as_of`.
53-
mz_persist_client.internal.state.ProtoU64Antichain as_of = 1;
54-
// Return all values with `lower` leq time.
55-
mz_persist_client.internal.state.ProtoU64Antichain lower = 2;
56-
}
57-
58-
message ProtoLease {
59-
string reader_id = 1;
60-
optional uint64 seqno = 2;
61-
}

src/persist-client/src/fetch.rs

+53-137
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,23 @@ use mz_persist_types::part::Codec64Mut;
3636
use mz_persist_types::schema::backward_compatible;
3737
use mz_persist_types::stats::PartStats;
3838
use mz_persist_types::{Codec, Codec64};
39-
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
39+
use mz_proto::RustType;
4040
use serde::{Deserialize, Serialize};
4141
use timely::PartialOrder;
4242
use timely::progress::frontier::AntichainRef;
4343
use timely::progress::{Antichain, Timestamp};
4444
use tracing::{Instrument, debug, debug_span, trace_span};
4545

4646
use crate::ShardId;
47-
use crate::batch::{
48-
ProtoFetchBatchFilter, ProtoFetchBatchFilterListen, ProtoLease, ProtoLeasedBatchPart,
49-
proto_fetch_batch_filter,
50-
};
5147
use crate::cfg::PersistConfig;
5248
use crate::error::InvalidUsage;
5349
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
5450
use crate::internal::machine::retry_external;
5551
use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
5652
use crate::internal::paths::BlobKey;
57-
use crate::internal::state::{BatchPart, HollowBatchPart, ProtoInlineBatchPart};
53+
use crate::internal::state::{
54+
BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
55+
};
5856
use crate::read::LeasedReaderId;
5957
use crate::schema::{PartMigration, SchemaCache};
6058

@@ -141,42 +139,42 @@ where
141139
T: Timestamp + Lattice + Codec64 + Sync,
142140
D: Semigroup + Codec64 + Send + Sync,
143141
{
144-
/// Takes a [`SerdeLeasedBatchPart`] into a [`LeasedBatchPart`].
145-
pub fn leased_part_from_exchangeable(&self, x: SerdeLeasedBatchPart) -> LeasedBatchPart<T> {
146-
x.decode(Arc::clone(&self.metrics))
147-
}
148-
149142
/// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
150143
///
151144
/// Note to check the `LeasedBatchPart` documentation for how to handle the
152145
/// returned value.
153146
pub async fn fetch_leased_part(
154147
&mut self,
155-
part: &LeasedBatchPart<T>,
148+
part: ExchangeableBatchPart<T>,
156149
) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
157-
if &part.shard_id != &self.shard_id {
158-
let batch_shard = part.shard_id.clone();
150+
let ExchangeableBatchPart {
151+
shard_id,
152+
encoded_size_bytes: _,
153+
desc,
154+
filter,
155+
filter_pushdown_audit,
156+
part,
157+
} = part;
158+
let part: BatchPart<T> = part.decode_to().expect("valid part");
159+
if shard_id != self.shard_id {
159160
return Err(InvalidUsage::BatchNotFromThisShard {
160-
batch_shard,
161+
batch_shard: shard_id,
161162
handle_shard: self.shard_id.clone(),
162163
});
163164
}
164165

165-
let migration = PartMigration::new(
166-
&part.part,
167-
self.read_schemas.clone(),
168-
&mut self.schema_cache,
169-
)
170-
.await
171-
.unwrap_or_else(|read_schemas| {
172-
panic!(
173-
"could not decode part {:?} with schema: {:?}",
174-
part.part.schema_id(),
175-
read_schemas
176-
)
177-
});
178-
179-
let (buf, fetch_permit) = match &part.part {
166+
let migration =
167+
PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
168+
.await
169+
.unwrap_or_else(|read_schemas| {
170+
panic!(
171+
"could not decode part {:?} with schema: {:?}",
172+
part.schema_id(),
173+
read_schemas
174+
)
175+
});
176+
177+
let (buf, fetch_permit) = match &part {
180178
BatchPart::Hollow(x) => {
181179
let fetch_permit = self
182180
.metrics
@@ -189,7 +187,7 @@ where
189187
&self.metrics.read.batch_fetcher
190188
};
191189
let buf = fetch_batch_part_blob(
192-
&part.shard_id,
190+
&shard_id,
193191
self.blob.as_ref(),
194192
&self.metrics,
195193
&self.shard_metrics,
@@ -213,7 +211,7 @@ where
213211
..
214212
} => {
215213
let buf = FetchedBlobBuf::Inline {
216-
desc: part.desc.clone(),
214+
desc: desc.clone(),
217215
updates: updates.clone(),
218216
ts_rewrite: ts_rewrite.clone(),
219217
};
@@ -224,10 +222,10 @@ where
224222
metrics: Arc::clone(&self.metrics),
225223
read_metrics: self.metrics.read.batch_fetcher.clone(),
226224
buf,
227-
registered_desc: part.desc.clone(),
225+
registered_desc: desc.clone(),
228226
migration,
229-
filter: part.filter.clone(),
230-
filter_pushdown_audit: part.filter_pushdown_audit,
227+
filter: filter.clone(),
228+
filter_pushdown_audit,
231229
structured_part_audit: self.cfg.part_decode_format(),
232230
fetch_permit,
233231
_phantom: PhantomData,
@@ -236,7 +234,7 @@ where
236234
}
237235
}
238236

239-
#[derive(Debug, Clone)]
237+
#[derive(Debug, Clone, Serialize, Deserialize)]
240238
pub(crate) enum FetchBatchFilter<T> {
241239
Snapshot {
242240
as_of: Antichain<T>,
@@ -287,42 +285,6 @@ impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
287285
}
288286
}
289287

290-
impl<T: Timestamp + Codec64> RustType<ProtoFetchBatchFilter> for FetchBatchFilter<T> {
291-
fn into_proto(&self) -> ProtoFetchBatchFilter {
292-
let kind = match self {
293-
FetchBatchFilter::Snapshot { as_of } => {
294-
proto_fetch_batch_filter::Kind::Snapshot(as_of.into_proto())
295-
}
296-
FetchBatchFilter::Listen { as_of, lower } => {
297-
proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
298-
as_of: Some(as_of.into_proto()),
299-
lower: Some(lower.into_proto()),
300-
})
301-
}
302-
FetchBatchFilter::Compaction { .. } => unreachable!("not serialized"),
303-
};
304-
ProtoFetchBatchFilter { kind: Some(kind) }
305-
}
306-
307-
fn from_proto(proto: ProtoFetchBatchFilter) -> Result<Self, TryFromProtoError> {
308-
let kind = proto
309-
.kind
310-
.ok_or_else(|| TryFromProtoError::missing_field("ProtoFetchBatchFilter::kind"))?;
311-
match kind {
312-
proto_fetch_batch_filter::Kind::Snapshot(as_of) => Ok(FetchBatchFilter::Snapshot {
313-
as_of: as_of.into_rust()?,
314-
}),
315-
proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
316-
as_of,
317-
lower,
318-
}) => Ok(FetchBatchFilter::Listen {
319-
as_of: as_of.into_rust_if_some("ProtoFetchBatchFilterListen::as_of")?,
320-
lower: lower.into_rust_if_some("ProtoFetchBatchFilterListen::lower")?,
321-
}),
322-
}
323-
}
324-
}
325-
326288
/// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
327289
///
328290
/// Note to check the `LeasedBatchPart` documentation for how to handle the
@@ -485,8 +447,8 @@ impl Lease {
485447
///
486448
/// You can exchange `LeasedBatchPart`:
487449
/// - If `leased_seqno.is_none()`
488-
/// - By converting it to [`SerdeLeasedBatchPart`] through
489-
/// `Self::into_exchangeable_part`. [`SerdeLeasedBatchPart`] is exchangeable,
450+
/// - By converting it to [`ExchangeableBatchPart`] through
451+
/// `Self::into_exchangeable_part`. [`ExchangeableBatchPart`] is exchangeable,
490452
/// including over the network.
491453
///
492454
/// n.b. `Self::into_exchangeable_part` is known to be equivalent to
@@ -505,7 +467,6 @@ impl Lease {
505467
pub struct LeasedBatchPart<T> {
506468
pub(crate) metrics: Arc<Metrics>,
507469
pub(crate) shard_id: ShardId,
508-
pub(crate) reader_id: LeasedReaderId,
509470
pub(crate) filter: FetchBatchFilter<T>,
510471
pub(crate) desc: Description<T>,
511472
pub(crate) part: BatchPart<T>,
@@ -523,7 +484,7 @@ impl<T> LeasedBatchPart<T>
523484
where
524485
T: Timestamp + Codec64,
525486
{
526-
/// Takes `self` into a [`SerdeLeasedBatchPart`], which allows `self` to be
487+
/// Takes `self` into a [`ExchangeableBatchPart`], which allows `self` to be
527488
/// exchanged (potentially across the network).
528489
///
529490
/// !!!WARNING!!!
@@ -532,13 +493,16 @@ where
532493
/// that can't travel across process boundaries. The caller is responsible for
533494
/// ensuring that the lease is held for as long as the batch part may be in use:
534495
/// dropping it too early may cause a fetch to fail.
535-
pub(crate) fn into_exchangeable_part(mut self) -> (SerdeLeasedBatchPart, Option<Lease>) {
536-
let (proto, _metrics) = self.into_proto();
496+
pub(crate) fn into_exchangeable_part(mut self) -> (ExchangeableBatchPart<T>, Option<Lease>) {
537497
// If `x` has a lease, we've effectively transferred it to `r`.
538498
let lease = self.lease.take();
539-
let part = SerdeLeasedBatchPart {
499+
let part = ExchangeableBatchPart {
500+
shard_id: self.shard_id,
540501
encoded_size_bytes: self.part.encoded_size_bytes(),
541-
proto: LazyProto::from(&proto),
502+
desc: self.desc.clone(),
503+
filter: self.filter.clone(),
504+
part: LazyProto::from(&self.part.into_proto()),
505+
filter_pushdown_audit: self.filter_pushdown_audit,
542506
};
543507
(part, lease)
544508
}
@@ -1392,69 +1356,21 @@ where
13921356
/// - [`LeasedBatchPart`]
13931357
/// - `From<SerdeLeasedBatchPart>` for `LeasedBatchPart<T>`
13941358
#[derive(Debug, Serialize, Deserialize, Clone)]
1395-
pub struct SerdeLeasedBatchPart {
1359+
pub struct ExchangeableBatchPart<T> {
1360+
shard_id: ShardId,
13961361
// Duplicated with the one serialized in the proto for use in backpressure.
13971362
encoded_size_bytes: usize,
1398-
// We wrap this in a LazyProto because it guarantees that we use the proto
1399-
// encoding for the serde impls.
1400-
proto: LazyProto<ProtoLeasedBatchPart>,
1363+
desc: Description<T>,
1364+
filter: FetchBatchFilter<T>,
1365+
part: LazyProto<ProtoHollowBatchPart>,
1366+
filter_pushdown_audit: bool,
14011367
}
14021368

1403-
impl SerdeLeasedBatchPart {
1369+
impl<T> ExchangeableBatchPart<T> {
14041370
/// Returns the encoded size of the given part.
14051371
pub fn encoded_size_bytes(&self) -> usize {
14061372
self.encoded_size_bytes
14071373
}
1408-
1409-
pub(crate) fn decode<T: Timestamp + Codec64>(
1410-
&self,
1411-
metrics: Arc<Metrics>,
1412-
) -> LeasedBatchPart<T> {
1413-
let proto = self.proto.decode().expect("valid leased batch part");
1414-
(proto, metrics)
1415-
.into_rust()
1416-
.expect("valid leased batch part")
1417-
}
1418-
}
1419-
1420-
// TODO: The way we're smuggling the metrics through here is a bit odd. Perhaps
1421-
// we could refactor `LeasedBatchPart` into some proto-able struct plus the
1422-
// metrics for the Drop bit?
1423-
impl<T: Timestamp + Codec64> RustType<(ProtoLeasedBatchPart, Arc<Metrics>)> for LeasedBatchPart<T> {
1424-
fn into_proto(&self) -> (ProtoLeasedBatchPart, Arc<Metrics>) {
1425-
let proto = ProtoLeasedBatchPart {
1426-
shard_id: self.shard_id.into_proto(),
1427-
filter: Some(self.filter.into_proto()),
1428-
desc: Some(self.desc.into_proto()),
1429-
part: Some(self.part.into_proto()),
1430-
lease: Some(ProtoLease {
1431-
reader_id: self.reader_id.into_proto(),
1432-
seqno: Some(self.leased_seqno.into_proto()),
1433-
}),
1434-
filter_pushdown_audit: self.filter_pushdown_audit,
1435-
};
1436-
(proto, Arc::clone(&self.metrics))
1437-
}
1438-
1439-
fn from_proto(proto: (ProtoLeasedBatchPart, Arc<Metrics>)) -> Result<Self, TryFromProtoError> {
1440-
let (proto, metrics) = proto;
1441-
let lease = proto
1442-
.lease
1443-
.ok_or_else(|| TryFromProtoError::missing_field("ProtoLeasedBatchPart::lease"))?;
1444-
Ok(LeasedBatchPart {
1445-
metrics,
1446-
shard_id: proto.shard_id.into_rust()?,
1447-
filter: proto
1448-
.filter
1449-
.into_rust_if_some("ProtoLeasedBatchPart::filter")?,
1450-
desc: proto.desc.into_rust_if_some("ProtoLeasedBatchPart::desc")?,
1451-
part: proto.part.into_rust_if_some("ProtoLeasedBatchPart::part")?,
1452-
reader_id: lease.reader_id.into_rust()?,
1453-
leased_seqno: lease.seqno.into_rust_if_some("ProtoLease::seqno")?,
1454-
lease: None,
1455-
filter_pushdown_audit: proto.filter_pushdown_audit,
1456-
})
1457-
}
14581374
}
14591375

14601376
/// Format we'll use when decoding a [`Part`].
@@ -1522,6 +1438,6 @@ fn client_exchange_data() {
15221438
// between timely workers, including over the network. Enforce then that it
15231439
// implements ExchangeData.
15241440
fn is_exchange_data<T: timely::ExchangeData>() {}
1525-
is_exchange_data::<SerdeLeasedBatchPart>();
1526-
is_exchange_data::<SerdeLeasedBatchPart>();
1441+
is_exchange_data::<ExchangeableBatchPart<u64>>();
1442+
is_exchange_data::<ExchangeableBatchPart<u64>>();
15271443
}

src/persist-client/src/internal/encoding.rs

+4
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ impl<T: Message + Default> LazyProto<T> {
169169
pub fn decode(&self) -> Result<T, prost::DecodeError> {
170170
T::decode(&*self.buf)
171171
}
172+
173+
pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
174+
Ok(T::decode(&*self.buf)?.into_rust()?)
175+
}
172176
}
173177

174178
impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {

src/persist-client/src/lib.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1106,8 +1106,9 @@ mod tests {
11061106
)
11071107
.await
11081108
.unwrap();
1109-
for batch in snap {
1110-
let res = fetcher1.fetch_leased_part(&batch).await;
1109+
for part in snap {
1110+
let (part, _lease) = part.into_exchangeable_part();
1111+
let res = fetcher1.fetch_leased_part(part).await;
11111112
assert_eq!(
11121113
res.unwrap_err(),
11131114
InvalidUsage::BatchNotFromThisShard {

0 commit comments

Comments
 (0)