Skip to content

Commit 713330d

Browse files
authored
columnar: Refactor PartEncoder, add decode_into(...) to PartDecoder (MaterializeInc#26669)
This PR was split out from MaterializeInc#26605 ### Motivation This refactors `PartEncoder` to take ownership of the columns it's encoding into, rather than just mutable references. More detail is provided in MaterializeInc#26605 but the tl;dr is that we only want to downcast `dyn Array`s once, and there isn't any lifetime that we can associate with the borrows to achieve this. It also renames the existing `PartDecoder::decode(idx, &mut V)` method to `PartDecoder::decode_into(...)` and adds a new `PartDecoder::decode(idx) -> V` method. The goal of `decode_into(...)` is that it allows you to re-use allocations where possible, but that isn't currently applicable when decoding from our structured columnar data. When decoding with `Codec` we go from `&[u8]` -> `ProtoRow` -> `Row`, so it's very helpful to retain the intermediate `ProtoRow` to reduce allocations. But with columnar data we go directly from `dyn arrow::Array` -> `Row`, so there isn't any intermediate step to retain. Also we return the `Row`s we decode as part of an iterator, so there isn't any way to reclaim them for re-use. In practice without `PartDecoder::decode(idx)` we end up with a pattern like: ``` let mut val = V::default(); decoder.decode_into(idx, &mut val); val ``` Instead of requiring our generic parameters `K` and `V` to implement `Default`, it was easier to add the new method. ### Tips for reviewer This PR is broken up into two commits: 1. Refactor to `PartEncoder` 2. New methods on `PartDecoder` ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. 
<!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [x] This PR includes the following [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note): - N/a
1 parent 937b9e3 commit 713330d

File tree

19 files changed

+477
-346
lines changed

19 files changed

+477
-346
lines changed

src/persist-cli/src/maelstrom/txn_list_append_single.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::maelstrom::txn_list_append_single::codec_impls::{
4646
use crate::maelstrom::Args;
4747

4848
/// Key of the persist shard used by [Transactor]
49-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
49+
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
5050
pub struct MaelstromKey(u64);
5151

5252
/// Val of the persist shard used by [Transactor]
@@ -749,8 +749,7 @@ mod codec_impls {
749749
pub struct MaelstromKeySchema;
750750

751751
impl Schema<MaelstromKey> for MaelstromKeySchema {
752-
type Encoder<'a> = SimpleEncoder<'a, MaelstromKey, u64>;
753-
752+
type Encoder = SimpleEncoder<MaelstromKey, u64>;
754753
type Decoder = SimpleDecoder<MaelstromKey, u64>;
755754

756755
fn columns(&self) -> DynStructCfg {
@@ -761,7 +760,7 @@ mod codec_impls {
761760
SimpleSchema::<MaelstromKey, u64>::decoder(cols, |val, ret| ret.0 = val)
762761
}
763762

764-
fn encoder<'a>(&self, cols: ColumnsMut<'a>) -> Result<Self::Encoder<'a>, String> {
763+
fn encoder(&self, cols: ColumnsMut) -> Result<Self::Encoder, String> {
765764
SimpleSchema::<MaelstromKey, u64>::encoder(cols, |val| val.0)
766765
}
767766
}
@@ -793,8 +792,7 @@ mod codec_impls {
793792
pub struct MaelstromValSchema;
794793

795794
impl Schema<MaelstromVal> for MaelstromValSchema {
796-
type Encoder<'a> = SimpleEncoder<'a, MaelstromVal, Vec<u8>>;
797-
795+
type Encoder = SimpleEncoder<MaelstromVal, Vec<u8>>;
798796
type Decoder = SimpleDecoder<MaelstromVal, Vec<u8>>;
799797

800798
fn columns(&self) -> DynStructCfg {
@@ -807,7 +805,7 @@ mod codec_impls {
807805
})
808806
}
809807

810-
fn encoder<'a>(&self, cols: ColumnsMut<'a>) -> Result<Self::Encoder<'a>, String> {
808+
fn encoder(&self, cols: ColumnsMut) -> Result<Self::Encoder, String> {
811809
SimpleSchema::<MaelstromVal, Vec<u8>>::push_encoder(cols, |col, val| {
812810
let mut buf = Vec::new();
813811
MaelstromVal::encode(val, &mut buf);

src/persist-client/src/fetch.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -629,8 +629,8 @@ pub(crate) struct EncodedPart<T> {
629629

630630
impl<K, V, T, D> FetchedPart<K, V, T, D>
631631
where
632-
K: Debug + Codec,
633-
V: Debug + Codec,
632+
K: Debug + Codec + Default,
633+
V: Debug + Codec + Default,
634634
T: Timestamp + Lattice + Codec64,
635635
D: Semigroup + Codec64 + Send + Sync,
636636
{
@@ -694,8 +694,8 @@ where
694694

695695
impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
696696
where
697-
K: Debug + Codec,
698-
V: Debug + Codec,
697+
K: Debug + Codec + Default,
698+
V: Debug + Codec + Default,
699699
T: Timestamp + Lattice + Codec64,
700700
D: Semigroup + Codec64 + Send + Sync,
701701
{

src/persist-client/src/read.rs

+80-40
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,15 @@ where
147147
}
148148
}
149149
}
150+
}
150151

152+
impl<K, V, T, D> Subscribe<K, V, T, D>
153+
where
154+
K: Debug + Codec + Default,
155+
V: Debug + Codec + Default,
156+
T: Timestamp + Lattice + Codec64,
157+
D: Semigroup + Codec64 + Send + Sync,
158+
{
151159
/// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
152160
/// fetches and returns the data from within it.
153161
#[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
@@ -184,7 +192,15 @@ where
184192
pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
185193
self.listen.fetch_batch_part(part).await
186194
}
195+
}
187196

197+
impl<K, V, T, D> Subscribe<K, V, T, D>
198+
where
199+
K: Debug + Codec,
200+
V: Debug + Codec,
201+
T: Timestamp + Lattice + Codec64,
202+
D: Semigroup + Codec64 + Send + Sync,
203+
{
188204
/// Takes a [`SerdeLeasedBatchPart`] into a [`LeasedBatchPart`].
189205
pub fn leased_part_from_exchangeable(&self, x: SerdeLeasedBatchPart) -> LeasedBatchPart<T> {
190206
self.listen
@@ -269,19 +285,6 @@ where
269285
}
270286
}
271287

272-
/// Convert listener into futures::Stream
273-
pub fn into_stream(
274-
mut self,
275-
) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
276-
async_stream::stream!({
277-
loop {
278-
for msg in self.fetch_next().await {
279-
yield msg;
280-
}
281-
}
282-
})
283-
}
284-
285288
/// An exclusive upper bound on the progress of this Listen.
286289
pub fn frontier(&self) -> &Antichain<T> {
287290
&self.frontier
@@ -383,7 +386,15 @@ where
383386

384387
(parts, self.frontier.clone())
385388
}
389+
}
386390

391+
impl<K, V, T, D> Listen<K, V, T, D>
392+
where
393+
K: Debug + Codec + Default,
394+
V: Debug + Codec + Default,
395+
T: Timestamp + Lattice + Codec64,
396+
D: Semigroup + Codec64 + Send + Sync,
397+
{
387398
/// Attempt to pull out the next values of this subscription.
388399
///
389400
/// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
@@ -411,6 +422,54 @@ where
411422
ret
412423
}
413424

425+
/// Convert listener into futures::Stream
426+
pub fn into_stream(
427+
mut self,
428+
) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
429+
async_stream::stream!({
430+
loop {
431+
for msg in self.fetch_next().await {
432+
yield msg;
433+
}
434+
}
435+
})
436+
}
437+
438+
/// Test helper to read from the listener until the given frontier is
439+
/// reached. Because compaction can arbitrarily combine batches, we only
440+
/// return the final progress info.
441+
#[cfg(test)]
442+
#[track_caller]
443+
pub async fn read_until(
444+
&mut self,
445+
ts: &T,
446+
) -> (
447+
Vec<((Result<K, String>, Result<V, String>), T, D)>,
448+
Antichain<T>,
449+
) {
450+
let mut updates = Vec::new();
451+
let mut frontier = Antichain::from_elem(T::minimum());
452+
while self.frontier.less_than(ts) {
453+
for event in self.fetch_next().await {
454+
match event {
455+
ListenEvent::Updates(mut x) => updates.append(&mut x),
456+
ListenEvent::Progress(x) => frontier = x,
457+
}
458+
}
459+
}
460+
// Unlike most tests, intentionally don't consolidate updates here
461+
// because Listen replays them at the original fidelity.
462+
(updates, frontier)
463+
}
464+
}
465+
466+
impl<K, V, T, D> Listen<K, V, T, D>
467+
where
468+
K: Debug + Codec,
469+
V: Debug + Codec,
470+
T: Timestamp + Lattice + Codec64,
471+
D: Semigroup + Codec64 + Send + Sync,
472+
{
414473
/// Fetches the contents of `part` and returns its lease.
415474
///
416475
/// This is broken out into its own function to provide a trivial means for
@@ -441,33 +500,6 @@ where
441500
pub async fn expire(self) {
442501
self.handle.expire().await
443502
}
444-
445-
/// Test helper to read from the listener until the given frontier is
446-
/// reached. Because compaction can arbitrarily combine batches, we only
447-
/// return the final progress info.
448-
#[cfg(test)]
449-
#[track_caller]
450-
pub async fn read_until(
451-
&mut self,
452-
ts: &T,
453-
) -> (
454-
Vec<((Result<K, String>, Result<V, String>), T, D)>,
455-
Antichain<T>,
456-
) {
457-
let mut updates = Vec::new();
458-
let mut frontier = Antichain::from_elem(T::minimum());
459-
while self.frontier.less_than(ts) {
460-
for event in self.fetch_next().await {
461-
match event {
462-
ListenEvent::Updates(mut x) => updates.append(&mut x),
463-
ListenEvent::Progress(x) => frontier = x,
464-
}
465-
}
466-
}
467-
// Unlike most tests, intentionally don't consolidate updates here
468-
// because Listen replays them at the original fidelity.
469-
(updates, frontier)
470-
}
471503
}
472504

