Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions databroker/src/viss/history/no_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use async_trait::async_trait;
use crate::viss::history::HistoryProvider;

pub struct NoHistory;

#[async_trait]
impl HistoryProvider for NoHistory {}
22 changes: 22 additions & 0 deletions databroker/src/viss/history/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use async_trait::async_trait;
use std::time::SystemTime;
use crate::broker::Datapoint;

#[derive(Debug)]
pub enum HistoryError {
NotFound,
BackendUnavailable,
InternalError,
}

#[async_trait]
pub trait HistoryProvider: Send + Sync {
async fn get_history(
&self,
_path: &str,
_start: SystemTime,
_end: SystemTime,
) -> Result<Vec<Datapoint>, HistoryError> {
Err(HistoryError::BackendUnavailable)
}
}
1 change: 1 addition & 0 deletions databroker/src/viss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

pub mod server;
pub mod v2;
pub mod history;
6 changes: 5 additions & 1 deletion databroker/src/viss/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use tracing::{debug, error, info, trace};
use futures::{channel::mpsc, Sink};
use futures::{stream::StreamExt, Stream};

use std::sync::Arc;

use crate::authorization::Authorization;
use crate::broker;
use crate::viss::history::no_history::NoHistory;

use super::v2::{self, server::Viss};

Expand Down Expand Up @@ -126,8 +129,9 @@ async fn handle_viss_v2<W, R>(
// Create a multi producer / single consumer channel, where the
// single consumer will write to the socket.
let (sender, receiver) = mpsc::channel::<Message>(10);
let no_history =Arc::new(NoHistory);

let server = v2::server::Server::new(broker, authorization);
let server = v2::server::Server::new(broker, authorization, no_history);
let mut write_task = tokio::spawn(async move {
let _ = receiver.map(Ok).forward(write).await;
});
Expand Down
71 changes: 70 additions & 1 deletion databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::{
broker::{self, AuthorizedAccess, UpdateError},
glob::Matcher,
permissions::{self, Permissions},
viss::history::HistoryProvider,
viss::history::HistoryError,
};

use super::{conversions, types::*};
Expand Down Expand Up @@ -74,14 +76,20 @@ impl Drop for SubscriptionHandle {
pub struct Server {
broker: broker::DataBroker,
authorization: Authorization,
history: Arc<dyn HistoryProvider>,
subscriptions: Arc<RwLock<HashMap<SubscriptionId, SubscriptionHandle>>>,
}

impl Server {
pub fn new(broker: broker::DataBroker, authorization: Authorization) -> Self {
pub fn new(
broker: broker::DataBroker,
authorization: Authorization,
history: Arc<dyn HistoryProvider>,
) -> Self {
Self {
broker,
authorization,
history,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
}
}
Expand Down Expand Up @@ -205,6 +213,46 @@ impl Viss for Server {
},
})
}
} else if let Some(Filter::History(history_filter)) = &request.filter {
let permissions = resolve_permissions(&self.authorization, &request.authorization)
.map_err(|error| GetErrorResponse {
request_id: request_id.clone(),
error,
ts: SystemTime::now().into(),
})?;
// Parse ISO8601 duration (P2DT12H)
let duration = parse_iso8601_duration(&history_filter.parameter).ok_or_else(|| {
GetErrorResponse {
request_id: request_id.clone(),
ts: SystemTime::now().into(),
error: Error::InvalidFilter(format!(
"Invalid ISO8601 duration: {}",
history_filter.parameter
)),
}
})?;
let now = chrono::Utc::now();
let start_ts = now - duration;
let history = self.history
.get_history(request.path.as_ref(), start_ts.into(), now.into())
.await
.map_err(|err| GetErrorResponse {
request_id: request_id.clone(),
ts: SystemTime::now().into(),
error: match err {
HistoryError::NotFound => Error::NotImplemented,
HistoryError::BackendUnavailable => Error::ServiceUnavailable,
HistoryError::InternalError => Error::InternalServerError,
},
})?;
let dps: Vec<DataPoint> = history.into_iter().map(DataPoint::from).collect();
return Ok(GetSuccessResponse::Data(DataResponse {
request_id,
data: Data::History(DataObjectHistory {
path: request.path,
dp: dps,
}),
}));
} else {
let permissions = resolve_permissions(&self.authorization, &request.authorization)
.map_err(|error| GetErrorResponse {
Expand Down Expand Up @@ -565,3 +613,24 @@ fn insert_entry(entries: &mut HashMap<String, MetadataEntry>, path: &str, entry:
}
}
}

fn parse_iso8601_duration(s: &str) -> Option<chrono::Duration> {
let d = iso8601_duration::Duration::parse(s).ok()?;

let mut duration = chrono::Duration::zero();

if d.day > 0.0 {
duration = duration + chrono::Duration::days(d.day as i64);
}
if d.hour > 0.0 {
duration = duration + chrono::Duration::hours(d.hour as i64);
}
if d.minute > 0.0 {
duration = duration + chrono::Duration::minutes(d.minute as i64);
}
if d.second > 0.0 {
duration = duration + chrono::Duration::seconds(d.second as i64);
}

Some(duration)
}
28 changes: 28 additions & 0 deletions databroker/src/viss/v2/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ pub enum Filter {
Paths(PathsFilter),
#[serde(rename = "timebased")]
Timebased(TimebasedFilter),
#[serde(rename = "history")]
History(HistoryFilter),
}

#[derive(Deserialize)]
Expand All @@ -226,6 +228,11 @@ pub struct Period {
pub period: u32,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct HistoryFilter {
pub parameter: String,
}
// Unique id value specified by the client. Returned by the server in the
// response and used by the client to link the request and response messages.
// The value MAY be an integer or a Universally Unique Identifier (UUID).
Expand Down Expand Up @@ -264,12 +271,19 @@ pub struct Timestamp {
ts: SystemTime,
}

#[derive(Serialize)]
pub struct DataObjectHistory {
pub path: Path,
pub dp: Vec<DataPoint>,
}

#[derive(Serialize)]
#[serde(untagged)]
pub enum Data {
Object(DataObject),
#[allow(dead_code)]
Array(Vec<DataObject>),
History(DataObjectHistory),
}

#[derive(Serialize)]
Expand Down Expand Up @@ -324,6 +338,14 @@ pub struct SensorEntry {
#[serde(skip_serializing_if = "Option::is_none")]
pub max: Option<Value>,
}
pub struct Duration {
pub year: f32,
pub month: f32,
pub day: f32,
pub hour: f32,
pub minute: f32,
pub second: f32,
}

#[derive(Serialize)]
pub struct AttributeEntry {
Expand Down Expand Up @@ -418,6 +440,7 @@ pub enum Error {
InternalServerError,
NotImplemented,
ServiceUnavailable,
InvalidFilter(String),
}

impl From<Error> for ErrorSpec {
Expand Down Expand Up @@ -510,6 +533,11 @@ impl From<Error> for ErrorSpec {
reason: "service_unavailable".into(),
message: "The server is temporarily unable to handle the request.".into(),
},
Error::InvalidFilter(_msg) => ErrorSpec {
number: 504,
reason: "filter_invalid.".into(),
message: "Filter is invalid.".into(),
},
}
}
}
Expand Down