From 013b46bc985413ac6f9a4eecdafa122a5fef68de Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 15 May 2026 01:00:11 -0700 Subject: [PATCH] spike --- proto/store/v1/ingest.proto | 5 + sdk/rs/README.md | 2 +- sdk/rs/src/gen/store.v1.ingest.rs | 86 ++++++++++ sdk/rs/src/lib.rs | 125 ++++++++++----- sdk/ts/README.md | 2 +- sdk/ts/__tests__/sdk.test.ts | 2 +- sdk/ts/buf.gen.yaml | 2 +- sdk/ts/src/client.ts | 2 + sdk/ts/src/gen/ts/buf/validate/validate_pb.ts | 6 +- .../src/gen/ts/google/rpc/error_details_pb.ts | 6 +- sdk/ts/src/gen/ts/store/v1/common_pb.ts | 6 +- sdk/ts/src/gen/ts/store/v1/compact_pb.ts | 6 +- sdk/ts/src/gen/ts/store/v1/ingest_pb.ts | 20 ++- sdk/ts/src/gen/ts/store/v1/query_pb.ts | 6 +- sdk/ts/src/gen/ts/store/v1/stream_pb.ts | 6 +- sdk/ts/src/store.ts | 149 +++++++++++++++--- server/src/connect.rs | 93 ++++++++--- server/src/validate.rs | 14 ++ 18 files changed, 436 insertions(+), 102 deletions(-) diff --git a/proto/store/v1/ingest.proto b/proto/store/v1/ingest.proto index ee1fbbe4..2ff65624 100644 --- a/proto/store/v1/ingest.proto +++ b/proto/store/v1/ingest.proto @@ -12,6 +12,11 @@ service Service { // covers this write. Clients can pass that sequence number to query RPCs // (via `min_sequence_number`) for read-after-write consistency. rpc Put(PutRequest) returns (PutResponse); + + // Atomically write a streamed batch of key-value pairs. Each request chunk + // carries one or more pairs, and the server persists the combined stream as + // a single Store batch after the client closes the stream. + rpc PutMany(stream PutRequest) returns (PutResponse); } // Batch write request. All pairs are applied atomically. diff --git a/sdk/rs/README.md b/sdk/rs/README.md index bcf73c0e..25900a44 100644 --- a/sdk/rs/README.md +++ b/sdk/rs/README.md @@ -29,5 +29,5 @@ use exoware_sdk::StoreWriteBatch; let mut batch = StoreWriteBatch::new(); batch.push(&orders, &order_key, order_value)?; batch.push(&accounts, &account_key, account_value)?; -let sequence = batch.commit(&base).await?; +let sequence = batch.finish(&base).await?; ``` diff --git a/sdk/rs/src/gen/store.v1.ingest.rs b/sdk/rs/src/gen/store.v1.ingest.rs index db7a0285..1198a5d4 100644 --- a/sdk/rs/src/gen/store.v1.ingest.rs +++ b/sdk/rs/src/gen/store.v1.ingest.rs @@ -527,6 +527,25 @@ pub trait Service: Send + Sync + 'static { ) -> impl ::std::future::Future< Output = Result<(PutResponse, ::connectrpc::Context), ::connectrpc::ConnectError>, > + Send; + /// Atomically write a streamed batch of key-value pairs. Each request chunk + /// carries one or more pairs, and the server persists the combined stream as + /// a single Store batch after the client closes the stream. + fn put_many( + &self, + ctx: ::connectrpc::Context, + requests: ::std::pin::Pin< + Box< + dyn ::futures::Stream< + Item = Result< + ::buffa::view::OwnedView>, + ::connectrpc::ConnectError, + >, + > + Send, + >, + >, + ) -> impl ::std::future::Future< + Output = Result<(PutResponse, ::connectrpc::Context), ::connectrpc::ConnectError>, + > + Send; } /// Extension trait for registering a service implementation with a Router. /// @@ -567,6 +586,17 @@ impl ServiceExt for S { }) }, ) + .route_view_client_stream( + SERVICE_SERVICE_NAME, + "PutMany", + ::connectrpc::view_client_streaming_handler_fn({ + let svc = ::std::sync::Arc::clone(&self); + move |ctx, req| { + let svc = ::std::sync::Arc::clone(&svc); + async move { svc.put_many(ctx, req).await } + } + }), + ) } } /// Monomorphic dispatcher for `Service`. @@ -615,6 +645,11 @@ impl ::connectrpc::Dispatcher for ServiceServer { "Put" => { Some(::connectrpc::dispatcher::codegen::MethodDescriptor::unary(false)) } + "PutMany" => { + Some( + ::connectrpc::dispatcher::codegen::MethodDescriptor::client_streaming(), + ) + } _ => None, } } @@ -674,6 +709,20 @@ impl ::connectrpc::Dispatcher for ServiceServer { }; let _ = (&ctx, &requests, &format); match method { + "PutMany" => { + let svc = ::std::sync::Arc::clone(&self.inner); + Box::pin(async move { + let req_stream = ::connectrpc::dispatcher::codegen::decode_view_request_stream::< + PutRequestView, + >(requests, format); + let (res, ctx) = svc.put_many(ctx, req_stream).await?; + let bytes = ::connectrpc::dispatcher::codegen::encode_response( + &res, + format, + )?; + Ok((bytes, ctx)) + }) + } _ => ::connectrpc::dispatcher::codegen::unimplemented_unary(path), } } @@ -798,4 +847,41 @@ where ) .await } + /// Call the PutMany RPC. Sends a request to /store.ingest.v1.Service/PutMany. + pub async fn put_many( + &self, + requests: impl IntoIterator, + ) -> Result< + ::connectrpc::client::UnaryResponse< + ::buffa::view::OwnedView>, + >, + ::connectrpc::ConnectError, + > { + self.put_many_with_options( + requests, + ::connectrpc::client::CallOptions::default(), + ) + .await + } + /// Call the PutMany RPC with explicit per-call options. Options override [`connectrpc::client::ClientConfig`] defaults. + pub async fn put_many_with_options( + &self, + requests: impl IntoIterator, + options: ::connectrpc::client::CallOptions, + ) -> Result< + ::connectrpc::client::UnaryResponse< + ::buffa::view::OwnedView>, + >, + ::connectrpc::ConnectError, + > { + ::connectrpc::client::call_client_stream( + &self.transport, + &self.config, + SERVICE_SERVICE_NAME, + "PutMany", + requests, + options, + ) + .await + } } diff --git a/sdk/rs/src/lib.rs b/sdk/rs/src/lib.rs index ad825f10..134e52bf 100644 --- a/sdk/rs/src/lib.rs +++ b/sdk/rs/src/lib.rs @@ -54,6 +54,7 @@ use std::time::Duration; const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3; const DEFAULT_RETRY_INITIAL_BACKOFF_MS: u64 = 100; const DEFAULT_RETRY_MAX_BACKOFF_MS: u64 = 2_000; +const PUT_MANY_CHUNK_ENTRIES: usize = 4096; /// Codec used to compress **outgoing** RPC request bodies when compression applies. /// @@ -282,11 +283,11 @@ impl StoreKeyPrefix { /// A physical Store write batch assembled from one or more logical clients. /// /// Use [`Self::push`] with the specific prefixed client that produced each -/// logical key, then [`Self::commit`] once to submit all rows in one atomic -/// Store `Put`. +/// logical key, then [`Self::finish`] once to submit all rows in one atomic +/// streaming Store `PutMany`. #[derive(Clone, Debug, Default)] pub struct StoreWriteBatch { - entries: Vec<(Key, Bytes)>, + kvs: Vec, } impl StoreWriteBatch { @@ -294,16 +295,28 @@ impl StoreWriteBatch { Self::default() } + pub fn with_capacity(capacity: usize) -> Self { + Self { + kvs: Vec::with_capacity(capacity), + } + } + pub fn len(&self) -> usize { - self.entries.len() + self.kvs.len() } pub fn is_empty(&self) -> bool { - self.entries.is_empty() + self.kvs.is_empty() } pub fn clear(&mut self) { - self.entries.clear(); + self.kvs.clear(); + } + + pub fn entries(&self) -> impl ExactSizeIterator { + self.kvs + .iter() + .map(|entry| (entry.key.as_slice(), entry.value.as_slice())) } pub fn push( @@ -312,18 +325,25 @@ impl StoreWriteBatch { key: &Key, value: &[u8], ) -> Result<&mut Self, ClientError> { - self.entries - .push((client.encode_store_key(key)?, Bytes::copy_from_slice(value))); + self.push_physical(&client.encode_store_key(key)?, value); Ok(self) } + fn push_physical(&mut self, key: &Key, value: &[u8]) -> &mut Self { + self.kvs.push(exoware_proto::common::KvEntry { + key: key.to_vec(), + value: value.to_vec(), + ..Default::default() + }); + self + } + + pub async fn finish(self, client: &StoreClient) -> Result { + client.put_many(self).await + } + pub async fn commit(&self, client: &StoreClient) -> Result { - let refs: Vec<(&Key, &[u8])> = self - .entries - .iter() - .map(|(key, value)| (key, value.as_ref())) - .collect(); - client.put_physical(&refs).await + client.put_many(self.clone()).await } } @@ -335,7 +355,7 @@ impl StoreWriteBatch { /// this trait provides the common lifecycle: /// /// 1. stage rows into a [`StoreWriteBatch`] -/// 2. commit that batch +/// 2. finish that batch /// 3. mark the prepared handle persisted with the returned Store sequence /// number, or failed if staging/commit does not complete pub trait StoreBatchUpload { @@ -1332,6 +1352,11 @@ impl StoreClient { Stream { c: self } } + /// Start assembling an atomic Store write batch incrementally. + pub fn write_batch(&self) -> StoreWriteBatch { + StoreWriteBatch::new() + } + /// Submit a KV batch via Connect `Put`. /// /// On success returns the **store sequence number** from the response. Use it for immediate @@ -1339,24 +1364,9 @@ impl StoreClient { /// [`Self::create_session_with_sequence`]. /// If the request succeeds, the server accepts the full batch (count is `kvs.len()`). pub(crate) async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result { - if self.key_prefix.is_none() { - return self.put_physical(kvs).await; - } - let mut keys = Vec::with_capacity(kvs.len()); - for (key, _) in kvs { - keys.push(self.encode_store_key(key)?); - } - let prefixed: Vec<(&Key, &[u8])> = keys - .iter() - .zip(kvs.iter()) - .map(|(key, (_, value))| (key, *value)) - .collect(); - self.put_physical(&prefixed).await - } - - async fn put_physical(&self, kvs: &[(&Key, &[u8])]) -> Result { let mut proto_kvs = Vec::with_capacity(kvs.len()); for (key, value) in kvs { + let key = self.encode_store_key(key)?; if !is_valid_key_size(key.len()) { return Err(ClientError::WireFormat(format!( "key length {} is outside valid store key range ({}..={})", @@ -1366,12 +1376,11 @@ impl StoreClient { ))); } proto_kvs.push(exoware_proto::common::KvEntry { - key: (*key).to_vec(), + key: key.to_vec(), value: value.to_vec(), ..Default::default() }); } - let config = store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression); let client = IngestServiceClient::new(self.connect_http.clone(), config); @@ -1385,6 +1394,40 @@ impl StoreClient { Ok(response.into_owned().sequence_number) } + async fn put_many(&self, batch: StoreWriteBatch) -> Result { + for entry in &batch.kvs { + if !is_valid_key_size(entry.key.len()) { + return Err(ClientError::WireFormat(format!( + "key length {} is outside valid store key range ({}..={})", + entry.key.len(), + keys::MIN_KEY_LEN, + MAX_KEY_LEN + ))); + } + } + + let config = + store_connect_client_config(self.ingest_uri.clone(), self.connect_request_compression); + let client = IngestServiceClient::new(self.connect_http.clone(), config); + let mut kvs = batch.kvs.into_iter(); + let requests = std::iter::from_fn(move || { + let chunk: Vec<_> = kvs.by_ref().take(PUT_MANY_CHUNK_ENTRIES).collect(); + if chunk.is_empty() { + None + } else { + Some(ProtoPutRequest { + kvs: chunk, + ..Default::default() + }) + } + }); + let response = client + .put_many(requests) + .await + .map_err(client_error_from_connect)?; + Ok(response.into_owned().sequence_number) + } + pub(crate) async fn get(&self, key: &Key) -> Result, ClientError> { self.get_internal(key, None).await } @@ -2103,6 +2146,10 @@ pub struct Stream<'a> { } impl<'a> Ingest<'a> { + pub fn write_batch(&self) -> StoreWriteBatch { + self.c.write_batch() + } + pub async fn put(&self, kvs: &[(&Key, &[u8])]) -> Result { self.c.put(kvs).await } @@ -2112,6 +2159,10 @@ impl<'a> Ingest<'a> { pub async fn put_prepared(&self, batch: &StoreWriteBatch) -> Result { batch.commit(self.c).await } + + pub async fn put_many(&self, batch: StoreWriteBatch) -> Result { + batch.finish(self.c).await + } } impl<'a> Query<'a> { @@ -3017,12 +3068,12 @@ mod tests { batch.push(&b, &key_b, b"vb").unwrap(); assert_eq!( - batch.entries[0].0, - a.key_prefix().unwrap().encode_key(&key_a).unwrap() + batch.kvs[0].key, + a.key_prefix().unwrap().encode_key(&key_a).unwrap().to_vec() ); assert_eq!( - batch.entries[1].0, - b.key_prefix().unwrap().encode_key(&key_b).unwrap() + batch.kvs[1].key, + b.key_prefix().unwrap().encode_key(&key_b).unwrap().to_vec() ); } diff --git a/sdk/ts/README.md b/sdk/ts/README.md index f412dd71..d6731f10 100644 --- a/sdk/ts/README.md +++ b/sdk/ts/README.md @@ -22,7 +22,7 @@ const accounts = base.withKeyPrefix(new StoreKeyPrefix(4, 2)); const batch = new StoreWriteBatch() .push(orders, orderKey, orderValue) .push(accounts, accountKey, accountValue); -const sequence = await batch.commit(base); +const sequence = await batch.finish(base); ``` ## Generated TypeScript (`gen/ts`) diff --git a/sdk/ts/__tests__/sdk.test.ts b/sdk/ts/__tests__/sdk.test.ts index a6323090..50e30722 100644 --- a/sdk/ts/__tests__/sdk.test.ts +++ b/sdk/ts/__tests__/sdk.test.ts @@ -203,7 +203,7 @@ describe('Exoware TS SDK', () => { const batch = new StoreWriteBatch(); batch.push(a, key, Buffer.from('value-a')); batch.push(b, key, Buffer.from('value-b')); - const sequenceNumber = await batch.commit(base); + const sequenceNumber = await batch.finish(base); const resultA = await a.get(key); const resultB = await b.get(key); diff --git a/sdk/ts/buf.gen.yaml b/sdk/ts/buf.gen.yaml index 7621b7e1..f28e0607 100644 --- a/sdk/ts/buf.gen.yaml +++ b/sdk/ts/buf.gen.yaml @@ -1,6 +1,6 @@ version: v2 plugins: - - remote: buf.build/bufbuild/es:v2.2.3 + - remote: buf.build/bufbuild/es:v2.11.0 out: src/gen/ts opt: - target=ts diff --git a/sdk/ts/src/client.ts b/sdk/ts/src/client.ts index 09ada1ef..cbd76ee1 100644 --- a/sdk/ts/src/client.ts +++ b/sdk/ts/src/client.ts @@ -85,6 +85,7 @@ export function createTransport(baseUrl: string, tokenOrOptions?: string | Clien export class Client { public readonly baseUrl: string; + public readonly token?: string; public readonly compact: ConnectClient; public readonly ingest: ConnectClient; public readonly query: ConnectClient; @@ -94,6 +95,7 @@ export class Client { constructor(baseUrl: string, tokenOrOptions?: string | ClientOptions) { const opts = normalizeClientOptions(tokenOrOptions); this.baseUrl = baseUrl.replace(/\/$/, ''); + this.token = opts.token; this.retryConfig = opts.retry ?? DEFAULT_RETRY_CONFIG; const transport = createTransport(this.baseUrl, opts); this.compact = createClient(CompactService, transport); diff --git a/sdk/ts/src/gen/ts/buf/validate/validate_pb.ts b/sdk/ts/src/gen/ts/buf/validate/validate_pb.ts index f205e4fb..4b1233cb 100644 --- a/sdk/ts/src/gen/ts/buf/validate/validate_pb.ts +++ b/sdk/ts/src/gen/ts/buf/validate/validate_pb.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file buf/validate/validate.proto (package buf.validate, syntax proto2) /* eslint-disable */ @@ -45,8 +45,8 @@ // [Python](https://github.com/bufbuild/protovalidate-python), // or [C++](https://github.com/bufbuild/protovalidate-cc). -import type { GenEnum, GenExtension, GenFile, GenMessage } from "@bufbuild/protobuf/codegenv1"; -import { enumDesc, extDesc, fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenEnum, GenExtension, GenFile, GenMessage } from "@bufbuild/protobuf/codegenv2"; +import { enumDesc, extDesc, fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv2"; import type { Duration, FieldDescriptorProto_Type, FieldMask, FieldOptions, MessageOptions, OneofOptions, Timestamp } from "@bufbuild/protobuf/wkt"; import { file_google_protobuf_descriptor, file_google_protobuf_duration, file_google_protobuf_field_mask, file_google_protobuf_timestamp } from "@bufbuild/protobuf/wkt"; import type { Message } from "@bufbuild/protobuf"; diff --git a/sdk/ts/src/gen/ts/google/rpc/error_details_pb.ts b/sdk/ts/src/gen/ts/google/rpc/error_details_pb.ts index f2548e0e..a6b9379c 100644 --- a/sdk/ts/src/gen/ts/google/rpc/error_details_pb.ts +++ b/sdk/ts/src/gen/ts/google/rpc/error_details_pb.ts @@ -2,12 +2,12 @@ // Wire-compatible with the original (same field numbers and types). // Only the types used by the store API are included here. -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file google/rpc/error_details.proto (package google.rpc, syntax proto3) /* eslint-disable */ -import type { GenFile, GenMessage } from "@bufbuild/protobuf/codegenv1"; -import { fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenFile, GenMessage } from "@bufbuild/protobuf/codegenv2"; +import { fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv2"; import type { Duration } from "@bufbuild/protobuf/wkt"; import { file_google_protobuf_duration } from "@bufbuild/protobuf/wkt"; import type { Message } from "@bufbuild/protobuf"; diff --git a/sdk/ts/src/gen/ts/store/v1/common_pb.ts b/sdk/ts/src/gen/ts/store/v1/common_pb.ts index a803ba1d..abafba88 100644 --- a/sdk/ts/src/gen/ts/store/v1/common_pb.ts +++ b/sdk/ts/src/gen/ts/store/v1/common_pb.ts @@ -1,9 +1,9 @@ -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file store/v1/common.proto (package store.common.v1, syntax proto3) /* eslint-disable */ -import type { GenFile, GenMessage } from "@bufbuild/protobuf/codegenv1"; -import { fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenFile, GenMessage } from "@bufbuild/protobuf/codegenv2"; +import { fileDesc, messageDesc } from "@bufbuild/protobuf/codegenv2"; import { file_buf_validate_validate } from "../../buf/validate/validate_pb.js"; import type { Message } from "@bufbuild/protobuf"; diff --git a/sdk/ts/src/gen/ts/store/v1/compact_pb.ts b/sdk/ts/src/gen/ts/store/v1/compact_pb.ts index c71d238d..f258c95c 100644 --- a/sdk/ts/src/gen/ts/store/v1/compact_pb.ts +++ b/sdk/ts/src/gen/ts/store/v1/compact_pb.ts @@ -1,9 +1,9 @@ -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file store/v1/compact.proto (package store.compact.v1, syntax proto3) /* eslint-disable */ -import type { GenEnum, GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv1"; -import { enumDesc, fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenEnum, GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv2"; +import { enumDesc, fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv2"; import { file_buf_validate_validate } from "../../buf/validate/validate_pb.js"; import type { MatchKey } from "./common_pb.js"; import { file_store_v1_common } from "./common_pb.js"; diff --git a/sdk/ts/src/gen/ts/store/v1/ingest_pb.ts b/sdk/ts/src/gen/ts/store/v1/ingest_pb.ts index 39b16b95..b6f6b999 100644 --- a/sdk/ts/src/gen/ts/store/v1/ingest_pb.ts +++ b/sdk/ts/src/gen/ts/store/v1/ingest_pb.ts @@ -1,9 +1,9 @@ -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file store/v1/ingest.proto (package store.ingest.v1, syntax proto3) /* eslint-disable */ -import type { GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv1"; -import { fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv2"; +import { fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv2"; import { file_buf_validate_validate } from "../../buf/validate/validate_pb.js"; import type { KvEntry } from "./common_pb.js"; import { file_store_v1_common } from "./common_pb.js"; @@ -13,7 +13,7 @@ import type { Message } from "@bufbuild/protobuf"; * Describes the file store/v1/ingest.proto. */ export const file_store_v1_ingest: GenFile = /*@__PURE__*/ - fileDesc("ChVzdG9yZS92MS9pbmdlc3QucHJvdG8SD3N0b3JlLmluZ2VzdC52MSI9CgpQdXRSZXF1ZXN0Ei8KA2t2cxgBIAMoCzIYLnN0b3JlLmNvbW1vbi52MS5LdkVudHJ5Qgi6SAWSAQIIASImCgtQdXRSZXNwb25zZRIXCg9zZXF1ZW5jZV9udW1iZXIYASABKAQySwoHU2VydmljZRJACgNQdXQSGy5zdG9yZS5pbmdlc3QudjEuUHV0UmVxdWVzdBocLnN0b3JlLmluZ2VzdC52MS5QdXRSZXNwb25zZWIGcHJvdG8z", [file_buf_validate_validate, file_store_v1_common]); + fileDesc("ChVzdG9yZS92MS9pbmdlc3QucHJvdG8SD3N0b3JlLmluZ2VzdC52MSI9CgpQdXRSZXF1ZXN0Ei8KA2t2cxgBIAMoCzIYLnN0b3JlLmNvbW1vbi52MS5LdkVudHJ5Qgi6SAWSAQIIASImCgtQdXRSZXNwb25zZRIXCg9zZXF1ZW5jZV9udW1iZXIYASABKAQykwEKB1NlcnZpY2USQAoDUHV0Ehsuc3RvcmUuaW5nZXN0LnYxLlB1dFJlcXVlc3QaHC5zdG9yZS5pbmdlc3QudjEuUHV0UmVzcG9uc2USRgoHUHV0TWFueRIbLnN0b3JlLmluZ2VzdC52MS5QdXRSZXF1ZXN0Ghwuc3RvcmUuaW5nZXN0LnYxLlB1dFJlc3BvbnNlKAFiBnByb3RvMw", [file_buf_validate_validate, file_store_v1_common]); /** * Batch write request. All pairs are applied atomically. @@ -78,6 +78,18 @@ export const Service: GenService<{ input: typeof PutRequestSchema; output: typeof PutResponseSchema; }, + /** + * Atomically write a streamed batch of key-value pairs. Each request chunk + * carries one or more pairs, and the server persists the combined stream as + * a single Store batch after the client closes the stream. + * + * @generated from rpc store.ingest.v1.Service.PutMany + */ + putMany: { + methodKind: "client_streaming"; + input: typeof PutRequestSchema; + output: typeof PutResponseSchema; + }, }> = /*@__PURE__*/ serviceDesc(file_store_v1_ingest, 0); diff --git a/sdk/ts/src/gen/ts/store/v1/query_pb.ts b/sdk/ts/src/gen/ts/store/v1/query_pb.ts index 0ab35b42..de290627 100644 --- a/sdk/ts/src/gen/ts/store/v1/query_pb.ts +++ b/sdk/ts/src/gen/ts/store/v1/query_pb.ts @@ -1,9 +1,9 @@ -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file store/v1/query.proto (package store.query.v1, syntax proto3) /* eslint-disable */ -import type { GenEnum, GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv1"; -import { enumDesc, fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenEnum, GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv2"; +import { enumDesc, fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv2"; import { file_buf_validate_validate } from "../../buf/validate/validate_pb.js"; import type { Value } from "@bufbuild/protobuf/wkt"; import { file_google_protobuf_struct } from "@bufbuild/protobuf/wkt"; diff --git a/sdk/ts/src/gen/ts/store/v1/stream_pb.ts b/sdk/ts/src/gen/ts/store/v1/stream_pb.ts index f74d2e01..206b6495 100644 --- a/sdk/ts/src/gen/ts/store/v1/stream_pb.ts +++ b/sdk/ts/src/gen/ts/store/v1/stream_pb.ts @@ -1,9 +1,9 @@ -// @generated by protoc-gen-es v2.2.3 with parameter "target=ts,import_extension=js" +// @generated by protoc-gen-es v2.11.0 with parameter "target=ts,import_extension=js" // @generated from file store/v1/stream.proto (package store.stream.v1, syntax proto3) /* eslint-disable */ -import type { GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv1"; -import { fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv1"; +import type { GenFile, GenMessage, GenService } from "@bufbuild/protobuf/codegenv2"; +import { fileDesc, messageDesc, serviceDesc } from "@bufbuild/protobuf/codegenv2"; import { file_buf_validate_validate } from "../../buf/validate/validate_pb.js"; import type { BytesFilter, KvEntry, MatchKey } from "./common_pb.js"; import { file_store_v1_common } from "./common_pb.js"; diff --git a/sdk/ts/src/store.ts b/sdk/ts/src/store.ts index 0aa3cc42..933a3c8e 100644 --- a/sdk/ts/src/store.ts +++ b/sdk/ts/src/store.ts @@ -1,4 +1,4 @@ -import { create, type MessageInitShape } from '@bufbuild/protobuf'; +import { create, fromBinary, toBinary, type MessageInitShape } from '@bufbuild/protobuf'; import { Code, ConnectError } from '@connectrpc/connect'; import type { CallOptions } from '@connectrpc/connect'; import type { Client } from './client.js'; @@ -10,9 +10,10 @@ import { KvEntrySchema, MatchKeySchema, } from './gen/ts/store/v1/common_pb.js'; -import type { MatchKey } from './gen/ts/store/v1/common_pb.js'; +import type { KvEntry, MatchKey } from './gen/ts/store/v1/common_pb.js'; import { ErrorInfoSchema } from './gen/ts/google/rpc/error_details_pb.js'; -import { PutRequestSchema } from './gen/ts/store/v1/ingest_pb.js'; +import { PutRequestSchema, PutResponseSchema } from './gen/ts/store/v1/ingest_pb.js'; +import type { PutRequest } from './gen/ts/store/v1/ingest_pb.js'; import { GetManyRequestSchema, GetRequestSchema as QueryGetRequestSchema, @@ -35,6 +36,9 @@ import { } from './gen/ts/store/v1/stream_pb.js'; const STREAM_SERVER_PAYLOAD_REGEX = '(?s-u).*'; +const CONNECT_STREAM_CONTENT_TYPE = 'application/connect+proto'; +const CONNECT_END_STREAM_FLAG = 2; +const PUT_MANY_CHUNK_ENTRIES = 4096; type DetailObserver = (detail: Detail) => void; @@ -257,6 +261,75 @@ function copyBytes(value: Uint8Array | Buffer): Uint8Array { return new Uint8Array(toUint8Array(value)); } +function encodeConnectEnvelope(message: Uint8Array): Uint8Array { + const envelope = new Uint8Array(5 + message.length); + envelope[0] = 0; + new DataView(envelope.buffer, envelope.byteOffset + 1, 4).setUint32(0, message.length); + envelope.set(message, 5); + return envelope; +} + +function putManyBody(requests: AsyncIterable): ReadableStream { + const iterator = requests[Symbol.asyncIterator](); + return new ReadableStream({ + async pull(controller) { + const next = await iterator.next(); + if (next.done === true) { + controller.close(); + return; + } + controller.enqueue(encodeConnectEnvelope(toBinary(PutRequestSchema, next.value))); + }, + async cancel(reason) { + if (iterator.throw) { + try { + await iterator.throw(reason); + } catch { + // Iterator cancellation is best-effort. + } + } + }, + }); +} + +async function readPutManyResponse(response: Response): Promise { + const bytes = new Uint8Array(await response.arrayBuffer()); + let offset = 0; + let sequenceNumber: bigint | undefined; + while (offset < bytes.length) { + if (offset + 5 > bytes.length) { + throw new Error('malformed PutMany response envelope'); + } + const flags = bytes[offset]; + const len = new DataView(bytes.buffer, bytes.byteOffset + offset + 1, 4).getUint32(0); + offset += 5; + if (offset + len > bytes.length) { + throw new Error('truncated PutMany response envelope'); + } + const data = bytes.subarray(offset, offset + len); + offset += len; + if ((flags & CONNECT_END_STREAM_FLAG) === CONNECT_END_STREAM_FLAG) { + if (data.length > 0) { + const end = JSON.parse(new TextDecoder().decode(data)) as { + error?: { message?: string }; + }; + if (end.error) { + throw new Error(end.error.message ?? 'PutMany stream failed'); + } + } + continue; + } + if (sequenceNumber !== undefined) { + throw new Error('PutMany response contained multiple messages'); + } + sequenceNumber = fromBinary(PutResponseSchema, data).sequenceNumber; + } + if (sequenceNumber === undefined) { + throw new Error('PutMany response contained no message'); + } + return sequenceNumber; +} + function encodeStoreKey(prefix: StoreKeyPrefix | undefined, key: Uint8Array): Uint8Array { return prefix ? prefix.encodeKey(key) : key; } @@ -280,13 +353,15 @@ function encodeStoreRange( } export class StoreWriteBatch { - private readonly kvs: StoreBatchEntry[] = []; + private readonly kvs: KvEntry[] = []; push(client: StoreClient, key: Uint8Array, value: Uint8Array | Buffer): this { - this.kvs.push({ - key: client.encodeStoreKey(key), - value: copyBytes(value), - }); + this.kvs.push( + create(KvEntrySchema, { + key: copyBytes(client.encodeStoreKey(key)), + value: copyBytes(value), + }), + ); return this; } @@ -303,7 +378,11 @@ export class StoreWriteBatch { } async commit(client: StoreClient): Promise { - return client.putPrepared(this); + return this.finish(client); + } + + async finish(client: StoreClient): Promise { + return client.putMany(this); } } @@ -988,6 +1067,10 @@ export class StoreClient { return new SerializableReadSession(this.client, this.keyPrefix, sequence); } + writeBatch(): StoreWriteBatch { + return new StoreWriteBatch(); + } + async set(key: Uint8Array, value: Uint8Array | Buffer): Promise { const req = create(PutRequestSchema, { kvs: [ @@ -1023,17 +1106,45 @@ export class StoreClient { } async putPrepared(batch: StoreWriteBatch): Promise { - const req = create(PutRequestSchema, { - kvs: batch.entries().map((kv) => - create(KvEntrySchema, { - key: kv.key, - value: kv.value, - }), - ), - }); + return this.putMany(batch); + } + + async putMany(batch: StoreWriteBatch): Promise { + async function* requests() { + const entries = batch.entries(); + for (let start = 0; start < entries.length; start += PUT_MANY_CHUNK_ENTRIES) { + yield create(PutRequestSchema, { + kvs: entries.slice(start, start + PUT_MANY_CHUNK_ENTRIES).map((kv) => + create(KvEntrySchema, { + key: kv.key, + value: kv.value, + }), + ), + }); + } + } try { - const res = await this.client.ingest.put(req); - return res.sequenceNumber; + const headers = new Headers({ + 'Content-Type': CONNECT_STREAM_CONTENT_TYPE, + 'Connect-Protocol-Version': '1', + }); + if (this.client.token !== undefined) { + headers.set('Authorization', `Bearer ${this.client.token}`); + } + const init: RequestInit & { duplex: 'half' } = { + method: 'POST', + headers, + body: putManyBody(requests()), + duplex: 'half', + }; + const res = await fetch( + `${this.client.baseUrl}/store.ingest.v1.Service/PutMany`, + init, + ); + if (!res.ok) { + throw new Error(`PutMany failed with HTTP ${res.status}: ${await res.text()}`); + } + return await readPutManyResponse(res); } catch (e) { mapConnectToHttpError(e); } diff --git a/server/src/connect.rs b/server/src/connect.rs index 8646457e..e9c15970 100644 --- a/server/src/connect.rs +++ b/server/src/connect.rs @@ -33,7 +33,7 @@ use exoware_sdk as exoware_proto; use exoware_sdk::keys::Key; use exoware_sdk::match_key::MatchKey; use exoware_sdk::store::common::v1::bytes_filter::KindView as ProtoBytesFilterKindView; -use futures::{stream as stream_util, Stream}; +use futures::{stream as stream_util, Stream, StreamExt}; use tokio::sync::Notify; use crate::reduce::RangeReducer; @@ -350,17 +350,8 @@ where state: state.into(), } } -} -impl IngestApi for IngestConnect -where - I: Ingest, -{ - async fn put( - &self, - _ctx: Context, - request: buffa::view::OwnedView>, - ) -> Result<(ProtoPutResponse, Context), ConnectError> { + fn ensure_ready(&self) -> Result<(), ConnectError> { if !self.state.ready.load(Ordering::SeqCst) { return Err(with_error_info_detail( ConnectError::unavailable("ingest is not ready"), @@ -371,7 +362,35 @@ where }, )); } + Ok(()) + } + async fn commit_ingest_batch(&self, batch: Vec<(Bytes, Bytes)>) -> Result { + let seq = self + .state + .ingest + .put_batch(batch) + .await + .map_err(ConnectError::internal)?; + + if let Some(notifier) = &self.state.notifier { + notifier.advance(seq); + } + + Ok(seq) + } +} + +impl IngestApi for IngestConnect +where + I: Ingest, +{ + async fn put( + &self, + _ctx: Context, + request: buffa::view::OwnedView>, + ) -> Result<(ProtoPutResponse, Context), ConnectError> { + self.ensure_ready()?; validate::validate_put_request(&request, self.state.limits)?; let wire = request.bytes(); @@ -382,18 +401,52 @@ where batch.push((key, value)); } - let seq = self - .state - .ingest - .put_batch(batch) - .await - .map_err(ConnectError::internal)?; + let seq = self.commit_ingest_batch(batch).await?; - // Advance any attached stream frontier after the write is committed. - if let Some(notifier) = &self.state.notifier { - notifier.advance(seq); + Ok(( + ProtoPutResponse { + sequence_number: seq, + ..Default::default() + }, + Context::default(), + )) + } + + async fn put_many( + &self, + _ctx: Context, + mut requests: Pin< + Box< + dyn Stream< + Item = Result< + buffa::view::OwnedView< + exoware_proto::store::ingest::v1::PutRequestView<'static>, + >, + ConnectError, + >, + > + Send, + >, + >, + ) -> Result<(ProtoPutResponse, Context), ConnectError> { + self.ensure_ready()?; + + let mut batch = Vec::new(); + while let Some(request) = requests.next().await { + let request = request?; + validate::validate_put_request(&request, self.state.limits)?; + + let wire = request.bytes(); + batch.reserve(request.kvs.len()); + for kv in request.kvs.iter() { + let key: Key = wire.slice_ref(kv.key); + let value = wire.slice_ref(kv.value); + batch.push((key, value)); + } } + validate::validate_put_many_entries(batch.len())?; + let seq = self.commit_ingest_batch(batch).await?; + Ok(( ProtoPutResponse { sequence_number: seq, diff --git a/server/src/validate.rs b/server/src/validate.rs index 59364f86..0ded8998 100644 --- a/server/src/validate.rs +++ b/server/src/validate.rs @@ -134,6 +134,20 @@ pub fn validate_put_request( Ok(()) } +pub fn validate_put_many_entries(len: usize) -> Result<(), ConnectError> { + if len == 0 { + return Err(field_error( + "store.ingest", + "kvs", + "at least one key-value pair is required", + "INVALID_BATCH", + "put many stream must contain at least one key-value pair", + [], + )); + } + Ok(()) +} + // -- query -- pub fn validate_get_request(