Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { workspace = true, features = [
"signal",
] }
tokio-stream = { workspace = true, features = ["sync", "net"] }
async-stream = "0.3"
socket2 = { version = "0.5.8", features = ["all"] }
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.20", default-features = false, features = [
Expand Down
36 changes: 35 additions & 1 deletion databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::pin::Pin;
use tokio::select;
use tokio::sync::mpsc;

use async_stream::stream;
use databroker_proto::kuksa::val::v1 as proto;
use databroker_proto::kuksa::val::v1::{DataEntryError, EntryUpdate};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -612,8 +613,41 @@ impl proto::val_server::Val for broker::DataBroker {

match broker.subscribe(entries, None, None).await {
Ok(stream) => {
// Convert the internal stream (EntryUpdates) → protocol replies
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))

// Listen for broker shutdown, and on receipt terminate with UNAVAILABLE
let mut shutdown_rx = self.get_shutdown_trigger();

let wrapped = stream! {
let mut s = Box::pin(stream);

loop {
tokio::select! {
item = s.next() => {
match item {
Some(res) => {
// res: Result<SubscribeResponse, Status>
yield res;
}
None => {
// Upstream finished by itself (not a broker shutdown)
break;
}
}
}
_ = shutdown_rx.recv() => {
// Broker is going away -> signal UNAVAILABLE to client
yield Err(Status::unavailable("Databroker shutting down"));
break;
}
}
}
};

Ok(tonic::Response::new(
Box::pin(wrapped) as Self::SubscribeStream
))
}
Err(SubscriptionError::NotFound) => {
Err(tonic::Status::new(tonic::Code::NotFound, "Path not found"))
Expand Down
31 changes: 30 additions & 1 deletion databroker/src/grpc/sdv_databroker_v1/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tonic::{Code, Request, Response, Status};

use databroker_proto::sdv::databroker::v1 as proto;

use async_stream::stream;
use tokio_stream::{Stream, StreamExt};

use std::collections::HashMap;
Expand Down Expand Up @@ -185,8 +186,36 @@ impl proto::broker_server::Broker for broker::DataBroker {
match broker.subscribe_query(&query).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);

// get_shutdown_trigger() is available via the databroker object (same struct)
let mut shutdown_rx = self.get_shutdown_trigger();

let wrapped = stream! {
let mut s = Box::pin(stream);

loop {
tokio::select! {
item = s.next() => {
match item {
Some(res) => {
// res: Result<proto::SubscribeReply, Status>
yield res;
}
None => {
break;
}
}
}
_ = shutdown_rx.recv() => {
yield Err(Status::unavailable("Databroker shutting down"));
break;
}
}
}
};

debug!("Subscribed to new query");
Ok(Response::new(Box::pin(stream)))
Ok(Response::new(Box::pin(wrapped) as Self::SubscribeStream))
}
Err(e) => Err(Status::new(Code::InvalidArgument, format!("{e:?}"))),
}
Expand Down
Loading