473505
#[derive(Clone, Debug)]
@@ -1100,7 +1132,15 @@ where
11001132
parts,
11011133
})
11021134
}
1135+
}
11031136

1137+
impl<K, V, T, D> ReadHandle<K, V, T, D>
1138+
where
1139+
K: Debug + Codec + Ord + Default,
1140+
V: Debug + Codec + Ord + Default,
1141+
T: Timestamp + Lattice + Codec64,
1142+
D: Semigroup + Codec64 + Send + Sync,
1143+
{
11041144
/// Generates a [Self::snapshot], and streams out all of the updates
11051145
/// it contains in bounded memory.
11061146
///

src/persist-client/src/stats.rs

+8-13
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use std::sync::Arc;
1414

1515
use mz_dyncfg::{Config, ConfigSet};
1616
use mz_persist::indexed::columnar::ColumnarRecords;
17-
use mz_persist_types::columnar::{PartEncoder, Schema};
1817
use mz_persist_types::part::PartBuilder;
1918
use mz_persist_types::stats::PartStats;
2019
use mz_persist_types::Codec;
@@ -129,24 +128,20 @@ pub(crate) fn part_stats_for_legacy_part<K: Codec, V: Codec>(
129128
// This is a laughably inefficient placeholder implementation of stats
130129
// on the old part format. We don't intend to make this fast, rather we
131130
// intend to compute stats on the new part format.
132-
let mut new_format = PartBuilder::new(schemas.key.as_ref(), schemas.val.as_ref());
133-
let mut builder = new_format.get_mut();
134-
let mut key = schemas.key.encoder(builder.key)?;
135-
let mut val = schemas.val.encoder(builder.val)?;
131+
let mut builder = PartBuilder::new(schemas.key.as_ref(), schemas.val.as_ref())?;
136132
for x in part {
137133
for ((k, v), t, d) in x.iter() {
138134
let k = K::decode(k)?;
139135
let v = V::decode(v)?;
140-
key.encode(&k);
141-
val.encode(&v);
142-
builder.ts.push(i64::from_le_bytes(t));
143-
builder.diff.push(i64::from_le_bytes(d));
136+
let t = i64::from_le_bytes(t);
137+
let d = i64::from_le_bytes(d);
138+
139+
builder.push(&k, &v, t, d);
144140
}
145141
}
146-
drop(key);
147-
drop(val);
148-
let new_format = new_format.finish()?;
149-
PartStats::new(&new_format)
142+
let part = builder.finish();
143+
144+
PartStats::new(&part)
150145
}
151146

152147
/// Statistics about the contents of a shard as_of some time.

src/persist-txn/src/txn_read.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ impl<T: Timestamp + Lattice + TotalOrder + Codec64> DataSnapshot<T> {
180180
data_read: &mut ReadHandle<K, V, T, D>,
181181
) -> Result<impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)>, Since<T>>
182182
where
183-
K: Debug + Codec + Ord,
184-
V: Debug + Codec + Ord,
183+
K: Debug + Codec + Ord + Default,
184+
V: Debug + Codec + Ord + Default,
185185
D: Semigroup + Codec64 + Send + Sync,
186186
{
187187
let data_write = WriteHandle::from_read(data_read, "unblock_read");

0 commit comments

Comments
 (0)