Skip to content

Commit 38670ba

Browse files
Deadlock issue + middleware DTO (#77)
* dylib Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * wip Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * wip Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * cdylib Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * Add ffi-primitives patch for local development Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * copilot eish * cargo fmt Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * makefile Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build * openapi discrimonator Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * namespaced dtos Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin registry Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugins Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin directory Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin upgrades Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin command Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * simplify build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * update examples Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * examples Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin signing Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin sigs Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin signing Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * cargo fmt Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * clippy Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * fix tests Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * signature tampering Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plug list Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * organize plugin commands Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * windows build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build scripts Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * windows build * gitignore * windows build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * windows build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * build Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin integrity check Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * tests Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * fix yaml Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * plugin wildcards Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * lint * bump versions Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * update Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> * middleware dto * updates libs --------- Signed-off-by: Daniel Gerlag <daniel@gerlag.ca> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent f79cebc commit 38670ba

9 files changed

Lines changed: 180 additions & 89 deletions

File tree

Cargo.lock

Lines changed: 28 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ path = "src/main.rs"
3535

3636
[dependencies]
3737
# Drasi core library (all middleware, using system libjq instead of bundled-jq)
38-
drasi-lib = { version = "0.4.0", features = [
38+
drasi-lib = { version = "0.4.1", features = [
3939
"middleware-jq",
4040
"middleware-decoder",
4141
"middleware-map",
@@ -45,25 +45,28 @@ drasi-lib = { version = "0.4.0", features = [
4545
"middleware-unwind",
4646
] }
4747

48+
# Core models (for SourceMiddlewareConfig and other shared types)
49+
drasi-core = "0.4.1"
50+
4851
# Core bootstrap providers (used by the application API)
49-
drasi-bootstrap-noop = "0.1.8"
50-
drasi-bootstrap-application = "0.1.9"
52+
drasi-bootstrap-noop = "0.1.9"
53+
drasi-bootstrap-application = "0.1.10"
5154

5255
# Core reaction (used by the application API)
53-
drasi-reaction-application = "0.2.8"
56+
drasi-reaction-application = "0.2.9"
5457

5558
# Index plugins
56-
drasi-index-rocksdb = "0.3.0"
57-
drasi-index-garnet = "0.1.6"
59+
drasi-index-rocksdb = "0.3.1"
60+
drasi-index-garnet = "0.1.7"
5861

5962
# State store plugins
60-
drasi-state-store-redb = "0.1.6"
63+
drasi-state-store-redb = "0.1.7"
6164

6265
# Plugin SDK
63-
drasi-plugin-sdk = "0.4.1"
66+
drasi-plugin-sdk = "0.4.2"
6467

6568
# Host SDK for dynamic plugin loading
66-
drasi-host-sdk = { version = "0.4.1", features = ["registry", "fetcher"] }
69+
drasi-host-sdk = { version = "0.4.2", features = ["registry", "fetcher"] }
6770
oci-client = "0.16"
6871

6972
# Server-specific dependencies
@@ -114,7 +117,7 @@ futures = "0.3"
114117
tower = { version = "0.4", features = ["util"] }
115118
hyper = { version = "1.0", features = ["full"] }
116119
rstest = "0.18"
117-
drasi-core = "0.4.0"
120+
drasi-core = "0.4.1"
118121

119122
# Integration testing with testcontainers
120123
testcontainers = "0.23"

src/api/mappings/queries/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
1717
use crate::api::mappings::{ConfigMapper, DtoMapper, MappingError};
1818
use crate::api::models::{QueryConfigDto, SourceSubscriptionConfigDto};
19+
use drasi_core::models::SourceMiddlewareConfig;
1920
use drasi_lib::channels::DispatchMode;
2021
use drasi_lib::config::{QueryConfig, SourceSubscriptionConfig};
22+
use std::sync::Arc;
2123

2224
pub struct QueryConfigMapper;
2325

@@ -33,8 +35,15 @@ impl ConfigMapper<QueryConfigDto, QueryConfig> for QueryConfigMapper {
3335
.map(map_source_subscription)
3436
.collect::<Result<Vec<_>, _>>()?;
3537

36-
// Middleware is empty for now (optional field, defaults to empty vec)
37-
let middleware = Vec::new();
38+
let middleware = dto
39+
.middleware
40+
.iter()
41+
.map(|m| SourceMiddlewareConfig {
42+
kind: Arc::from(m.kind.as_str()),
43+
name: Arc::from(m.name.as_str()),
44+
config: m.config.clone(),
45+
})
46+
.collect();
3847

3948
// Parse dispatch mode if provided
4049
let dispatch_mode = dto

src/api/models/queries/query.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use drasi_lib::config::QueryLanguage;
1818
use drasi_lib::QueryConfig;
1919
use serde::{Deserialize, Serialize};
20+
use serde_json::Map;
2021

2122
/// Query configuration DTO with camelCase serialization
2223
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
@@ -30,7 +31,8 @@ pub struct QueryConfigDto {
3031
#[serde(default = "default_query_language")]
3132
pub query_language: QueryLanguage,
3233
#[serde(default)]
33-
pub middleware: Vec<String>,
34+
#[schema(value_type = Vec<SourceMiddlewareConfig>)]
35+
pub middleware: Vec<SourceMiddlewareConfigDto>,
3436
#[serde(default)]
3537
#[schema(value_type = Vec<SourceSubscriptionConfig>)]
3638
pub sources: Vec<SourceSubscriptionConfigDto>,
@@ -64,6 +66,17 @@ pub struct SourceSubscriptionConfigDto {
6466
pub pipeline: Vec<String>,
6567
}
6668

69+
/// Source middleware configuration DTO with camelCase serialization
70+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
71+
#[schema(as = SourceMiddlewareConfig)]
72+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
73+
pub struct SourceMiddlewareConfigDto {
74+
pub kind: String,
75+
pub name: String,
76+
#[serde(default)]
77+
pub config: Map<String, serde_json::Value>,
78+
}
79+
6780
fn default_auto_start() -> bool {
6881
false
6982
}
@@ -90,7 +103,11 @@ impl From<QueryConfig> for QueryConfigDto {
90103
middleware: config
91104
.middleware
92105
.into_iter()
93-
.map(|m| m.name.to_string())
106+
.map(|m| SourceMiddlewareConfigDto {
107+
kind: m.kind.to_string(),
108+
name: m.name.to_string(),
109+
config: m.config,
110+
})
94111
.collect(),
95112
sources: config
96113
.sources

src/api/shared/error.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,50 @@
1414

1515
//! Error types and error handling utilities shared across API versions.
1616
17+
use axum::async_trait;
18+
use axum::extract::rejection::JsonRejection;
19+
use axum::extract::FromRequest;
1720
use axum::http::StatusCode;
1821
use drasi_lib::DrasiError;
19-
use serde::Serialize;
22+
use serde::{de::DeserializeOwned, Serialize};
2023
use utoipa::ToSchema;
2124

25+
/// A custom JSON extractor that returns detailed error messages on deserialization failure.
26+
///
27+
/// Drop-in replacement for `axum::Json<T>` that converts `JsonRejection` errors
28+
/// into structured `ErrorResponse` bodies with the serde error details included.
29+
#[derive(Debug, Clone, Copy, Default)]
30+
pub struct JsonBody<T>(pub T);
31+
32+
#[async_trait]
33+
impl<T, S> FromRequest<S> for JsonBody<T>
34+
where
35+
axum::Json<T>: FromRequest<S, Rejection = JsonRejection>,
36+
T: DeserializeOwned,
37+
S: Send + Sync,
38+
{
39+
type Rejection = (StatusCode, axum::Json<ErrorResponse>);
40+
41+
async fn from_request(
42+
req: axum::http::Request<axum::body::Body>,
43+
state: &S,
44+
) -> Result<Self, Self::Rejection> {
45+
match axum::Json::<T>::from_request(req, state).await {
46+
Ok(axum::Json(value)) => Ok(JsonBody(value)),
47+
Err(rejection) => {
48+
let message = rejection.body_text();
49+
50+
log::debug!("JSON extraction failed: {message}");
51+
52+
Err((
53+
rejection.status(),
54+
axum::Json(ErrorResponse::new(error_codes::INVALID_REQUEST, message)),
55+
))
56+
}
57+
}
58+
}
59+
}
60+
2261
/// Error codes for API responses
2362
pub mod error_codes {
2463
pub const SOURCE_CREATE_FAILED: &str = "SOURCE_CREATE_FAILED";

src/api/v1/handlers.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use std::convert::Infallible;
2828
use std::sync::Arc;
2929

3030
use crate::api::models::{ComponentEventDto, LogMessageDto, QueryConfigDto};
31+
use crate::api::shared::error::JsonBody;
3132
use crate::api::shared::handlers::{ComponentViewQuery, ObservabilityQuery};
3233
use crate::api::shared::{
3334
ApiResponse, ApiVersionsResponse, ComponentListItem, HealthResponse, InstanceListItem,
@@ -135,7 +136,7 @@ pub async fn create_instance(
135136
Extension(registry): Extension<InstanceRegistry>,
136137
Extension(read_only): Extension<Arc<bool>>,
137138
Extension(config_persistence): Extension<Option<Arc<ConfigPersistence>>>,
138-
Json(request): Json<shared::CreateInstanceRequest>,
139+
JsonBody(request): JsonBody<shared::CreateInstanceRequest>,
139140
) -> Result<Json<ApiResponse<StatusResponse>>, StatusCode> {
140141
shared::create_instance(
141142
Extension(registry),
@@ -542,7 +543,7 @@ pub async fn create_query(
542543
Extension(read_only): Extension<Arc<bool>>,
543544
Extension(config_persistence): Extension<Option<Arc<ConfigPersistence>>>,
544545
Path(InstancePath { instance_id }): Path<InstancePath>,
545-
Json(config): Json<QueryConfigDto>,
546+
JsonBody(config): JsonBody<QueryConfigDto>,
546547
) -> Result<Json<ApiResponse<StatusResponse>>, StatusCode> {
547548
let core = registry
548549
.get(&instance_id)
@@ -1368,7 +1369,7 @@ pub async fn create_query_default(
13681369
Extension(registry): Extension<InstanceRegistry>,
13691370
Extension(read_only): Extension<Arc<bool>>,
13701371
Extension(config_persistence): Extension<Option<Arc<ConfigPersistence>>>,
1371-
Json(config): Json<QueryConfigDto>,
1372+
JsonBody(config): JsonBody<QueryConfigDto>,
13721373
) -> Result<Json<ApiResponse<StatusResponse>>, StatusCode> {
13731374
let (instance_id, core) = registry.get_default().await.ok_or(StatusCode::NOT_FOUND)?;
13741375
shared::create_query(

src/api/v1/openapi.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::api::models::{
2424
ComponentEventDto, ComponentStatusDto, ComponentTypeDto, ConfigValueBoolSchema,
2525
ConfigValueStringSchema, ConfigValueU16Schema, ConfigValueU32Schema, ConfigValueU64Schema,
2626
ConfigValueUsizeSchema, LogLevelDto, LogMessageDto, QueryConfigDto, RedbStateStoreConfigDto,
27-
SourceSubscriptionConfigDto,
27+
SourceMiddlewareConfigDto, SourceSubscriptionConfigDto,
2828
};
2929
use crate::api::shared::handlers::CreateInstanceRequest;
3030
use crate::api::shared::{
@@ -100,6 +100,7 @@ use utoipa::openapi::RefOr;
100100
DrasiLibInstanceConfig,
101101
QueryConfigDto,
102102
SourceSubscriptionConfigDto,
103+
SourceMiddlewareConfigDto,
103104
RedbStateStoreConfigDto,
104105
ConfigValueStringSchema,
105106
ConfigValueU16Schema,

0 commit comments

Comments
 (0)