Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions proto/store/v1/ingest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ service Service {
// covers this write. Clients can pass that sequence number to query RPCs
// (via `min_sequence_number`) for read-after-write consistency.
rpc Put(PutRequest) returns (PutResponse);

// Atomically write a streamed batch of key-value pairs. Each request chunk
// carries one or more pairs, and the server persists the combined stream as
// a single Store batch after the client closes the stream.
rpc PutMany(stream PutRequest) returns (PutResponse);
}

// Batch write request. All pairs are applied atomically.
Expand Down
2 changes: 1 addition & 1 deletion sdk/rs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ use exoware_sdk::StoreWriteBatch;
let mut batch = StoreWriteBatch::new();
batch.push(&orders, &order_key, order_value)?;
batch.push(&accounts, &account_key, account_value)?;
let sequence = batch.commit(&base).await?;
let sequence = batch.finish(&base).await?;
```
86 changes: 86 additions & 0 deletions sdk/rs/src/gen/store.v1.ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,25 @@ pub trait Service: Send + Sync + 'static {
) -> impl ::std::future::Future<
Output = Result<(PutResponse, ::connectrpc::Context), ::connectrpc::ConnectError>,
> + Send;
/// Atomically write a streamed batch of key-value pairs. Each request chunk
/// carries one or more pairs, and the server persists the combined stream as
/// a single Store batch after the client closes the stream.
fn put_many(
&self,
ctx: ::connectrpc::Context,
requests: ::std::pin::Pin<
Box<
dyn ::futures::Stream<
Item = Result<
::buffa::view::OwnedView<PutRequestView<'static>>,
::connectrpc::ConnectError,
>,
> + Send,
>,
>,
) -> impl ::std::future::Future<
Output = Result<(PutResponse, ::connectrpc::Context), ::connectrpc::ConnectError>,
> + Send;
}
/// Extension trait for registering a service implementation with a Router.
///
Expand Down Expand Up @@ -567,6 +586,17 @@ impl<S: Service> ServiceExt for S {
})
},
)
.route_view_client_stream(
SERVICE_SERVICE_NAME,
"PutMany",
::connectrpc::view_client_streaming_handler_fn({
let svc = ::std::sync::Arc::clone(&self);
move |ctx, req| {
let svc = ::std::sync::Arc::clone(&svc);
async move { svc.put_many(ctx, req).await }
}
}),
)
}
}
/// Monomorphic dispatcher for `Service`.
Expand Down Expand Up @@ -615,6 +645,11 @@ impl<T: Service> ::connectrpc::Dispatcher for ServiceServer<T> {
"Put" => {
Some(::connectrpc::dispatcher::codegen::MethodDescriptor::unary(false))
}
"PutMany" => {
Some(
::connectrpc::dispatcher::codegen::MethodDescriptor::client_streaming(),
)
}
_ => None,
}
}
Expand Down Expand Up @@ -674,6 +709,20 @@ impl<T: Service> ::connectrpc::Dispatcher for ServiceServer<T> {
};
let _ = (&ctx, &requests, &format);
match method {
"PutMany" => {
let svc = ::std::sync::Arc::clone(&self.inner);
Box::pin(async move {
let req_stream = ::connectrpc::dispatcher::codegen::decode_view_request_stream::<
PutRequestView,
>(requests, format);
let (res, ctx) = svc.put_many(ctx, req_stream).await?;
let bytes = ::connectrpc::dispatcher::codegen::encode_response(
&res,
format,
)?;
Ok((bytes, ctx))
})
}
_ => ::connectrpc::dispatcher::codegen::unimplemented_unary(path),
}
}
Expand Down Expand Up @@ -798,4 +847,41 @@ where
)
.await
}
/// Call the PutMany RPC. Sends a request to /store.ingest.v1.Service/PutMany.
pub async fn put_many(
&self,
requests: impl IntoIterator<Item = PutRequest>,
) -> Result<
::connectrpc::client::UnaryResponse<
::buffa::view::OwnedView<PutResponseView<'static>>,
>,
::connectrpc::ConnectError,
> {
self.put_many_with_options(
requests,
::connectrpc::client::CallOptions::default(),
)
.await
}
/// Call the PutMany RPC with explicit per-call options. Options override [`connectrpc::client::ClientConfig`] defaults.
pub async fn put_many_with_options(
&self,
requests: impl IntoIterator<Item = PutRequest>,
options: ::connectrpc::client::CallOptions,
) -> Result<
::connectrpc::client::UnaryResponse<
::buffa::view::OwnedView<PutResponseView<'static>>,
>,
::connectrpc::ConnectError,
> {
::connectrpc::client::call_client_stream(
&self.transport,
&self.config,
SERVICE_SERVICE_NAME,
"PutMany",
requests,
options,
)
.await
}
}
125 changes: 88 additions & 37 deletions sdk/rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use std::time::Duration;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
const DEFAULT_RETRY_INITIAL_BACKOFF_MS: u64 = 100;
const DEFAULT_RETRY_MAX_BACKOFF_MS: u64 = 2_000;
const PUT_MANY_CHUNK_ENTRIES: usize = 4096;

/// Codec used to compress **outgoing** RPC request bodies when compression applies.
///
Expand Down Expand Up @@ -282,28 +283,40 @@ impl StoreKeyPrefix {
/// A physical Store write batch assembled from one or more logical clients.
///
/// Use [`Self::push`] with the specific prefixed client that produced each
/// logical key, then [`Self::commit`] once to submit all rows in one atomic
/// Store `Put`.
/// logical key, then [`Self::finish`] once to submit all rows in one atomic
/// streaming Store `PutMany`.
#[derive(Clone, Debug, Default)]
pub struct StoreWriteBatch {
entries: Vec<(Key, Bytes)>,
kvs: Vec<exoware_proto::common::KvEntry>,
}

impl StoreWriteBatch {
pub fn new() -> Self {
Self::default()
}

pub fn with_capacity(capacity: usize) -> Self {
Self {
kvs: Vec::with_capacity(capacity),
}
}

pub fn len(&self) -> usize {
self.entries.len()
self.kvs.len()
}

pub fn is_empty(&self) -> bool {
self.entries.is_empty()
self.kvs.is_empty()
}

pub fn clear(&mut self) {
self.entries.clear();
self.kvs.clear();
}

pub fn entries(&self) -> impl ExactSizeIterator<Item = (&[u8], &[u8])> {
self.kvs
.iter()
.map(|entry| (entry.key.as_slice(), entry.value.as_slice()))
}

pub fn push(
Expand All @@ -312,18 +325,25 @@ impl StoreWriteBatch {
key: &Key,
value: &[u8],
) -> Result<&mut Self, ClientError> {
self.entries
.push((client.encode_store_key(key)?, Bytes::copy_from_slice(value)));
self.push_physical(&client.encode_store_key(key)?, value);
Ok(self)
}

fn push_physical(&mut self, key: &Key, value: &[u8]) -> &mut Self {
self.kvs.push(exoware_proto::common::KvEntry {
key: key.to_vec(),
value: value.to_vec(),
..Default::default()
});
self
}

pub async fn finish(self, client: &StoreClient) -> Result<u64, ClientError> {
client.put_many(self).await
}

pub async fn commit(&self, client: &StoreClient) -> Result<u64, ClientError> {
let refs: Vec<(&Key, &[u8])> = self
.entries
.iter()
.map(|(key, value)| (key, value.as_ref()))
.collect();
client.put_physical(&refs).await
client.put_many(self.clone()).await
}
}

Expand All @@ -335,7 +355,7 @@ impl StoreWriteBatch {
/// this trait provides the common lifecycle:
///
/// 1. stage rows into a [`StoreWriteBatch`]
/// 2. commit that batch
/// 2. finish that batch
/// 3. mark the prepared handle persisted with the returned Store sequence
/// number, or failed if staging/commit does not complete
pub trait StoreBatchUpload {
Expand Down Expand Up @@ -1332,31 +1352,21 @@ impl StoreClient {
Stream { c: self }
}

/// Start assembling an atomic Store write batch incrementally.
pub fn write_batch(&self) -> StoreWriteBatch {
StoreWriteBatch::new()
}

/// Submit a KV batch via Connect `Put`.
///
/// On success returns the **store sequence number** from the response. Use it for immediate
/// `get_with_min_sequence_number` / range calls or to seed
/// [`Self::create_session_with_sequence`].
/// If the request succeeds, the server accepts the full batch (count is `kvs.len()`).
pub(crate) async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
if self.key_prefix.is_none() {
return self.put_physical(kvs).await;
}
let mut keys = Vec::with_capacity(kvs.len());
for (key, _) in kvs {
keys.push(self.encode_store_key(key)?);
}
let prefixed: Vec<(&Key, &[u8])> = keys
.iter()
.zip(kvs.iter())
.map(|(key, (_, value))| (key, *value))
.collect();
self.put_physical(&prefixed).await
}

async fn put_physical(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
let mut proto_kvs = Vec::with_capacity(kvs.len());
for (key, value) in kvs {
let key = self.encode_store_key(key)?;
if !is_valid_key_size(key.len()) {
return Err(ClientError::WireFormat(format!(
"key length {} is outside valid store key range ({}..={})",
Expand All @@ -1366,12 +1376,11 @@ impl StoreClient {
)));
}
proto_kvs.push(exoware_proto::common::KvEntry {
key: (*key).to_vec(),
key: key.to_vec(),
value: value.to_vec(),
..Default::default()
});
}

let config =
store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression);
let client = IngestServiceClient::new(self.connect_http.clone(), config);
Expand All @@ -1385,6 +1394,40 @@ impl StoreClient {
Ok(response.into_owned().sequence_number)
}

async fn put_many(&self, batch: StoreWriteBatch) -> Result<u64, ClientError> {
for entry in &batch.kvs {
if !is_valid_key_size(entry.key.len()) {
return Err(ClientError::WireFormat(format!(
"key length {} is outside valid store key range ({}..={})",
entry.key.len(),
keys::MIN_KEY_LEN,
MAX_KEY_LEN
)));
}
}

let config =
store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression);
let client = IngestServiceClient::new(self.connect_http.clone(), config);
let mut kvs = batch.kvs.into_iter();
let requests = std::iter::from_fn(move || {
let chunk: Vec<_> = kvs.by_ref().take(PUT_MANY_CHUNK_ENTRIES).collect();
if chunk.is_empty() {
None
} else {
Some(ProtoPutRequest {
kvs: chunk,
..Default::default()
})
}
});
let response = client
.put_many(requests)
.await
.map_err(client_error_from_connect)?;
Ok(response.into_owned().sequence_number)
}

pub(crate) async fn get(&self, key: &Key) -> Result<Option<Bytes>, ClientError> {
self.get_internal(key, None).await
}
Expand Down Expand Up @@ -2103,6 +2146,10 @@ pub struct Stream<'a> {
}

impl<'a> Ingest<'a> {
pub fn write_batch(&self) -> StoreWriteBatch {
self.c.write_batch()
}

pub async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result<u64, ClientError> {
self.c.put(kvs).await
}
Expand All @@ -2112,6 +2159,10 @@ impl<'a> Ingest<'a> {
pub async fn put_prepared(&self, batch: &StoreWriteBatch) -> Result<u64, ClientError> {
batch.commit(self.c).await
}

pub async fn put_many(&self, batch: StoreWriteBatch) -> Result<u64, ClientError> {
batch.finish(self.c).await
}
}

impl<'a> Query<'a> {
Expand Down Expand Up @@ -3017,12 +3068,12 @@ mod tests {
batch.push(&b, &key_b, b"vb").unwrap();

assert_eq!(
batch.entries[0].0,
a.key_prefix().unwrap().encode_key(&key_a).unwrap()
batch.kvs[0].key,
a.key_prefix().unwrap().encode_key(&key_a).unwrap().to_vec()
);
assert_eq!(
batch.entries[1].0,
b.key_prefix().unwrap().encode_key(&key_b).unwrap()
batch.kvs[1].key,
b.key_prefix().unwrap().encode_key(&key_b).unwrap().to_vec()
);
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/ts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const accounts = base.withKeyPrefix(new StoreKeyPrefix(4, 2));
const batch = new StoreWriteBatch()
.push(orders, orderKey, orderValue)
.push(accounts, accountKey, accountValue);
const sequence = await batch.commit(base);
const sequence = await batch.finish(base);
```

## Generated TypeScript (`gen/ts`)
Expand Down
2 changes: 1 addition & 1 deletion sdk/ts/__tests__/sdk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ describe('Exoware TS SDK', () => {
const batch = new StoreWriteBatch();
batch.push(a, key, Buffer.from('value-a'));
batch.push(b, key, Buffer.from('value-b'));
const sequenceNumber = await batch.commit(base);
const sequenceNumber = await batch.finish(base);

const resultA = await a.get(key);
const resultB = await b.get(key);
Expand Down
2 changes: 1 addition & 1 deletion sdk/ts/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: v2
plugins:
- remote: buf.build/bufbuild/es:v2.2.3
- remote: buf.build/bufbuild/es:v2.11.0
out: src/gen/ts
opt:
- target=ts
Expand Down
Loading
Loading