Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions connectrpc/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,28 @@ pub mod header {
}

/// Encode a protobuf message to binary format.
#[inline]
pub fn encode_proto<M: Message>(message: &M) -> Result<Bytes, ConnectError> {
Ok(message.encode_to_bytes())
}

/// Decode bytes into a protobuf message.
#[inline]
pub fn decode_proto<M: Message>(data: &[u8]) -> Result<M, ConnectError> {
M::decode_from_slice(data)
.map_err(|e| ConnectError::invalid_argument(format!("failed to decode proto: {e}")))
}

/// Encode a message to JSON format.
#[inline]
pub fn encode_json<M: Serialize>(message: &M) -> Result<Bytes, ConnectError> {
serde_json::to_vec(message)
.map(Bytes::from)
.map_err(|e| ConnectError::internal(format!("failed to encode JSON: {e}")))
}

/// Decode JSON bytes into a message.
#[inline]
pub fn decode_json<M: DeserializeOwned>(data: &[u8]) -> Result<M, ConnectError> {
serde_json::from_slice(data)
.map_err(|e| ConnectError::invalid_argument(format!("failed to decode JSON: {e}")))
Expand All @@ -64,16 +68,19 @@ pub struct ProtoCodec;

impl ProtoCodec {
/// Get the content type for this codec.
#[inline]
pub fn content_type() -> &'static str {
content_type::PROTO
}

/// Encode a protobuf message to bytes.
#[inline]
pub fn encode<M: Message>(message: &M) -> Result<Bytes, ConnectError> {
encode_proto(message)
}

/// Decode bytes into a protobuf message.
#[inline]
pub fn decode<M: Message>(data: &[u8]) -> Result<M, ConnectError> {
decode_proto(data)
}
Expand All @@ -85,16 +92,19 @@ pub struct JsonCodec;

impl JsonCodec {
/// Get the content type for this codec.
#[inline]
pub fn content_type() -> &'static str {
content_type::JSON
}

/// Encode a message to JSON bytes.
#[inline]
pub fn encode<M: Serialize>(message: &M) -> Result<Bytes, ConnectError> {
encode_json(message)
}

/// Decode JSON bytes into a message.
#[inline]
pub fn decode<M: DeserializeOwned>(data: &[u8]) -> Result<M, ConnectError> {
decode_json(data)
}
Expand Down
3 changes: 3 additions & 0 deletions connectrpc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,14 @@ impl CompressionRegistry {
/// Get a provider by encoding name.
///
/// Returns `None` if no provider is registered for the given name.
#[inline]
#[must_use]
pub fn get(&self, name: &str) -> Option<Arc<dyn CompressionProvider>> {
self.providers.get(name).cloned()
}

/// Check if a provider is registered for the given encoding name.
#[inline]
pub fn supports(&self, name: &str) -> bool {
self.providers.contains_key(name)
}
Expand All @@ -254,6 +256,7 @@ impl CompressionRegistry {
///
/// Useful for Accept-Encoding headers. The string is computed once when
/// providers are registered and cached, so this is a cheap lookup.
#[inline]
pub fn accept_encoding_header(&self) -> &str {
&self.accept_encoding
}
Expand Down
7 changes: 6 additions & 1 deletion connectrpc/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct Envelope {

impl Envelope {
/// Create a new data envelope.
#[inline]
pub fn data(data: Bytes) -> Self {
Self {
flags: flags::DATA,
Expand All @@ -47,6 +48,7 @@ impl Envelope {
}

/// Create a new compressed data envelope.
#[inline]
pub fn compressed(data: Bytes) -> Self {
Self {
flags: flags::COMPRESSED,
Expand All @@ -55,6 +57,7 @@ impl Envelope {
}

/// Create a new end-stream envelope.
#[inline]
pub fn end_stream(data: Bytes) -> Self {
Self {
flags: flags::END_STREAM,
Expand All @@ -63,11 +66,13 @@ impl Envelope {
}

/// Check if this is a compressed message.
#[inline]
pub fn is_compressed(&self) -> bool {
self.flags & flags::COMPRESSED != 0
}

/// Check if this is an end-of-stream message.
#[inline]
pub fn is_end_stream(&self) -> bool {
self.flags & flags::END_STREAM != 0
}
Expand Down Expand Up @@ -568,7 +573,7 @@ mod tests {
Some((Arc::clone(&registry), "gzip")),
CompressionPolicy::default(),
);
let mut dec = EnvelopeDecoder::new(1024, Some("gzip".to_owned()), registry);
let mut dec = EnvelopeDecoder::new(1024, Some("gzip".to_string()), registry);

let original = Bytes::from_static(b"roundtrip test data");
let mut buf = BytesMut::new();
Expand Down
16 changes: 16 additions & 0 deletions connectrpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,81 +275,97 @@ impl ConnectError {
}

/// Create a canceled error.
#[inline]
pub fn canceled(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Canceled, message)
}

/// Create an unknown error.
#[inline]
pub fn unknown(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Unknown, message)
}

/// Create an invalid argument error.
#[inline]
pub fn invalid_argument(message: impl Into<String>) -> Self {
Self::new(ErrorCode::InvalidArgument, message)
}

/// Create a deadline exceeded error.
#[inline]
pub fn deadline_exceeded(message: impl Into<String>) -> Self {
Self::new(ErrorCode::DeadlineExceeded, message)
}

/// Create a not found error.
#[inline]
pub fn not_found(message: impl Into<String>) -> Self {
Self::new(ErrorCode::NotFound, message)
}

/// Create an already exists error.
#[inline]
pub fn already_exists(message: impl Into<String>) -> Self {
Self::new(ErrorCode::AlreadyExists, message)
}

/// Create a permission denied error.
#[inline]
pub fn permission_denied(message: impl Into<String>) -> Self {
Self::new(ErrorCode::PermissionDenied, message)
}

/// Create a resource exhausted error.
#[inline]
pub fn resource_exhausted(message: impl Into<String>) -> Self {
Self::new(ErrorCode::ResourceExhausted, message)
}

/// Create a failed precondition error.
#[inline]
pub fn failed_precondition(message: impl Into<String>) -> Self {
Self::new(ErrorCode::FailedPrecondition, message)
}

/// Create an aborted error.
#[inline]
pub fn aborted(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Aborted, message)
}

/// Create an out of range error.
#[inline]
pub fn out_of_range(message: impl Into<String>) -> Self {
Self::new(ErrorCode::OutOfRange, message)
}

/// Create an unimplemented error.
#[inline]
pub fn unimplemented(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Unimplemented, message)
}

/// Create an internal error.
#[inline]
pub fn internal(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Internal, message)
}

/// Create an unavailable error.
#[inline]
pub fn unavailable(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Unavailable, message)
}

/// Create a data loss error.
#[inline]
pub fn data_loss(message: impl Into<String>) -> Self {
Self::new(ErrorCode::DataLoss, message)
}

/// Create an unauthenticated error.
#[inline]
pub fn unauthenticated(message: impl Into<String>) -> Self {
Self::new(ErrorCode::Unauthenticated, message)
}
Expand Down
2 changes: 1 addition & 1 deletion connectrpc/src/grpc_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
use base64::Engine;

let detail = ErrorDetail {
type_url: "type.googleapis.com/test.Detail".to_string(),
type_url: "type.googleapis.com/test.Detail".into(),
value: Some(base64::engine::general_purpose::STANDARD_NO_PAD.encode(b"\x01\x02\x03")),
debug: None,
};
Expand Down
4 changes: 4 additions & 0 deletions connectrpc/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct Context {

impl Context {
/// Create a new context with the given headers.
#[inline]
pub fn new(headers: http::HeaderMap) -> Self {
Self {
headers,
Expand All @@ -92,6 +93,7 @@ impl Context {
///
/// Used by the server dispatch paths to expose the parsed timeout
/// to handlers, allowing deadline propagation to downstream calls.
#[inline]
#[must_use]
pub fn with_deadline(mut self, deadline: Option<std::time::Instant>) -> Self {
self.deadline = deadline;
Expand All @@ -101,6 +103,7 @@ impl Context {
/// Attach request extensions captured from the underlying `http::Request`.
///
/// Used by the server dispatch paths; see [`Context::extensions`].
#[inline]
#[must_use]
pub fn with_extensions(mut self, extensions: http::Extensions) -> Self {
self.extensions = extensions;
Expand All @@ -118,6 +121,7 @@ impl Context {
}

/// Get a request header value.
#[inline]
pub fn header(&self, key: &http::header::HeaderName) -> Option<&http::header::HeaderValue> {
self.headers.get(key)
}
Expand Down
1 change: 1 addition & 0 deletions connectrpc/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl Protocol {
/// - `""` or `"+proto"` → Proto (default)
/// - `"+json"` → Json
/// - anything else → None
#[inline]
fn grpc_subtype_to_codec(suffix: &str) -> Option<CodecFormat> {
match suffix {
"" => Some(CodecFormat::Proto),
Expand Down
4 changes: 2 additions & 2 deletions connectrpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ fn headers_to_metadata(
) -> std::collections::HashMap<String, Vec<String>> {
let mut metadata = std::collections::HashMap::new();
for (key, value) in headers.iter() {
let key_str = key.as_str().to_owned();
let value_str = value.to_str().unwrap_or("").to_owned();
let key_str = key.as_str().to_string();
let value_str = value.to_str().unwrap_or("").to_string();
metadata
.entry(key_str)
.or_insert_with(Vec::new)
Expand Down
Loading