diff --git a/services/src/contexts/error.rs b/services/src/contexts/error.rs new file mode 100644 index 000000000..c5b58f74f --- /dev/null +++ b/services/src/contexts/error.rs @@ -0,0 +1,66 @@ +use snafu::Snafu; + +/// High-level error, reported to the user. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(suffix(false)))] // disables default `Snafu` suffix +pub enum ApplicationContextError { + CreateContext { source: InternalError }, + SessionById { source: InternalError }, +} + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(suffix(false)))] // disables default `Snafu` suffix +pub enum SimpleApplicationContextError { + DefaultSession { source: InternalError }, + UpdateDefaultSessionProject { source: InternalError }, + UpdateDefaultSessionView { source: InternalError }, + DefaultSessionContext { source: InternalError }, +} + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(suffix(false)))] // disables default `Snafu` suffix +pub enum SessionContextError { + Volumes, +} + +/// Low-level internal error, not (directly) reported to the user. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(suffix(false)))] // disables default `Snafu` suffix +pub enum InternalError { + CouldNotGetPostgresConfig, + CouldNotGetSchemaStatus { + source: tokio_postgres::Error, + }, + CouldNotGetClearDatabaseOnStartConfig { + source: tokio_postgres::Error, + }, + #[snafu(display( + "Database cannot be cleared on startup because it was initially started without that setting." + ))] + ClearDatabaseOnStartupNotAllowed, + CouldNotRecreateDatabaseSchema { + source: tokio_postgres::Error, + }, + CouldNotInitializeSchema { + source: tokio_postgres::Error, + }, + CouldNotCreateDefaultSession { + source: tokio_postgres::Error, + }, + CouldNotLoadDefaultSession { + source: tokio_postgres::Error, + }, + InvalidSession, + Postgres { + source: tokio_postgres::Error, + }, + Bb8 { + source: bb8_postgres::bb8::RunError, + }, + // TODO: add source error from user db once it is refactored + CouldNotGetSessionById, +} diff --git a/services/src/contexts/mod.rs b/services/src/contexts/mod.rs index 6c7ad16b4..b263e4e6c 100644 --- a/services/src/contexts/mod.rs +++ b/services/src/contexts/mod.rs @@ -1,5 +1,4 @@ use crate::datasets::upload::Volume; -use crate::error::Result; use crate::layers::listing::{DatasetLayerCollectionProvider, LayerCollectionProvider}; use crate::layers::storage::{LayerDb, LayerProviderDb}; use crate::tasks::{TaskContext, TaskManager}; @@ -16,6 +15,7 @@ use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; +pub mod error; mod postgres; mod session; mod simple_context; @@ -34,10 +34,13 @@ use geoengine_operators::engine::{ use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo; use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset}; +pub use error::ApplicationContextError; pub use postgres::{PostgresContext, PostgresDb, PostgresSessionContext}; pub use session::{MockableSession, Session, SessionId, SimpleSession}; pub use simple_context::SimpleApplicationContext; +use self::error::SessionContextError; + pub type Db = Arc>; /// The application context bundles shared resources. @@ -51,7 +54,10 @@ pub trait ApplicationContext: 'static + Send + Sync + Clone { fn session_context(&self, session: Self::Session) -> Self::SessionContext; /// Load a session by its id - async fn session_by_id(&self, session_id: SessionId) -> Result; + async fn session_by_id( + &self, + session_id: SessionId, + ) -> Result; } /// The session context bundles resources that are specific to a session. @@ -71,13 +77,13 @@ pub trait SessionContext: 'static + Send + Sync + Clone { fn tasks(&self) -> Self::TaskManager; /// Create a new query context for executing queries on processors - fn query_context(&self) -> Result; + fn query_context(&self) -> Result; /// Create a new execution context initializing operators - fn execution_context(&self) -> Result; + fn execution_context(&self) -> Result; /// Get the list of available data volumes - fn volumes(&self) -> Result>; + fn volumes(&self) -> Result, SessionContextError>; /// Get the current session fn session(&self) -> &Self::Session; diff --git a/services/src/contexts/postgres.rs b/services/src/contexts/postgres.rs index 2e9c38d5a..088505e71 100644 --- a/services/src/contexts/postgres.rs +++ b/services/src/contexts/postgres.rs @@ -1,11 +1,12 @@ use crate::api::model::datatypes::DatasetName; +use crate::contexts::error::{CouldNotInitializeSchema, CouldNotRecreateDatabaseSchema}; use crate::contexts::{ApplicationContext, QueryContextImpl, SessionId, SimpleSession}; use crate::contexts::{GeoEngineDb, SessionContext}; use crate::datasets::add_from_directory::{ add_datasets_from_directory, add_providers_from_directory, }; use crate::datasets::upload::{Volume, Volumes}; -use crate::error::{self, Error, Result}; +use crate::error::{Error, Result}; use crate::layers::add_from_directory::{ add_layer_collections_from_directory, add_layers_from_directory, UNSORTED_COLLECTION_ID, }; @@ -26,10 +27,17 @@ use geoengine_operators::engine::ChunkByteSize; use geoengine_operators::util::create_rayon_thread_pool; use log::{debug, info}; use rayon::ThreadPool; +use snafu::ResultExt; use std::path::PathBuf; use std::sync::Arc; -use super::{ExecutionContextImpl, Session, SimpleApplicationContext}; +use super::error::{ + Bb8, CouldNotCreateDefaultSession, CouldNotGetClearDatabaseOnStartConfig, + CouldNotLoadDefaultSession, CreateContext, DefaultSession, DefaultSessionContext, + InternalError, SessionById, SessionContextError, SimpleApplicationContextError, + UpdateDefaultSessionProject, UpdateDefaultSessionView, +}; +use super::{ApplicationContextError, ExecutionContextImpl, Session, SimpleApplicationContext}; // TODO: distinguish user-facing errors from system-facing error messages @@ -69,18 +77,46 @@ where tls: Tls, exe_ctx_tiling_spec: TilingSpecification, query_ctx_chunk_size: ChunkByteSize, - ) -> Result { + ) -> Result { let pg_mgr = PostgresConnectionManager::new(config, tls); - let pool = Pool::builder().build(pg_mgr).await?; - let created_schema = Self::create_schema(pool.get().await?).await?; + let pool = Pool::builder() + .build(pg_mgr) + .await + .context(super::error::Postgres) + .context(CreateContext)?; + let created_schema = Self::create_schema( + pool.get() + .await + .context(super::error::Bb8) + .context(CreateContext)?, + ) + .await + .context(CreateContext)?; let session = if created_schema { let session = SimpleSession::default(); - Self::create_default_session(pool.get().await?, session.id()).await?; + Self::create_default_session( + pool.get() + .await + .context(super::error::Bb8) + .context(CreateContext)?, + session.id(), + ) + .await + .context(CouldNotCreateDefaultSession) + .context(CreateContext)?; session } else { - Self::load_default_session(pool.get().await?).await? + Self::load_default_session( + pool.get() + .await + .context(super::error::Bb8) + .context(CreateContext)?, + ) + .await + .context(CouldNotLoadDefaultSession) + .context(CreateContext)? }; Ok(PostgresContext { @@ -105,18 +141,46 @@ where layer_collection_defs_path: PathBuf, exe_ctx_tiling_spec: TilingSpecification, query_ctx_chunk_size: ChunkByteSize, - ) -> Result { + ) -> Result { let pg_mgr = PostgresConnectionManager::new(config, tls); - let pool = Pool::builder().build(pg_mgr).await?; - let created_schema = Self::create_schema(pool.get().await?).await?; + let pool = Pool::builder() + .build(pg_mgr) + .await + .context(super::error::Postgres) + .context(CreateContext)?; + let created_schema = Self::create_schema( + pool.get() + .await + .context(super::error::Bb8) + .context(CreateContext)?, + ) + .await + .context(CreateContext)?; let session = if created_schema { let session = SimpleSession::default(); - Self::create_default_session(pool.get().await?, session.id()).await?; + Self::create_default_session( + pool.get() + .await + .context(super::error::Bb8) + .context(CreateContext)?, + session.id(), + ) + .await + .context(CouldNotCreateDefaultSession) + .context(CreateContext)?; session } else { - Self::load_default_session(pool.get().await?).await? + Self::load_default_session( + pool.get() + .await + .context(super::error::Bb8) + .context(CreateContext)?, + ) + .await + .context(CouldNotLoadDefaultSession) + .context(CreateContext)? }; let app_ctx = PostgresContext { @@ -148,7 +212,7 @@ where async fn check_schema_status( conn: &PooledConnection<'_, PostgresConnectionManager>, - ) -> Result { + ) -> Result { let stmt = match conn .prepare("SELECT clear_database_on_start from geoengine;") .await @@ -161,11 +225,14 @@ where return Ok(DatabaseStatus::Unitialized); } } - return Err(error::Error::TokioPostgres { source: e }); + return Err(InternalError::CouldNotGetSchemaStatus { source: e }); } }; - let row = conn.query_one(&stmt, &[]).await?; + let row = conn + .query_one(&stmt, &[]) + .await + .context(CouldNotGetClearDatabaseOnStartConfig)?; if row.get(0) { Ok(DatabaseStatus::InitializedClearDatabase) @@ -178,8 +245,9 @@ where /// Creates the database schema. Returns true if the schema was created, false if it already existed. pub(crate) async fn create_schema( mut conn: PooledConnection<'_, PostgresConnectionManager>, - ) -> Result { - let postgres_config = get_config_element::()?; + ) -> Result { + let postgres_config = get_config_element::() + .map_err(|_| InternalError::CouldNotGetPostgresConfig)?; let database_status = Self::check_schema_status(&conn).await?; @@ -190,10 +258,11 @@ where conn.batch_execute(&format!( "DROP SCHEMA {schema_name} CASCADE; CREATE SCHEMA {schema_name};" )) - .await?; + .await + .context(CouldNotRecreateDatabaseSchema {})?; } DatabaseStatus::InitializedKeepDatabase if postgres_config.clear_database_on_start => { - return Err(Error::ClearDatabaseOnStartupNotAllowed) + return Err(InternalError::ClearDatabaseOnStartupNotAllowed) } DatabaseStatus::InitializedClearDatabase | DatabaseStatus::InitializedKeepDatabase => { return Ok(false) @@ -201,23 +270,24 @@ where DatabaseStatus::Unitialized => (), }; - let tx = conn.build_transaction().start().await?; + async move { + let tx = conn.build_transaction().start().await?; - tx.batch_execute(include_str!("schema.sql")).await?; + tx.batch_execute(include_str!("schema.sql")).await?; - let stmt = tx - .prepare( - " + let stmt = tx + .prepare( + " INSERT INTO geoengine (clear_database_on_start) VALUES ($1);", - ) - .await?; + ) + .await?; - tx.execute(&stmt, &[&postgres_config.clear_database_on_start]) - .await?; + tx.execute(&stmt, &[&postgres_config.clear_database_on_start]) + .await?; - let stmt = tx - .prepare( - r#" + let stmt = tx + .prepare( + r#" INSERT INTO layer_collections ( id, name, @@ -229,15 +299,15 @@ where 'All available Geo Engine layers', ARRAY[]::"PropertyType"[] );"#, - ) - .await?; + ) + .await?; - tx.execute(&stmt, &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID]) - .await?; + tx.execute(&stmt, &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID]) + .await?; - let stmt = tx - .prepare( - r#"INSERT INTO layer_collections ( + let stmt = tx + .prepare( + r#"INSERT INTO layer_collections ( id, name, description, @@ -248,29 +318,34 @@ where 'Unsorted Layers', ARRAY[]::"PropertyType"[] );"#, - ) - .await?; + ) + .await?; - tx.execute(&stmt, &[&UNSORTED_COLLECTION_ID]).await?; + tx.execute(&stmt, &[&UNSORTED_COLLECTION_ID]).await?; - let stmt = tx - .prepare( - r#" + let stmt = tx + .prepare( + r#" INSERT INTO collection_children (parent, child) VALUES ($1, $2);"#, + ) + .await?; + + tx.execute( + &stmt, + &[ + &INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, + &UNSORTED_COLLECTION_ID, + ], ) .await?; - tx.execute( - &stmt, - &[ - &INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, - &UNSORTED_COLLECTION_ID, - ], - ) - .await?; + tx.commit().await?; - tx.commit().await?; + Result::<(), tokio_postgres::Error>::Ok(()) + } + .await + .context(CouldNotInitializeSchema)?; debug!("Created database schema"); @@ -280,7 +355,7 @@ where async fn create_default_session( conn: PooledConnection<'_, PostgresConnectionManager>, session_id: SessionId, - ) -> Result<()> { + ) -> Result<(), tokio_postgres::Error> { let stmt = conn .prepare("INSERT INTO sessions (id, project_id, view) VALUES ($1, NULL ,NULL);") .await?; @@ -291,7 +366,7 @@ where } async fn load_default_session( conn: PooledConnection<'_, PostgresConnectionManager>, - ) -> Result { + ) -> Result { let stmt = conn .prepare("SELECT id, project_id, view FROM sessions LIMIT 1;") .await?; @@ -300,6 +375,38 @@ where Ok(SimpleSession::new(row.get(0), row.get(1), row.get(2))) } + + pub(crate) async fn get_session_by_id( + &self, + session_id: SessionId, + ) -> Result { + let mut conn = self.pool.get().await.context(Bb8)?; + + let tx = conn + .build_transaction() + .start() + .await + .context(super::error::Postgres)?; + + let stmt = tx + .prepare( + " + SELECT + project_id, + view + FROM sessions + WHERE id = $1;", + ) + .await + .context(super::error::Postgres)?; + + let row = tx + .query_one(&stmt, &[&session_id]) + .await + .map_err(|_error| InternalError::InvalidSession)?; + + Ok(SimpleSession::new(session_id, row.get(0), row.get(1))) + } } #[async_trait] @@ -314,38 +421,71 @@ where self.default_session_id } - async fn default_session(&self) -> Result { - Self::load_default_session(self.pool.get().await?).await + async fn default_session(&self) -> Result { + Self::load_default_session(self.pool.get().await.context(Bb8).context(DefaultSession)?) + .await + .context(super::error::Postgres) + .context(DefaultSession) } - async fn update_default_session_project(&self, project: ProjectId) -> Result<()> { - let conn = self.pool.get().await?; + async fn update_default_session_project( + &self, + project: ProjectId, + ) -> Result<(), SimpleApplicationContextError> { + let conn = self + .pool + .get() + .await + .context(Bb8) + .context(UpdateDefaultSessionProject)?; let stmt = conn .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;") - .await?; + .await + .context(super::error::Postgres) + .context(UpdateDefaultSessionProject)?; conn.execute(&stmt, &[&project, &self.default_session_id]) - .await?; + .await + .context(super::error::Postgres) + .context(UpdateDefaultSessionProject)?; Ok(()) } - async fn update_default_session_view(&self, view: STRectangle) -> Result<()> { - let conn = self.pool.get().await?; + async fn update_default_session_view( + &self, + view: STRectangle, + ) -> Result<(), SimpleApplicationContextError> { + let conn = self + .pool + .get() + .await + .context(Bb8) + .context(UpdateDefaultSessionView)?; let stmt = conn .prepare("UPDATE sessions SET view = $1 WHERE id = $2;") - .await?; + .await + .context(super::error::Postgres) + .context(UpdateDefaultSessionView)?; conn.execute(&stmt, &[&view, &self.default_session_id]) - .await?; + .await + .context(super::error::Postgres) + .context(UpdateDefaultSessionView)?; Ok(()) } - async fn default_session_context(&self) -> Result { - Ok(self.session_context(self.session_by_id(self.default_session_id).await?)) + async fn default_session_context( + &self, + ) -> Result { + Ok(self.session_context( + self.get_session_by_id(self.default_session_id) + .await + .context(DefaultSessionContext)?, + )) } } @@ -367,28 +507,13 @@ where } } - async fn session_by_id(&self, session_id: SessionId) -> Result { - let mut conn = self.pool.get().await?; - - let tx = conn.build_transaction().start().await?; - - let stmt = tx - .prepare( - " - SELECT - project_id, - view - FROM sessions - WHERE id = $1;", - ) - .await?; - - let row = tx - .query_one(&stmt, &[&session_id]) + async fn session_by_id( + &self, + session_id: SessionId, + ) -> Result { + self.get_session_by_id(session_id) .await - .map_err(|_error| error::Error::InvalidSession)?; - - Ok(SimpleSession::new(session_id, row.get(0), row.get(1))) + .context(SessionById) } } @@ -428,14 +553,14 @@ where SimpleTaskManager::new(self.context.task_manager.clone()) } - fn query_context(&self) -> Result { + fn query_context(&self) -> Result { Ok(QueryContextImpl::new( self.context.query_ctx_chunk_size, self.context.thread_pool.clone(), )) } - fn execution_context(&self) -> Result { + fn execution_context(&self) -> Result { Ok(ExecutionContextImpl::>::new( self.db(), self.context.thread_pool.clone(), @@ -443,7 +568,7 @@ where )) } - fn volumes(&self) -> Result> { + fn volumes(&self) -> Result, SessionContextError> { Ok(self.context.volumes.volumes.clone()) } diff --git a/services/src/contexts/session.rs b/services/src/contexts/session.rs index 8ca7b282f..936c10578 100644 --- a/services/src/contexts/session.rs +++ b/services/src/contexts/session.rs @@ -100,7 +100,13 @@ impl FromRequest for SimpleSession { "Application context should be present because it is set during server initialization.", ); let pg_ctx = pg_ctx.get_ref().clone(); - return async move { pg_ctx.session_by_id(token).await.map_err(Into::into) }.boxed_local(); + return async move { + pg_ctx + .session_by_id(token) + .await + .map_err(|_| error::Error::InvalidSession) + } + .boxed_local(); } } diff --git a/services/src/contexts/simple_context.rs b/services/src/contexts/simple_context.rs index 70aef6ad4..9b76fc2d5 100644 --- a/services/src/contexts/simple_context.rs +++ b/services/src/contexts/simple_context.rs @@ -1,7 +1,6 @@ -use crate::error::Result; use crate::projects::{ProjectId, STRectangle}; -use super::{ApplicationContext, SessionId, SimpleSession}; +use super::{error::SimpleApplicationContextError, ApplicationContext, SessionId, SimpleSession}; use async_trait::async_trait; @@ -9,11 +8,19 @@ use async_trait::async_trait; #[async_trait] pub trait SimpleApplicationContext: ApplicationContext { async fn default_session_id(&self) -> SessionId; - async fn default_session(&self) -> Result; + async fn default_session(&self) -> Result; - async fn update_default_session_project(&self, project: ProjectId) -> Result<()>; - async fn update_default_session_view(&self, view: STRectangle) -> Result<()>; + async fn update_default_session_project( + &self, + project: ProjectId, + ) -> Result<(), SimpleApplicationContextError>; + async fn update_default_session_view( + &self, + view: STRectangle, + ) -> Result<(), SimpleApplicationContextError>; /// Get a session context for the default session. - async fn default_session_context(&self) -> Result; + async fn default_session_context( + &self, + ) -> Result; } diff --git a/services/src/error.rs b/services/src/error.rs index 5b036c6a5..3b506912b 100644 --- a/services/src/error.rs +++ b/services/src/error.rs @@ -1,6 +1,8 @@ use crate::api::model::datatypes::{ DataProviderId, DatasetId, LayerId, SpatialReference, SpatialReferenceOption, TimeInstance, }; +use crate::contexts::error::{SessionContextError, SimpleApplicationContextError}; +use crate::contexts::ApplicationContextError; #[cfg(feature = "nfdi")] use crate::datasets::external::aruna::error::ArunaProviderError; #[cfg(feature = "ebv")] @@ -445,6 +447,22 @@ pub enum Error { }, UnexpectedInvalidDbTypeConversion, + + // TODO: these errors should be used in the error type of the handlers + ApplicationContext { + source: ApplicationContextError, + }, + SimpleApplicationContext { + source: SimpleApplicationContextError, + }, + SessionContext { + source: SessionContextError, + }, + + #[cfg(feature = "pro")] + ProApplicationContext { + source: crate::pro::contexts::error::ProApplicationContextError, + }, } impl actix_web::error::ResponseError for Error { @@ -552,3 +570,29 @@ impl From for Error { Error::TokioJoin { source } } } + +// TODO: these should be used in the error types of the handlers +impl From for Error { + fn from(source: ApplicationContextError) -> Self { + Error::ApplicationContext { source } + } +} + +impl From for Error { + fn from(source: SimpleApplicationContextError) -> Self { + Error::SimpleApplicationContext { source } + } +} + +impl From for Error { + fn from(source: SessionContextError) -> Self { + Error::SessionContext { source } + } +} + +#[cfg(feature = "pro")] +impl From for Error { + fn from(source: crate::pro::contexts::error::ProApplicationContextError) -> Self { + Error::ProApplicationContext { source } + } +} diff --git a/services/src/pro/contexts/error.rs b/services/src/pro/contexts/error.rs new file mode 100644 index 000000000..730d74ceb --- /dev/null +++ b/services/src/pro/contexts/error.rs @@ -0,0 +1,22 @@ +use snafu::Snafu; + +use crate::contexts::ApplicationContextError; + +/// High-level error, reported to the user. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(suffix(false)))] // disables default `Snafu` suffix +pub enum ProApplicationContextError { + ApplicationContext { source: ApplicationContextError }, + CreateContext { source: InternalError }, + // SessionById { source: InternalError }, +} + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(suffix(false)))] // disables default `Snafu` suffix +pub enum InternalError { + ApplicationContextInternal { + source: crate::contexts::error::InternalError, + }, +} diff --git a/services/src/pro/contexts/mod.rs b/services/src/pro/contexts/mod.rs index 0b64d628c..76bb958ab 100644 --- a/services/src/pro/contexts/mod.rs +++ b/services/src/pro/contexts/mod.rs @@ -1,3 +1,4 @@ +pub mod error; mod postgres; use std::str::FromStr; diff --git a/services/src/pro/contexts/postgres.rs b/services/src/pro/contexts/postgres.rs index 16efbe940..ed4d32e44 100644 --- a/services/src/pro/contexts/postgres.rs +++ b/services/src/pro/contexts/postgres.rs @@ -1,11 +1,15 @@ use crate::api::model::datatypes::DatasetName; -use crate::contexts::{ApplicationContext, PostgresContext, QueryContextImpl, SessionId}; +use crate::contexts::error::{Bb8, CouldNotInitializeSchema, SessionById, SessionContextError}; +use crate::contexts::{ + ApplicationContext, ApplicationContextError, PostgresContext, QueryContextImpl, SessionId, +}; use crate::contexts::{GeoEngineDb, SessionContext}; use crate::datasets::add_from_directory::add_providers_from_directory; use crate::datasets::upload::{Volume, Volumes}; -use crate::error::{self, Error, Result}; +use crate::error::Error; use crate::layers::add_from_directory::UNSORTED_COLLECTION_ID; use crate::layers::storage::INTERNAL_LAYER_DB_ROOT_COLLECTION_ID; +use crate::pro::contexts::error::ApplicationContextInternal; use crate::pro::datasets::add_datasets_from_directory; use crate::pro::layers::add_from_directory::{ add_layer_collections_from_directory, add_layers_from_directory, @@ -39,6 +43,7 @@ use snafu::{ensure, ResultExt}; use std::path::PathBuf; use std::sync::Arc; +use super::error::{CreateContext, InternalError, ProApplicationContextError}; use super::{ExecutionContextImpl, ProApplicationContext, ProGeoEngineDb, QuotaCheckerImpl}; // TODO: do not report postgres error details to user @@ -76,14 +81,37 @@ where exe_ctx_tiling_spec: TilingSpecification, query_ctx_chunk_size: ChunkByteSize, quota_config: Quota, - ) -> Result { + ) -> Result { let pg_mgr = PostgresConnectionManager::new(config, tls); - let pool = Pool::builder().build(pg_mgr).await?; + let pool = Pool::builder() + .build(pg_mgr) + .await + .context(crate::contexts::error::Postgres) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?; + + let created_schema = PostgresContext::create_schema( + pool.get() + .await + .context(Bb8) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?, + ) + .await + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?; - let created_schema = PostgresContext::create_schema(pool.get().await?).await?; if created_schema { - Self::create_pro_schema(pool.get().await?).await?; + Self::create_pro_schema( + pool.get() + .await + .context(Bb8) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?, + ) + .await + .context(CreateContext)?; } let db = ProPostgresDb::new(pool.clone(), UserSession::admin_session()); @@ -113,14 +141,37 @@ where oidc_db: OidcRequestDb, cache_config: Cache, quota_config: Quota, - ) -> Result { + ) -> Result { let pg_mgr = PostgresConnectionManager::new(config, tls); - let pool = Pool::builder().build(pg_mgr).await?; + let pool = Pool::builder() + .build(pg_mgr) + .await + .context(crate::contexts::error::Postgres) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?; + + let created_schema = PostgresContext::create_schema( + pool.get() + .await + .context(Bb8) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?, + ) + .await + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?; - let created_schema = PostgresContext::create_schema(pool.get().await?).await?; if created_schema { - Self::create_pro_schema(pool.get().await?).await?; + Self::create_pro_schema( + pool.get() + .await + .context(Bb8) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?, + ) + .await + .context(CreateContext)?; } let db = ProPostgresDb::new(pool.clone(), UserSession::admin_session()); @@ -164,14 +215,37 @@ where oidc_config: Oidc, cache_config: Cache, quota_config: Quota, - ) -> Result { + ) -> Result { let pg_mgr = PostgresConnectionManager::new(config, tls); - let pool = Pool::builder().build(pg_mgr).await?; + let pool = Pool::builder() + .build(pg_mgr) + .await + .context(crate::contexts::error::Postgres) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?; + + let created_schema = PostgresContext::create_schema( + pool.get() + .await + .context(Bb8) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?, + ) + .await + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?; - let created_schema = PostgresContext::create_schema(pool.get().await?).await?; if created_schema { - Self::create_pro_schema(pool.get().await?).await?; + Self::create_pro_schema( + pool.get() + .await + .context(Bb8) + .context(crate::contexts::error::CreateContext) + .context(super::error::ApplicationContext)?, + ) + .await + .context(CreateContext)?; } let db = ProPostgresDb::new(pool.clone(), UserSession::admin_session()); @@ -225,36 +299,39 @@ where /// Creates the database schema. Returns true if the schema was created, false if it already existed. pub(crate) async fn create_pro_schema( mut conn: PooledConnection<'_, PostgresConnectionManager>, - ) -> Result<()> { - let user_config = get_config_element::()?; + ) -> Result<(), InternalError> { + let user_config = get_config_element::() + .map_err(|_| crate::contexts::error::InternalError::CouldNotGetPostgresConfig) + .context(ApplicationContextInternal)?; - let tx = conn.build_transaction().start().await?; + async move { + let tx = conn.build_transaction().start().await?; - tx.batch_execute(include_str!("schema.sql")).await?; + tx.batch_execute(include_str!("schema.sql")).await?; - let stmt = tx - .prepare( - r#" + let stmt = tx + .prepare( + r#" INSERT INTO roles (id, name) VALUES ($1, 'admin'), ($2, 'user'), ($3, 'anonymous');"#, + ) + .await?; + + tx.execute( + &stmt, + &[ + &Role::admin_role_id(), + &Role::registered_user_role_id(), + &Role::anonymous_role_id(), + ], ) .await?; - tx.execute( - &stmt, - &[ - &Role::admin_role_id(), - &Role::registered_user_role_id(), - &Role::anonymous_role_id(), - ], - ) - .await?; - - let stmt = tx - .prepare( - r#" + let stmt = tx + .prepare( + r#" INSERT INTO users ( id, email, @@ -268,35 +345,35 @@ where 'admin', true );"#, + ) + .await?; + + tx.execute( + &stmt, + &[ + &Role::admin_role_id(), + &user_config.admin_email, + &bcrypt::hash(user_config.admin_password) + .expect("Admin password hash should be valid"), + ], ) .await?; - tx.execute( - &stmt, - &[ - &Role::admin_role_id(), - &user_config.admin_email, - &bcrypt::hash(user_config.admin_password) - .expect("Admin password hash should be valid"), - ], - ) - .await?; - - let stmt = tx - .prepare( - r#" + let stmt = tx + .prepare( + r#" INSERT INTO user_roles (user_id, role_id) VALUES ($1, $1);"#, - ) - .await?; + ) + .await?; - tx.execute(&stmt, &[&Role::admin_role_id()]).await?; + tx.execute(&stmt, &[&Role::admin_role_id()]).await?; - let stmt = tx - .prepare( - r#" + let stmt = tx + .prepare( + r#" INSERT INTO permissions (role_id, layer_collection_id, permission) VALUES @@ -306,22 +383,28 @@ where ($1, $5, 'Owner'), ($2, $5, 'Read'), ($3, $5, 'Read');"#, + ) + .await?; + + tx.execute( + &stmt, + &[ + &Role::admin_role_id(), + &Role::registered_user_role_id(), + &Role::anonymous_role_id(), + &INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, + &UNSORTED_COLLECTION_ID, + ], ) .await?; - tx.execute( - &stmt, - &[ - &Role::admin_role_id(), - &Role::registered_user_role_id(), - &Role::anonymous_role_id(), - &INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, - &UNSORTED_COLLECTION_ID, - ], - ) - .await?; + tx.commit().await?; - tx.commit().await?; + Result::<(), tokio_postgres::Error>::Ok(()) + } + .await + .context(CouldNotInitializeSchema) + .context(ApplicationContextInternal)?; debug!("Created pro database schema"); @@ -351,11 +434,15 @@ where } } - async fn session_by_id(&self, session_id: SessionId) -> Result { + async fn session_by_id( + &self, + session_id: SessionId, + ) -> Result { + // TODO: properly integrate user db error self.user_session_by_id(session_id) .await - .map_err(Box::new) - .context(error::Unauthorized) + .map_err(|_| crate::contexts::error::InternalError::CouldNotGetSessionById) + .context(SessionById) } } @@ -408,7 +495,7 @@ where ProTaskManager::new(self.context.task_manager.clone(), self.session.clone()) } - fn query_context(&self) -> Result { + fn query_context(&self) -> Result { // TODO: load config only once let mut extensions = QueryContextExtensions::default(); @@ -427,7 +514,7 @@ where )) } - fn execution_context(&self) -> Result { + fn execution_context(&self) -> Result { Ok(ExecutionContextImpl::>::new( self.db(), self.context.thread_pool.clone(), @@ -435,8 +522,8 @@ where )) } - fn volumes(&self) -> Result> { - ensure!(self.session.is_admin(), error::PermissionDenied); + fn volumes(&self) -> Result, SessionContextError> { + ensure!(self.session.is_admin(), crate::contexts::error::Volumes); Ok(self.context.volumes.volumes.clone()) } @@ -469,7 +556,7 @@ where } /// Check whether the namepsace of the given dataset is allowed for insertion - pub(crate) fn check_namespace(&self, id: &DatasetName) -> Result<()> { + pub(crate) fn check_namespace(&self, id: &DatasetName) -> crate::error::Result<()> { let is_ok = match &id.namespace { Some(namespace) => namespace.as_str() == self.session.user.id.to_string(), None => self.session.is_admin(),