diff --git a/Cargo.toml b/Cargo.toml index 2e96c4ca9..0e7aca3af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ features = [ "postgres-array", "postgres-vector", "sea-orm-internal", + "d1", ] rustdoc-args = ["--cfg", "docsrs"] @@ -85,6 +86,7 @@ tracing = { version = "0.1", default-features = false, features = [ ] } url = { version = "2.2", default-features = false } uuid = { version = "1", default-features = false, optional = true } +worker = { version = "0.7", default-features = false, optional = true } [dev-dependencies] dotenv = "0.15" @@ -105,6 +107,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } uuid = { version = "1", features = ["v4"] } [features] +d1 = ["worker/d1"] debug-print = [] default = [ "macros", diff --git a/examples/d1_example/Cargo.toml b/examples/d1_example/Cargo.toml new file mode 100644 index 000000000..5ae6c9900 --- /dev/null +++ b/examples/d1_example/Cargo.toml @@ -0,0 +1,32 @@ +[package] +authors = ["Sea ORM Contributors"] +edition = "2024" +name = "sea-orm-d1-example" +publish = false +rust-version = "1.85.0" +version = "0.1.0" + +[workspace] + +[package.metadata.release] +release = false + +# https://github.com/rustwasm/wasm-pack/issues/1247 +[package.metadata.wasm-pack.profile.release] +wasm-opt = false + +[lib] +crate-type = ["cdylib"] +path = "src/lib.rs" + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +worker = { version = "0.7", features = ["d1"] } + +sea-orm = { path = "../../", default-features = false, features = [ + "d1", + "with-json", + "macros", +] } diff --git a/examples/d1_example/README.md b/examples/d1_example/README.md new file mode 100644 index 000000000..214fc73ed --- /dev/null +++ b/examples/d1_example/README.md @@ -0,0 +1,97 @@ +# Sea-ORM D1 Example + +This example demonstrates how to use Sea-ORM with Cloudflare D1. + +## Prerequisites + +- [Rust](https://rustup.rs/) installed +- [wasm-pack](https://rustwasm.github.io/wasm-pack/installer/) installed +- [Wrangler](https://developers.cloudflare.com/workers/cli-wrangler/install-update/) CLI installed + +## Setup + +### 1. Create a D1 Database + +Create a D1 database in your Cloudflare Workers project: + +```bash +wrangler d1 create sea-orm-d1-example +``` + +### 2. Configure wrangler.toml + +Add the D1 binding to your `wrangler.toml`: + +```toml +name = "sea-orm-d1-example" +compatibility_date = "2025-01-01" + +[[d1_databases]] +binding = "DB" +database_name = "sea-orm-d1-example" +database_id = "your-database-id" +``` + +### 3. Create the Schema + +Create a `schema.sql` file: + +```sql +CREATE TABLE IF NOT EXISTS cake ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL +); +``` + +### 4. Initialize the Database + +Run the migrations: + +```bash +wrangler d1 execute sea-orm-d1-example --file=./schema.sql --remote +``` + +## Development + +### Build + +```bash +wasm-pack build --target web --out-dir ./dist +``` + +### Deploy + +```bash +wrangler deploy +``` + +## API Endpoints + +- `GET /cakes` - List all cakes +- `POST /cakes` - Create a new cake (`{"name": "Chocolate"}`) +- `GET /cakes/:id` - Get a cake by ID +- `DELETE /cakes/:id` - Delete a cake + +## Example Usage + +```bash +# List all cakes +curl https://your-worker.dev/cakes + +# Create a cake +curl -X POST https://your-worker.dev/cakes \ + -H "Content-Type: application/json" \ + -d '{"name": "Chocolate Cake"}' + +# Get a cake +curl https://your-worker.dev/cakes/1 + +# Delete a cake +curl -X DELETE https://your-worker.dev/cakes/1 +``` + +## Notes + +- D1 uses SQLite-compatible SQL syntax +- D1 connections require direct access via `as_d1_connection()` because `wasm-bindgen` futures are not `Send` +- Streaming is not supported for D1; use `query_all()` instead of `stream_raw()` diff --git a/examples/d1_example/src/cake.rs b/examples/d1_example/src/cake.rs new file mode 100644 index 000000000..26d588330 --- /dev/null +++ b/examples/d1_example/src/cake.rs @@ -0,0 +1,21 @@ +//! Cake entity for D1 example + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] +#[sea_orm(table_name = "cake")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + #[sea_orm(column_name = "name")] + pub name: String, + #[sea_orm(column_name = "price")] + pub price: Option, + #[sea_orm(column_name = "category")] + pub category: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/examples/d1_example/src/lib.rs b/examples/d1_example/src/lib.rs new file mode 100644 index 000000000..647c4929d --- /dev/null +++ b/examples/d1_example/src/lib.rs @@ -0,0 +1,326 @@ +//! Cloudflare D1 Example for Sea-ORM +//! +//! This example demonstrates how to use Sea-ORM with Cloudflare D1. +//! +//! # Setup +//! +//! 1. Create a D1 database in your Cloudflare Workers project: +//! ```bash +//! wrangler d1 create sea-orm-d1-example +//! ``` +//! +//! 2. Add the D1 binding to your `wrangler.toml`: +//! ```toml +//! [[d1_databases]] +//! binding = "DB" +//! database_name = "sea-orm-d1-example" +//! database_id = "your-database-id" +//! ``` +//! +//! 3. Create the schema migration: +//! ```sql +//! CREATE TABLE IF NOT EXISTS cake ( +//! id INTEGER PRIMARY KEY AUTOINCREMENT, +//! name TEXT NOT NULL, +//! price REAL DEFAULT NULL, +//! category TEXT DEFAULT NULL +//! ); +//! ``` +//! +//! 4. Run migrations: +//! ```bash +//! wrangler d1 execute sea-orm-d1-example --file=./schema.sql --remote +//! ``` +//! +//! # Features Demonstrated +//! +//! - `/cakes` - Direct SQL queries using `D1Connection` +//! - `/cakes-entity` - Entity queries using `D1QueryExecutor::find_all()` +//! - `/cakes-filtered` - Entity queries with filters and ordering +//! - `/cakes-search?q=...` - Entity queries with search parameters + +mod cake; + +use sea_orm::{ColumnTrait, DbBackend, D1Connection, D1QueryExecutor, EntityTrait, QueryFilter, QueryOrder, Statement, Value}; +use worker::{event, Context, Env, Method, Request, Response, Result}; + +#[event(fetch)] +async fn fetch(req: Request, env: Env, _ctx: Context) -> Result { + // Get D1 binding from environment + let d1 = match env.d1("DB") { + Ok(d1) => d1, + Err(e) => return Response::error(format!("Failed to get D1 binding: {}", e), 500), + }; + + // Connect to Sea-ORM + let db = match sea_orm::Database::connect_d1(d1).await { + Ok(db) => db, + Err(e) => return Response::error(format!("Failed to connect: {}", e), 500), + }; + + // Get D1 connection for direct access + let d1_conn = db.as_d1_connection(); + + // Route handling + let url = req.url()?; + let path = url.path(); + + match (req.method(), path) { + (Method::Get, "/") => Response::ok("Welcome to Sea-ORM D1 Example! Try /cakes, /cakes-entity, /cakes-filtered, or /cakes-search"), + (Method::Get, "/cakes") => handle_list_cakes(d1_conn).await, + (Method::Post, "/cakes") => handle_create_cake(d1_conn, req).await, + (Method::Get, "/cakes-entity") => handle_list_cakes_entity(d1_conn).await, + (Method::Get, "/cakes-filtered") => handle_filtered_cakes(d1_conn).await, + (Method::Get, path) if path.starts_with("/cakes-search") => handle_search_cakes(d1_conn, req).await, + (method, path) if path.starts_with("/cakes/") => { + let id = path.trim_start_matches("/cakes/"); + match method { + Method::Get => handle_get_cake(d1_conn, id).await, + Method::Delete => handle_delete_cake(d1_conn, id).await, + _ => Response::error("Method not allowed", 405), + } + } + _ => Response::error("Not found", 404), + } +} + +/// List all cakes using the Entity pattern (D1QueryExecutor) +async fn handle_list_cakes_entity(d1_conn: &D1Connection) -> Result { + // Use Entity::find() with D1QueryExecutor! + let cakes: Vec = match d1_conn.find_all(cake::Entity::find()).await { + Ok(cakes) => cakes, + Err(e) => return Response::error(format!("Query failed: {}", e), 500), + }; + + // Convert to response format + let results: Vec = cakes + .into_iter() + .map(|cake| CakeResponse { + id: cake.id, + name: cake.name, + }) + .collect(); + + Response::from_json(&results) +} + +/// List cakes with filters and ordering using Entity pattern +async fn handle_filtered_cakes(d1_conn: &D1Connection) -> Result { + // Use Entity::find() with filter and ordering + let cakes: Vec = match d1_conn + .find_all( + cake::Entity::find() + .filter(cake::Column::Category.is_not_null()) + .order_by_asc(cake::Column::Name), + ) + .await + { + Ok(cakes) => cakes, + Err(e) => return Response::error(format!("Query failed: {}", e), 500), + }; + + // Convert to response format + let results: Vec = cakes + .into_iter() + .map(|cake| CakeDetailResponse { + id: cake.id, + name: cake.name, + price: cake.price, + category: cake.category, + }) + .collect(); + + Response::from_json(&results) +} + +/// Escape SQL wildcards in a search term to prevent unexpected behavior +fn escape_like_pattern(s: &str) -> String { + s.replace('%', "\\%") + .replace('_', "\\_") + .replace('[', "\\[") + .replace(']', "\\]") +} + +/// Search cakes by name using query parameter +async fn handle_search_cakes(d1_conn: &D1Connection, req: Request) -> Result { + let url = req.url()?; + let query = url.query_pairs().find(|(key, _)| key == "q"); + let search_term = query.map(|(_, v)| v.to_string()).unwrap_or_default(); + + if search_term.is_empty() { + return Response::error("Missing 'q' query parameter", 400); + } + + // Escape SQL wildcards to prevent them from being interpreted as wildcards + let escaped_term = escape_like_pattern(&search_term); + + // Use Entity::find() with LIKE filter (case-sensitive in SQLite) + let cakes: Vec = match d1_conn + .find_all( + cake::Entity::find() + .filter(cake::Column::Name.like(&format!("%{}%", escaped_term))) + .order_by_asc(cake::Column::Name), + ) + .await + { + Ok(cakes) => cakes, + Err(e) => return Response::error(format!("Query failed: {}", e), 500), + }; + + let results: Vec = cakes + .into_iter() + .map(|cake| CakeResponse { + id: cake.id, + name: cake.name, + }) + .collect(); + + Response::from_json(&serde_json::json!({ + "query": search_term, + "count": results.len(), + "results": results + })) +} + +/// List all cakes +async fn handle_list_cakes(d1_conn: &D1Connection) -> Result { + let stmt = Statement::from_string( + DbBackend::Sqlite, + "SELECT id, name FROM cake ORDER BY id".to_string(), + ); + + let cakes = match d1_conn.query_all(stmt).await { + Ok(cakes) => cakes, + Err(e) => return Response::error(format!("Query failed: {}", e), 500), + }; + + let mut results = Vec::new(); + for row in cakes { + let id: i32 = match row.try_get_by("id") { + Ok(id) => id, + Err(e) => return Response::error(format!("Failed to get id: {}", e), 500), + }; + let name: String = match row.try_get_by("name") { + Ok(name) => name, + Err(e) => return Response::error(format!("Failed to get name: {}", e), 500), + }; + results.push(CakeResponse { id, name }); + } + + Response::from_json(&results) +} + +/// Create a new cake +async fn handle_create_cake(d1_conn: &D1Connection, mut req: Request) -> Result { + let body = match req.json::().await { + Ok(body) => body, + Err(e) => return Response::error(format!("Invalid JSON: {}", e), 400), + }; + + let stmt = Statement::from_sql_and_values( + DbBackend::Sqlite, + "INSERT INTO cake (name) VALUES (?) RETURNING id, name", + vec![Value::from(body.name)], + ); + + let result = match d1_conn.query_one(stmt).await { + Ok(result) => result, + Err(e) => return Response::error(format!("Query failed: {}", e), 500), + }; + + match result { + Some(row) => { + let id: i32 = match row.try_get_by("id") { + Ok(id) => id, + Err(e) => return Response::error(format!("Failed to get id: {}", e), 500), + }; + let name: String = match row.try_get_by("name") { + Ok(name) => name, + Err(e) => return Response::error(format!("Failed to get name: {}", e), 500), + }; + Response::from_json(&CakeResponse { id, name }) + } + None => Response::error("Failed to create cake", 500), + } +} + +/// Get a cake by ID +async fn handle_get_cake(d1_conn: &D1Connection, id: &str) -> Result { + let id: i32 = match id.parse() { + Ok(id) => id, + Err(_) => return Response::error("Invalid ID", 400), + }; + + let stmt = Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT id, name FROM cake WHERE id = ?", + vec![Value::from(id)], + ); + + let result = match d1_conn.query_one(stmt).await { + Ok(result) => result, + Err(e) => return Response::error(format!("Query failed: {}", e), 500), + }; + + match result { + Some(row) => { + let id: i32 = match row.try_get_by("id") { + Ok(id) => id, + Err(e) => return Response::error(format!("Failed to get id: {}", e), 500), + }; + let name: String = match row.try_get_by("name") { + Ok(name) => name, + Err(e) => return Response::error(format!("Failed to get name: {}", e), 500), + }; + Response::from_json(&CakeResponse { id, name }) + } + None => Response::error("Cake not found", 404), + } +} + +/// Delete a cake by ID +async fn handle_delete_cake(d1_conn: &D1Connection, id: &str) -> Result { + let id: i32 = match id.parse() { + Ok(id) => id, + Err(_) => return Response::error("Invalid ID", 400), + }; + + let stmt = Statement::from_sql_and_values( + DbBackend::Sqlite, + "DELETE FROM cake WHERE id = ?", + vec![Value::from(id)], + ); + + let result = match d1_conn.execute(stmt).await { + Ok(result) => result, + Err(e) => return Response::error(format!("Execute failed: {}", e), 500), + }; + + if result.rows_affected() > 0 { + Response::from_json(&serde_json::json!({ "deleted": true })) + } else { + Response::error("Cake not found", 404) + } +} + +/// Response type for cake +#[derive(serde::Serialize)] +struct CakeResponse { + id: i32, + name: String, +} + +/// Response type for cake with full details (price and category) +#[derive(serde::Serialize)] +struct CakeDetailResponse { + id: i32, + name: String, + price: Option, + category: Option, +} + +/// Request type for creating a cake +#[derive(serde::Deserialize)] +struct CreateCakeRequest { + name: String, +} diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 108111ad1..f563f375d 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -56,10 +56,28 @@ pub enum DatabaseConnectionType { #[cfg(feature = "proxy")] ProxyDatabaseConnection(Arc), + /// Cloudflare D1 database connection + #[cfg(feature = "d1")] + D1Connection(crate::D1Connection), + /// The connection has never been established Disconnected, } +#[cfg(feature = "d1")] +impl From for DatabaseConnectionType { + fn from(conn: crate::D1Connection) -> Self { + Self::D1Connection(conn) + } +} + +#[cfg(feature = "d1")] +impl From for DatabaseConnection { + fn from(conn: crate::D1Connection) -> Self { + DatabaseConnectionType::from(conn).into() + } +} + /// The same as a [DatabaseConnection] pub type DbConn = DatabaseConnection; @@ -79,6 +97,14 @@ impl From for DatabaseConnection { } } +#[cfg(feature = "d1")] +impl From for DatabaseConnection { + fn from(d1: worker::d1::D1Database) -> Self { + let conn = crate::D1Connection::from(d1); + DatabaseConnectionType::D1Connection(conn).into() + } +} + /// The type of database backend for real world databases. /// This is enabled by feature flags as specified in the crate documentation #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -96,6 +122,7 @@ pub enum DatabaseBackend { pub type DbBackend = DatabaseBackend; #[derive(Debug)] +#[allow(dead_code)] pub(crate) enum InnerConnection { #[cfg(feature = "sqlx-mysql")] MySql(PoolConnection), @@ -109,6 +136,8 @@ pub(crate) enum InnerConnection { Mock(Arc), #[cfg(feature = "proxy")] Proxy(Arc), + #[cfg(feature = "d1")] + D1(std::sync::Arc), } impl Debug for DatabaseConnectionType { @@ -129,6 +158,8 @@ impl Debug for DatabaseConnectionType { Self::MockDatabaseConnection(_) => "MockDatabaseConnection", #[cfg(feature = "proxy")] Self::ProxyDatabaseConnection(_) => "ProxyDatabaseConnection", + #[cfg(feature = "d1")] + Self::D1Connection(_) => "D1Connection", Self::Disconnected => "Disconnected", } ) @@ -171,6 +202,10 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnectionType::ProxyDatabaseConnection(conn) => { conn.execute(stmt).await } + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 connections require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } @@ -215,6 +250,11 @@ impl ConnectionTrait for DatabaseConnection { let stmt = Statement::from_string(db_backend, sql); conn.execute(stmt).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 connections require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } @@ -251,6 +291,11 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnectionType::ProxyDatabaseConnection(conn) => { conn.query_one(stmt).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 connections require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } @@ -287,6 +332,11 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnectionType::ProxyDatabaseConnection(conn) => { conn.query_all(stmt).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 connections require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } @@ -337,6 +387,11 @@ impl StreamTrait for DatabaseConnection { DatabaseConnectionType::ProxyDatabaseConnection(conn) => { Ok(crate::QueryStream::from((Arc::clone(conn), stmt, None))) } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 streaming is not supported. Use query_all() instead. See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } }) @@ -347,7 +402,6 @@ impl StreamTrait for DatabaseConnection { impl TransactionTrait for DatabaseConnection { type Transaction = DatabaseTransaction; - #[instrument(level = "trace")] async fn begin(&self) -> Result { match &self.inner { #[cfg(feature = "sqlx-mysql")] @@ -368,11 +422,15 @@ impl TransactionTrait for DatabaseConnection { DatabaseConnectionType::ProxyDatabaseConnection(conn) => { DatabaseTransaction::new_proxy(conn.clone(), None).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 transactions require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } - #[instrument(level = "trace")] async fn begin_with_config( &self, _isolation_level: Option, @@ -403,13 +461,17 @@ impl TransactionTrait for DatabaseConnection { DatabaseConnectionType::ProxyDatabaseConnection(conn) => { DatabaseTransaction::new_proxy(conn.clone(), None).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => Err(conn_err( + "D1 transactions require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.", + )), DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } /// Execute the function inside a transaction. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. - #[instrument(level = "trace", skip(_callback))] async fn transaction(&self, _callback: F) -> Result> where F: for<'c> FnOnce( @@ -450,13 +512,17 @@ impl TransactionTrait for DatabaseConnection { .map_err(TransactionError::Connection)?; transaction.run(_callback).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => { + Err(conn_err("D1 transactions require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.").into()) + } DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected").into()), } } /// Execute the function inside a transaction. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. - #[instrument(level = "trace", skip(_callback))] async fn transaction_with_config( &self, _callback: F, @@ -505,6 +571,11 @@ impl TransactionTrait for DatabaseConnection { .map_err(TransactionError::Connection)?; transaction.run(_callback).await } + // D1 connections must use as_d1_connection() directly + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => { + Err(conn_err("D1 transactions require direct access via as_d1_connection(). See https://docs.sea-ql.org/sea-orm/master/feature-flags.html#d1 for details.").into()) + } DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected").into()), } } @@ -554,6 +625,48 @@ impl DatabaseConnection { } } +/// D1-specific helper methods +/// +/// D1 uses `wasm-bindgen` futures which are not `Send`, so `ConnectionTrait` +/// and `TransactionTrait` cannot be implemented. Use these methods to access +/// the `D1Connection` directly for all operations. +#[cfg(feature = "d1")] +#[cfg_attr(docsrs, doc(cfg(feature = "d1")))] +impl DatabaseConnection { + /// Get a reference to the D1 connection for direct operations + /// + /// D1 connections require direct access to `D1Connection` because + /// `wasm-bindgen` futures are not `Send`. + /// + /// # Example + /// + /// ```ignore + /// let d1 = env.d1("DB")?; + /// let db = sea_orm::Database::connect_d1(d1).await?; + /// + /// // Use as_d1_connection() for D1 operations + /// let d1_conn = db.as_d1_connection(); + /// let users = d1_conn.query_all(stmt).await?; + /// ``` + /// + /// # Panics + /// + /// Panics if this is not a D1 connection. + pub fn as_d1_connection(&self) -> &crate::D1Connection { + match &self.inner { + DatabaseConnectionType::D1Connection(conn) => conn, + _ => panic!("Not D1 connection"), + } + } + + /// Check if this is a D1 connection + /// + /// Returns `true` if the connection was created with [`Database::connect_d1`]. + pub fn is_d1_connection(&self) -> bool { + matches!(self.inner, DatabaseConnectionType::D1Connection(_)) + } +} + #[cfg(feature = "rbac")] impl DatabaseConnection { /// Load RBAC data from the same database as this connection and setup RBAC engine. @@ -611,6 +724,8 @@ impl DatabaseConnection { DatabaseConnectionType::MockDatabaseConnection(conn) => conn.get_database_backend(), #[cfg(feature = "proxy")] DatabaseConnectionType::ProxyDatabaseConnection(conn) => conn.get_database_backend(), + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(_) => DbBackend::Sqlite, // D1 is SQLite-compatible DatabaseConnectionType::Disconnected => panic!("Disconnected"), } } @@ -650,6 +765,8 @@ impl DatabaseConnection { DatabaseConnectionType::RusqliteSharedConnection(conn) => { conn.set_metric_callback(_callback) } + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(conn) => conn.set_metric_callback(_callback), _ => {} } } @@ -669,6 +786,8 @@ impl DatabaseConnection { DatabaseConnectionType::MockDatabaseConnection(conn) => conn.ping(), #[cfg(feature = "proxy")] DatabaseConnectionType::ProxyDatabaseConnection(conn) => conn.ping().await, + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(conn) => conn.ping().await, DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } @@ -700,6 +819,8 @@ impl DatabaseConnection { // Nothing to cleanup, we just consume the `DatabaseConnection` Ok(()) } + #[cfg(feature = "d1")] + DatabaseConnectionType::D1Connection(conn) => conn.close_by_ref().await, DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), } } diff --git a/src/database/mod.rs b/src/database/mod.rs index ab0898567..f0bb109a4 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -184,6 +184,33 @@ impl Database { } } } + + /// Method to create a [DatabaseConnection] on a Cloudflare D1 database + /// + /// # Example + /// + /// ```ignore + /// use worker::Env; + /// + /// async fn fetch(req: HttpRequest, env: Env, ctx: Context) -> Result { + /// // Get D1 binding from environment + /// let d1 = env.d1("DB")?; + /// + /// // Connect to Sea-ORM + /// let db = sea_orm::Database::connect_d1(d1).await?; + /// + /// // Use db as normal - all Sea-ORM operations work! + /// let users = user::Entity::find().all(&db).await?; + /// + /// Ok(Response::ok(...)? + /// } + /// ``` + #[cfg(feature = "d1")] + #[cfg_attr(docsrs, doc(cfg(feature = "d1")))] + #[instrument(level = "trace")] + pub async fn connect_d1(d1: worker::d1::D1Database) -> Result { + crate::D1Connector::connect(d1).await + } } impl From for ConnectOptions diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs index b3687a53e..569e657bb 100644 --- a/src/database/stream/query.rs +++ b/src/database/stream/query.rs @@ -108,6 +108,12 @@ impl QueryStream { let elapsed = start.map(|s| s.elapsed().unwrap_or_default()); MetricStream::new(_metric_callback, stmt, elapsed, stream) } + // D1 doesn't support streaming due to Send bound requirements + // See db_connection.rs for stream_raw implementation + #[cfg(feature = "d1")] + InnerConnection::D1(_) => { + unreachable!("D1 streaming is not supported. Use query_all() instead.") + } #[allow(unreachable_patterns)] _ => unreachable!(), }, diff --git a/src/database/transaction.rs b/src/database/transaction.rs index e16f158bc..8dc9de9d5 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -108,6 +108,13 @@ impl DatabaseTransaction { c.begin().await; Ok(()) } + #[cfg(feature = "d1")] + InnerConnection::D1(_) => { + // D1 doesn't support explicit transactions - statements auto-commit + // We return Ok to allow transaction-like code to run, but each + // statement is committed independently + Ok(()) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } @@ -187,6 +194,11 @@ impl DatabaseTransaction { c.commit().await; Ok(()) } + #[cfg(feature = "d1")] + InnerConnection::D1(_) => { + // D1 auto-commits each statement, so commit is a no-op + Ok(()) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } @@ -244,6 +256,14 @@ impl DatabaseTransaction { c.rollback().await; Ok(()) } + #[cfg(feature = "d1")] + InnerConnection::D1(_) => { + // D1 doesn't support rollback - each statement auto-commits + // We return Ok since there's nothing to rollback, but warn that + // transactional behavior is not guaranteed + tracing::warn!("D1 doesn't support rollback - statements were auto-committed"); + Ok(()) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } @@ -285,6 +305,10 @@ impl DatabaseTransaction { InnerConnection::Proxy(c) => { c.start_rollback(); } + #[cfg(feature = "d1")] + InnerConnection::D1(_) => { + // D1 doesn't support rollback - nothing to do + } #[allow(unreachable_patterns)] _ => return Err(conn_err("Disconnected")), } @@ -371,6 +395,12 @@ impl ConnectionTrait for DatabaseTransaction { InnerConnection::Mock(conn) => conn.execute(stmt), #[cfg(feature = "proxy")] InnerConnection::Proxy(conn) => conn.execute(stmt).await, + #[cfg(feature = "d1")] + InnerConnection::D1(conn) => { + // D1 doesn't support transactions in the traditional sense, + // but we need to support execute within transaction context + Err(conn_err("D1 transactions do not support execute_raw. Use D1Connection directly.")) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } @@ -433,6 +463,10 @@ impl ConnectionTrait for DatabaseTransaction { let stmt = Statement::from_string(db_backend, sql); conn.execute(stmt).await } + #[cfg(feature = "d1")] + InnerConnection::D1(conn) => { + Err(conn_err("D1 transactions do not support execute_unprepared. Use D1Connection directly.")) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } @@ -493,6 +527,10 @@ impl ConnectionTrait for DatabaseTransaction { InnerConnection::Mock(conn) => conn.query_one(stmt), #[cfg(feature = "proxy")] InnerConnection::Proxy(conn) => conn.query_one(stmt).await, + #[cfg(feature = "d1")] + InnerConnection::D1(conn) => { + Err(conn_err("D1 transactions do not support query_one_raw. Use D1Connection directly.")) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } @@ -559,6 +597,10 @@ impl ConnectionTrait for DatabaseTransaction { InnerConnection::Mock(conn) => conn.query_all(stmt), #[cfg(feature = "proxy")] InnerConnection::Proxy(conn) => conn.query_all(stmt).await, + #[cfg(feature = "d1")] + InnerConnection::D1(conn) => { + Err(conn_err("D1 transactions do not support query_all_raw. Use D1Connection directly.")) + } #[allow(unreachable_patterns)] _ => Err(conn_err("Disconnected")), } diff --git a/src/driver/d1.rs b/src/driver/d1.rs new file mode 100644 index 000000000..086843792 --- /dev/null +++ b/src/driver/d1.rs @@ -0,0 +1,935 @@ +//! Cloudflare D1 database driver for Sea-ORM +//! +//! This module provides native D1 support for Sea-ORM, allowing you to use +//! Sea-ORM directly with Cloudflare Workers without needing to implement +//! the `ProxyDatabaseTrait`. +//! +//! # Architecture Overview +//! +//! Due to `wasm-bindgen` futures not being `Send`, the standard `ConnectionTrait` +//! and `TransactionTrait` cannot be implemented for D1. The driver provides: +//! +//! - [`D1Connection`]: Direct database access via raw SQL statements +//! - [`D1QueryExecutor`]: Entity/ActiveRecord-style queries with `Entity::find()` +//! +//! # Usage Patterns +//! +//! ## 1. Direct SQL Access +//! +//! ```ignore +//! use sea_orm::{DbBackend, D1Connection, Statement, Value}; +//! +//! let stmt = Statement::from_sql_and_values( +//! DbBackend::Sqlite, +//! "SELECT * FROM users WHERE id = ?", +//! vec![Value::from(1)], +//! ); +//! let users = d1_conn.query_all(stmt).await?; +//! ``` +//! +//! ## 2. Entity Queries (via D1QueryExecutor) +//! +//! ```ignore +//! use sea_orm::{D1QueryExecutor, EntityTrait}; +//! +//! let cakes: Vec = d1_conn.find_all(cake::Entity::find()).await?; +//! ``` +//! +//! # Supported Types +//! +//! D1 supports the following Sea-ORM value types: +//! - Numeric: i8, i16, i32, i64, u8, u16, u32, u64, f32, f64 +//! - String: String, &str +//! - Binary: Vec (stored as hex string) +//! - JSON: serde_json::Value +//! - DateTime: chrono::DateTime types (stored as RFC3339 strings) +//! - Decimal: rust_decimal::Decimal, bigdecimal::BigDecimal +//! - UUID: uuid::Uuid +//! - Network: ipnetwork::IpNetwork +//! +//! # Limitations +//! +//! - **Transactions**: D1 has limited transaction support. Use [`D1Connection::transaction()`] +//! directly, but be aware D1 doesn't guarantee ACID transactions. +//! - **Streaming**: D1 does not support streaming queries. Use `query_all()` to load all results. +//! - **Join queries**: [`D1QueryExecutor`] only supports simple `Select` queries. +//! For joins, use raw SQL with [`D1Connection`]. +//! - **No `ConnectionTrait`**: The standard Sea-ORM connection interface isn't available. +//! Use [`D1Connection`] methods directly or [`D1QueryExecutor`] for Entity operations. + +//! # Entity Support with D1QueryExecutor +//! +//! Due to `wasm-bindgen` futures not being `Send`, the standard `ConnectionTrait` +//! cannot be implemented for D1. However, you can still use Entity queries via +//! the [`D1QueryExecutor`] trait: +//! +//! ```ignore +//! use sea_orm::{EntityTrait, D1QueryExecutor}; +//! +//! async fn fetch(req: Request, env: Env, _ctx: Context) -> Result { +//! let d1 = env.d1("DB")?; +//! let db = sea_orm::Database::connect_d1(d1).await?; +//! let d1_conn = db.as_d1_connection(); +//! +//! // Use Entity::find() with D1! +//! let cakes: Vec = d1_conn.find_all(cake::Entity::find()).await?; +//! +//! // Or find one +//! let cake: Option = d1_conn.find_one(cake::Entity::find_by_id(1)).await?; +//! +//! // With filters +//! let filtered: Vec = d1_conn +//! .find_all(cake::Entity::find().filter(cake::Column::Name.contains("chocolate"))) +//! .await?; +//! +//! Ok(Response::ok("Hello")?) +//! } +//! ``` + +use futures_util::lock::Mutex; +use sea_query::Values; +use std::{pin::Pin, sync::Arc}; +use tracing::instrument; +use worker::wasm_bindgen::JsValue; + +use crate::{ + AccessMode, DatabaseConnection, DatabaseConnectionType, DatabaseTransaction, DbErr, ExecResult, + FromQueryResult, IsolationLevel, QueryResult, Statement, TransactionError, Value, debug_print, + error::*, executor::*, +}; + +/// D1 Connector for Sea-ORM +/// +/// This struct is used to create a connection to a D1 database. +#[derive(Debug)] +pub struct D1Connector; + +/// A D1 database connection +/// +/// This wraps a `worker::d1::D1Database` instance using `Arc` for cheap cloning, +/// since D1 connections are stateless and can be shared across threads. +#[derive(Clone)] +pub struct D1Connection { + pub(crate) d1: Arc, + pub(crate) metric_callback: Option, +} + +impl std::fmt::Debug for D1Connection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "D1Connection {{ d1: Arc }}") + } +} + +impl From for D1Connection { + fn from(d1: worker::d1::D1Database) -> Self { + D1Connection { + d1: Arc::new(d1), + metric_callback: None, + } + } +} + +/// Result from executing a D1 query +#[derive(Debug, Clone)] +pub struct D1ExecResult { + /// The last inserted row ID + pub last_insert_id: u64, + /// The number of rows affected + pub rows_affected: u64, +} + +/// A row returned from D1 +/// +/// This wraps the raw D1 row data which comes as `serde_json::Value`. +#[derive(Debug, Clone)] +pub struct D1Row { + pub(crate) row: serde_json::Value, +} + +impl D1Connector { + /// Create a connection to a D1 database + /// + /// This takes a `worker::d1::D1Database` instance directly, which you can obtain + /// from the Cloudflare Workers environment. + /// + /// # Example + /// + /// ```ignore + /// let d1 = env.d1("DB")?; + /// let db = D1Connector::connect(d1).await?; + /// ``` + #[instrument(level = "trace")] + pub async fn connect(d1: worker::d1::D1Database) -> Result { + let conn = D1Connection::from(d1); + Ok(DatabaseConnectionType::D1Connection(conn).into()) + } +} + +impl D1Connection { + /// Execute a prepared statement on D1 + #[instrument(level = "trace")] + pub async fn execute(&self, stmt: Statement) -> Result { + debug_print!("{}", stmt); + + let sql = stmt.sql.clone(); + let values = stmt + .values + .as_ref() + .cloned() + .unwrap_or_else(|| Values(Vec::new())); + + crate::metric::metric!(self.metric_callback, &stmt, { + match self.execute_inner(&sql, &values).await { + Ok(result) => Ok(result.into()), + Err(err) => Err(d1_error_to_exec_err(err)), + } + }) + } + + /// Execute an unprepared SQL statement on D1 + #[instrument(level = "trace")] + pub async fn execute_unprepared(&self, sql: &str) -> Result { + debug_print!("{}", sql); + + let values = Values(Vec::new()); + + match self.execute_inner(sql, &values).await { + Ok(result) => Ok(result.into()), + Err(err) => Err(d1_error_to_exec_err(err)), + } + } + + /// Query a single row from D1 + #[instrument(level = "trace")] + pub async fn query_one(&self, stmt: Statement) -> Result, DbErr> { + debug_print!("{}", stmt); + + let sql = stmt.sql.clone(); + let values = stmt + .values + .as_ref() + .cloned() + .unwrap_or_else(|| Values(Vec::new())); + + crate::metric::metric!(self.metric_callback, &stmt, { + match self.query_inner(&sql, &values).await { + Ok(rows) => Ok(rows.into_iter().next().map(|r| r.into())), + Err(err) => Err(d1_error_to_query_err(err)), + } + }) + } + + /// Query all rows from D1 + #[instrument(level = "trace")] + pub async fn query_all(&self, stmt: Statement) -> Result, DbErr> { + debug_print!("{}", stmt); + + let sql = stmt.sql.clone(); + let values = stmt + .values + .as_ref() + .cloned() + .unwrap_or_else(|| Values(Vec::new())); + + crate::metric::metric!(self.metric_callback, &stmt, { + match self.query_inner(&sql, &values).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(d1_error_to_query_err(err)), + } + }) + } + + /// Begin a transaction + /// + /// # D1 Transaction Limitations + /// + /// **Important:** D1 has limited transaction support compared to traditional databases: + /// - **No ACID guarantees**: D1 does not provide full ACID transaction semantics + /// - **No isolation levels**: Isolation levels are not supported and will be ignored + /// - **No access mode control**: Read-only vs read-write modes are not enforced + /// - **Best-effort only**: Each statement is executed independently; if one fails, + /// previous statements are not automatically rolled back + /// - **No savepoints**: Nested transactions are not supported + /// + /// For production use cases requiring strong transactional guarantees, consider + /// using a different database backend or implementing application-level compensation logic. + /// + /// # Example + /// + /// ```ignore + /// let tx = d1_conn.begin(None, None).await?; + /// + /// // Execute operations... + /// let result = d1_conn.execute(stmt1).await; + /// + /// // Commit or rollback + /// tx.commit().await?; + /// ``` + #[instrument(level = "trace")] + pub async fn begin( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result { + if isolation_level.is_some() { + tracing::warn!("Setting isolation level in a D1 transaction isn't supported"); + } + if access_mode.is_some() { + tracing::warn!("Setting access mode in a D1 transaction isn't supported"); + } + + // D1 doesn't support explicit transactions in the traditional sense. + // Each statement is executed independently. + DatabaseTransaction::new_d1(self.d1.clone(), self.metric_callback.clone()).await + } + + /// Execute a function inside a transaction + /// + /// # D1 Transaction Limitations + /// + /// **Important:** This method provides a transaction-like interface, but due to D1's + /// limitations, it cannot provide full ACID guarantees: + /// + /// - **Partial failure risk**: If the callback fails partway through, earlier statements + /// may have already been committed by D1 and cannot be rolled back + /// - **No atomicity**: Operations are not executed atomically + /// - **No consistency guarantees**: Database constraints may be violated between statements + /// - **Isolation and access mode**: These parameters are ignored + /// + /// For production use requiring strong guarantees, consider implementing + /// idempotent operations or application-level compensation logic. + /// + /// # Example + /// + /// ```ignore + /// d1_conn.transaction(|tx| Box::pin(async move { + /// // Your operations here... + /// Ok(result) + /// }), None, None).await?; + /// ``` + #[instrument(level = "trace", skip(callback))] + pub async fn transaction( + &self, + callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> + where + F: for<'b> FnOnce( + &'b DatabaseTransaction, + ) -> Pin> + Send + 'b>> + + Send, + T: Send, + E: std::fmt::Display + std::fmt::Debug + Send, + { + let transaction = + DatabaseTransaction::new_d1(self.d1.clone(), self.metric_callback.clone()) + .await + .map_err(|e| TransactionError::Connection(e))?; + transaction.run(callback).await + } + + /// Check if the connection is still valid + pub async fn ping(&self) -> Result<(), DbErr> { + // D1 doesn't have a ping method, so we execute a simple query + // to check if the connection is still valid + match self.query_inner("SELECT 1", &Values(Vec::new())).await { + Ok(_) => Ok(()), + Err(err) => Err(d1_error_to_conn_err(err)), + } + } + + /// Close the connection + pub async fn close_by_ref(&self) -> Result<(), DbErr> { + // D1 doesn't need explicit closing - it's managed by the worker runtime + Ok(()) + } + + /// Internal method to execute SQL and get execution result + /// + /// Note: D1 always uses prepared statements, so there's no unprepared execution path. + async fn execute_inner( + &self, + sql: &str, + values: &Values, + ) -> Result { + let js_values = values_to_js_values(values)?; + + let prepared = self + .d1 + .prepare(sql) + .bind(&js_values) + .map_err(|e| D1Error::Prepare(e.into()))?; + + let result = prepared + .run() + .await + .map_err(|e| D1Error::Execute(e.into()))?; + let meta = result.meta().map_err(|e| D1Error::Meta(e.into()))?; + + let (last_insert_id, rows_affected) = match meta { + Some(m) => ( + m.last_row_id.unwrap_or(0) as u64, + m.rows_written.unwrap_or(0) as u64, + ), + None => (0, 0), + }; + + Ok(D1ExecResult { + last_insert_id, + rows_affected, + }) + } + + /// Internal method to query and get rows + async fn query_inner(&self, sql: &str, values: &Values) -> Result, D1Error> { + let js_values = values_to_js_values(values)?; + + let prepared = self + .d1 + .prepare(sql) + .bind(&js_values) + .map_err(|e| D1Error::Prepare(e.into()))?; + + let result = prepared.all().await.map_err(|e| D1Error::Query(e.into()))?; + + if let Some(error) = result.error() { + return Err(D1Error::Response(error.to_string())); + } + + let results: Vec = + result.results().map_err(|e| D1Error::Results(e.into()))?; + + let rows: Vec = results.into_iter().map(|row| D1Row { row }).collect(); + + Ok(rows) + } +} + +/// Set the metric callback for this connection +impl D1Connection { + pub(crate) fn set_metric_callback(&mut self, callback: F) + where + F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static, + { + self.metric_callback = Some(Arc::new(callback)); + } +} + +impl From for QueryResult { + fn from(row: D1Row) -> Self { + QueryResult { + row: QueryResultRow::D1(row), + } + } +} + +impl From for ExecResult { + fn from(result: D1ExecResult) -> Self { + ExecResult { + result: ExecResultHolder::D1(result), + } + } +} + +/// Internal D1 error type +#[derive(Debug, thiserror::Error)] +enum D1Error { + #[error("D1 prepare error: {0:?}")] + Prepare(JsValue), + #[error("D1 execute error: {0:?}")] + Execute(JsValue), + #[error("D1 query error: {0:?}")] + Query(JsValue), + #[error("D1 response error: {0}")] + Response(String), + #[error("D1 meta error: {0:?}")] + Meta(JsValue), + #[error("D1 results error: {0:?}")] + Results(JsValue), +} + +/// Convert D1 values to JS values for binding +fn values_to_js_values(values: &Values) -> Result, D1Error> { + values.0.iter().map(value_to_js_value).collect() +} + +/// Convert a Sea-ORM Value to a JS Value for D1 +fn value_to_js_value(val: &Value) -> Result { + match val { + Value::Bool(Some(v)) => Ok(JsValue::from(*v)), + Value::Int(Some(v)) => Ok(JsValue::from(*v)), + Value::BigInt(Some(v)) => Ok(JsValue::from(v.to_string())), + Value::SmallInt(Some(v)) => Ok(JsValue::from(*v)), + Value::TinyInt(Some(v)) => Ok(JsValue::from(*v)), + Value::Unsigned(Some(v)) => Ok(JsValue::from(*v)), + Value::BigUnsigned(Some(v)) => Ok(JsValue::from(v.to_string())), + Value::SmallUnsigned(Some(v)) => Ok(JsValue::from(*v)), + Value::TinyUnsigned(Some(v)) => Ok(JsValue::from(*v)), + Value::Float(Some(v)) => Ok(JsValue::from_f64(*v as f64)), + Value::Double(Some(v)) => Ok(JsValue::from_f64(*v)), + Value::String(Some(v)) => Ok(JsValue::from(v.as_str())), + Value::Char(Some(v)) => Ok(JsValue::from(v.to_string())), + Value::Bytes(Some(v)) => { + // Convert bytes to hex string for D1 + let hex: String = v.iter().map(|byte| format!("{:02x}", byte)).collect(); + Ok(JsValue::from(format!("X'{}'", hex))) + } + Value::Json(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-chrono")] + Value::ChronoDate(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-chrono")] + Value::ChronoTime(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-chrono")] + Value::ChronoDateTime(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-chrono")] + Value::ChronoDateTimeUtc(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-chrono")] + Value::ChronoDateTimeLocal(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-chrono")] + Value::ChronoDateTimeWithTimeZone(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-time")] + Value::TimeDate(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-time")] + Value::TimeTime(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-time")] + Value::TimeDateTime(Some(v)) => Ok(JsValue::from(v.to_string())), + #[cfg(feature = "with-time")] + Value::TimeDateTimeWithTimeZone(Some(v)) => Ok(JsValue::from(v.to_string())), + // Null values and unsupported types + Value::Bool(None) + | Value::Int(None) + | Value::BigInt(None) + | Value::SmallInt(None) + | Value::TinyInt(None) + | Value::Unsigned(None) + | Value::BigUnsigned(None) + | Value::SmallUnsigned(None) + | Value::TinyUnsigned(None) + | Value::Float(None) + | Value::Double(None) + | Value::String(None) + | Value::Char(None) + | Value::Bytes(None) + | Value::Json(None) => Ok(JsValue::NULL), + #[cfg(feature = "with-chrono")] + Value::ChronoDate(None) + | Value::ChronoTime(None) + | Value::ChronoDateTime(None) + | Value::ChronoDateTimeUtc(None) + | Value::ChronoDateTimeLocal(None) + | Value::ChronoDateTimeWithTimeZone(None) => Ok(JsValue::NULL), + #[cfg(feature = "with-time")] + Value::TimeDate(None) + | Value::TimeTime(None) + | Value::TimeDateTime(None) + | Value::TimeDateTimeWithTimeZone(None) => Ok(JsValue::NULL), + // Unsupported types - log error and return NULL + // Note: In strict mode, this should return an error instead + #[allow(unreachable_patterns)] + val => { + tracing::error!( + "D1 does not support value type {:?} - data will be lost (converting to NULL). \ + Use a supported type (bool, i8, i16, i32, i64, u8, u16, u32, u64, f32, f64, String, Vec, serde_json::Value, chrono types, time types). \ + Consider enabling strict mode to catch these errors at development time.", + std::mem::discriminant(val) + ); + Ok(JsValue::NULL) + } + } +} + +/// Convert D1 error to DbErr for execution +fn d1_error_to_exec_err(err: D1Error) -> DbErr { + DbErr::Query(RuntimeErr::Internal(format!("D1 execute error: {}", err))) +} + +/// Convert D1 error to DbErr for queries +fn d1_error_to_query_err(err: D1Error) -> DbErr { + DbErr::Query(RuntimeErr::Internal(format!("D1 query error: {}", err))) +} + +/// Convert D1 error to DbErr for connection +fn d1_error_to_conn_err(err: D1Error) -> DbErr { + DbErr::Conn(RuntimeErr::Internal(format!( + "D1 connection error: {}", + err + ))) +} + +/// Internal helper for converting D1 values to target types +/// +/// This provides a MockRow-like interface for value conversion without +/// requiring the mock feature to be enabled. +#[derive(Debug, Clone)] +pub(crate) struct D1ValueWrapper { + values: std::collections::BTreeMap, +} + +impl D1ValueWrapper { + /// Create a new wrapper with a single value + pub(crate) fn with_value(key: String, value: Value) -> Self { + let mut values = std::collections::BTreeMap::new(); + values.insert(key, value); + Self { values } + } + + /// Get a value from the wrapper + pub(crate) fn try_get(&self, index: &str) -> Result + where + T: sea_query::ValueType, + { + T::try_from( + self.values + .get(index) + .ok_or_else(|| query_err(format!("No column for index {index:?}")))? + .clone(), + ) + .map_err(type_err) + } +} + +/// Convert D1 JSON row to Sea-ORM values +pub(crate) fn d1_row_to_values(row: &D1Row) -> Vec<(String, Value)> { + let mut values = Vec::new(); + + if let Some(obj) = row.row.as_object() { + for (key, value) in obj { + let sea_value = d1_json_to_value(value); + values.push((key.clone(), sea_value)); + } + } + + values +} + +/// Convert D1 JSON value to Sea-ORM Value +fn d1_json_to_value(json: &serde_json::Value) -> Value { + match json { + serde_json::Value::Null => Value::Bool(None), + serde_json::Value::Bool(v) => Value::Bool(Some(*v)), + serde_json::Value::Number(v) => { + if let Some(i) = v.as_i64() { + Value::BigInt(Some(i)) + } else if let Some(u) = v.as_u64() { + Value::BigUnsigned(Some(u)) + } else if let Some(f) = v.as_f64() { + Value::Double(Some(f)) + } else { + Value::Double(None) + } + } + serde_json::Value::String(v) => Value::String(Some(v.clone())), + serde_json::Value::Array(_) | serde_json::Value::Object(_) => { + Value::Json(Some(Box::new(json.clone()))) + } + } +} + +impl D1Row { + /// Try to get a value from this D1 row by column name or index + pub fn try_get_by(&self, idx: I) -> Result { + let values = d1_row_to_values(self); + let col_name = idx.as_str().ok_or_else(|| { + crate::TryGetError::Null(format!("D1 row doesn't support numeric index: {:?}", idx)) + })?; + + values + .iter() + .find(|(name, _)| name == col_name) + .map(|(_, v)| v.clone()) + .ok_or_else(|| { + crate::TryGetError::Null(format!("Column '{}' not found in D1 row", col_name)) + }) + } +} + +impl crate::DatabaseTransaction { + pub(crate) async fn new_d1( + d1: Arc, + metric_callback: Option, + ) -> Result { + Self::begin( + Arc::new(Mutex::new(crate::InnerConnection::D1(d1))), + crate::DbBackend::Sqlite, + metric_callback, + None, + None, + ) + .await + } +} + +/// A trait for executing Entity queries on D1. +/// +/// This trait enables `Entity::find()` operations with D1 by providing +/// methods that take `Select` directly and execute them on D1. +/// +/// Due to `wasm-bindgen` futures not being `Send`, the standard `ConnectionTrait` +/// cannot be implemented for D1. This trait provides an alternative way to use +/// Entity queries with D1. +/// +/// # Example +/// +/// ```ignore +/// use sea_orm::{EntityTrait, D1QueryExecutor}; +/// +/// let cakes: Vec = d1_conn.find_all(cake::Entity::find()).await?; +/// let cake: Option = d1_conn.find_one(cake::Entity::find_by_id(1)).await?; +/// ``` +/// +/// # Limitations +/// +/// - **Transactions**: D1 has limited transaction support. Use [`D1Connection::transaction()`] +/// directly for transactional operations. +/// - **Streaming**: D1 does not support streaming queries. Use `find_all()` to load all results. +/// - **Join queries**: Only simple `Select` queries are supported, not `SelectTwo` or `SelectTwoMany`. +/// - **No `ConnectionTrait`**: This trait provides Entity query support but doesn't implement +/// the full `ConnectionTrait` interface. +/// +/// For operations not covered by this trait, use [`D1Connection`] directly with +/// [`Statement`](crate::Statement) and the [`execute`](D1Connection::execute), +/// [`query_one`](D1Connection::query_one), and [`query_all`](D1Connection::query_all) methods. +pub trait D1QueryExecutor { + /// Execute a `Select` and return all matching models. + /// + /// This allows you to use `Entity::find()` with D1: + /// + /// ```ignore + /// let cakes: Vec = d1_conn.find_all(cake::Entity::find()).await?; + /// ``` + /// + /// # Ordering and Filtering + /// + /// ```ignore + /// use sea_orm::{EntityTrait, QueryOrder}; + /// + /// let cakes: Vec = d1_conn + /// .find_all( + /// cake::Entity::find() + /// .filter(cake::Column::Name.contains("chocolate")) + /// .order_by_asc(cake::Column::Name) + /// ) + /// .await?; + /// ``` + fn find_all( + &self, + select: crate::Select, + ) -> impl std::future::Future, DbErr>> + where + E: crate::EntityTrait; + + /// Execute a `Select` and return at most one model. + /// + /// This is useful for `Entity::find_by_id()` or queries with limits: + /// + /// ```ignore + /// let cake: Option = d1_conn.find_one(cake::Entity::find_by_id(1)).await?; + /// ``` + fn find_one( + &self, + select: crate::Select, + ) -> impl std::future::Future, DbErr>> + where + E: crate::EntityTrait; + + /// Build a `Statement` from a `Select` for manual execution. + /// + /// This allows you to get the SQL statement for debugging or custom execution: + /// + /// ```ignore + /// let stmt = d1_conn.build_statement(cake::Entity::find().filter( + /// cake::Column::Name.contains("chocolate") + /// )); + /// ``` + fn build_statement(&self, select: crate::Select) -> Statement + where + E: crate::EntityTrait; +} + +impl D1QueryExecutor for D1Connection { + #[allow(clippy::manual_async_fn)] + fn find_all( + &self, + select: crate::Select, + ) -> impl std::future::Future, DbErr>> + where + E: crate::EntityTrait, + { + async move { + let stmt = self.build_statement(select); + let results = self.query_all(stmt).await?; + results + .into_iter() + .map(|row| E::Model::from_query_result(&row, "")) + .collect() + } + } + + #[allow(clippy::manual_async_fn)] + fn find_one( + &self, + select: crate::Select, + ) -> impl std::future::Future, DbErr>> + where + E: crate::EntityTrait, + { + async move { + let stmt = self.build_statement(select); + let result = self.query_one(stmt).await?; + match result { + Some(row) => E::Model::from_query_result(&row, "").map(Some), + None => Ok(None), + } + } + } + + fn build_statement(&self, select: crate::Select) -> Statement + where + E: crate::EntityTrait, + { + crate::DbBackend::Sqlite.build(&select.query) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Test conversion of D1 JSON null to Sea-ORM Value + #[test] + fn test_d1_null_conversion() { + let json = serde_json::Value::Null; + let value = d1_json_to_value(&json); + assert_eq!(value, Value::Bool(None)); + } + + /// Test conversion of D1 JSON bool to Sea-ORM Value + #[test] + fn test_d1_bool_conversion() { + let json = serde_json::Value::Bool(true); + let value = d1_json_to_value(&json); + assert_eq!(value, Value::Bool(Some(true))); + + let json = serde_json::Value::Bool(false); + let value = d1_json_to_value(&json); + assert_eq!(value, Value::Bool(Some(false))); + } + + /// Test conversion of D1 JSON number (i64) to Sea-ORM Value + #[test] + fn test_d1_i64_conversion() { + let json = serde_json::json!(42); + let value = d1_json_to_value(&json); + assert_eq!(value, Value::BigInt(Some(42))); + } + + /// Test conversion of D1 JSON number (u64) to Sea-ORM Value + #[test] + fn test_d1_u64_conversion() { + // D1 returns numbers as i64 when they fit, test the u64 path + let json = serde_json::json!(9999999999999999999u64); + let value = d1_json_to_value(&json); + assert!(matches!(value, Value::BigUnsigned(_))); + } + + /// Test conversion of D1 JSON number (f64) to Sea-ORM Value + #[test] + fn test_d1_f64_conversion() { + let json = serde_json::json!(3.14159); + let value = d1_json_to_value(&json); + assert_eq!(value, Value::Double(Some(3.14159))); + } + + /// Test conversion of D1 JSON string to Sea-ORM Value + #[test] + fn test_d1_string_conversion() { + let json = serde_json::json!("hello world"); + let value = d1_json_to_value(&json); + assert_eq!(value, Value::String(Some("hello world".to_string()))); + } + + /// Test conversion of D1 JSON array to Sea-ORM Value (as JSON) + #[test] + fn test_d1_array_conversion() { + let json = serde_json::json!([1, 2, 3]); + let value = d1_json_to_value(&json); + assert!(matches!(value, Value::Json(Some(_)))); + } + + /// Test conversion of D1 JSON object to Sea-ORM Value (as JSON) + #[test] + fn test_d1_object_conversion() { + let json = serde_json::json!({"key": "value"}); + let value = d1_json_to_value(&json); + assert!(matches!(value, Value::Json(Some(_)))); + } + + /// Test d1_row_to_values function + #[test] + fn test_d1_row_to_values() { + let row = D1Row { + row: serde_json::json!({ + "id": 1, + "name": "Chocolate Cake", + "price": 9.99, + "available": true + }), + }; + + let values = d1_row_to_values(&row); + assert_eq!(values.len(), 4); + + let id_value = values.iter().find(|(k, _)| k == "id").unwrap().1.clone(); + assert_eq!(id_value, Value::BigInt(Some(1))); + + let name_value = values.iter().find(|(k, _)| k == "name").unwrap().1.clone(); + assert_eq!( + name_value, + Value::String(Some("Chocolate Cake".to_string())) + ); + } + + /// Test D1Row try_get_by with valid column + #[test] + fn test_d1_row_try_get_by_valid() { + let row = D1Row { + row: serde_json::json!({ + "id": 42, + "name": "Test" + }), + }; + + let id_value = row.try_get_by("id").unwrap(); + assert_eq!(id_value, Value::BigInt(Some(42))); + + let name_value = row.try_get_by("name").unwrap(); + assert_eq!(name_value, Value::String(Some("Test".to_string()))); + } + + /// Test D1Row try_get_by with missing column + #[test] + fn test_d1_row_try_get_by_missing() { + let row = D1Row { + row: serde_json::json!({ + "id": 42 + }), + }; + + // Missing column should return an error, not panic + let result = row.try_get_by("nonexistent"); + assert!(result.is_err()); + } + + /// Test D1ExecResult creation + #[test] + fn test_d1_exec_result() { + let result = D1ExecResult { + last_insert_id: 123, + rows_affected: 5, + }; + + assert_eq!(result.last_insert_id, 123); + assert_eq!(result.rows_affected, 5); + } +} diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 05936460f..70d1f4a4d 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "d1")] +pub(crate) mod d1; #[cfg(feature = "mock")] mod mock; #[cfg(feature = "proxy")] @@ -15,6 +17,8 @@ pub(crate) mod sqlx_postgres; #[cfg(feature = "sqlx-sqlite")] pub(crate) mod sqlx_sqlite; +#[cfg(feature = "d1")] +pub use d1::*; #[cfg(feature = "mock")] pub use mock::*; #[cfg(feature = "proxy")] diff --git a/src/executor/execute.rs b/src/executor/execute.rs index d05c15fd2..e5211765e 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -28,6 +28,9 @@ pub(crate) enum ExecResultHolder { /// Holds the result of executing an operation on the Proxy database #[cfg(feature = "proxy")] Proxy(crate::ProxyExecResult), + /// Holds the result of executing an operation on D1 + #[cfg(feature = "d1")] + D1(crate::driver::d1::D1ExecResult), } // ExecResult // @@ -68,6 +71,8 @@ impl ExecResult { ExecResultHolder::Mock(result) => result.last_insert_id, #[cfg(feature = "proxy")] ExecResultHolder::Proxy(result) => result.last_insert_id, + #[cfg(feature = "d1")] + ExecResultHolder::D1(result) => result.last_insert_id, #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -88,6 +93,8 @@ impl ExecResult { ExecResultHolder::Mock(result) => result.rows_affected, #[cfg(feature = "proxy")] ExecResultHolder::Proxy(result) => result.rows_affected, + #[cfg(feature = "d1")] + ExecResultHolder::D1(result) => result.rows_affected, #[allow(unreachable_patterns)] _ => unreachable!(), } diff --git a/src/executor/query.rs b/src/executor/query.rs index 89ac4e125..89d5c91ee 100644 --- a/src/executor/query.rs +++ b/src/executor/query.rs @@ -33,6 +33,8 @@ pub(crate) enum QueryResultRow { Mock(crate::MockRow), #[cfg(feature = "proxy")] Proxy(crate::ProxyRow), + #[cfg(feature = "d1")] + D1(crate::driver::d1::D1Row), } /// An interface to get a value from the query result @@ -173,6 +175,14 @@ impl QueryResult { .into_column_value_tuples() .map(|(c, _)| c.to_string()) .collect(), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + if let Some(obj) = row.row.as_object() { + obj.keys().cloned().collect() + } else { + Vec::new() + } + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -227,6 +237,16 @@ impl QueryResult { _ => None, } } + + /// Access the underlying `D1Row` if we use D1. + #[cfg(feature = "d1")] + pub fn try_as_d1_row(&self) -> Option<&crate::driver::d1::D1Row> { + match &self.row { + QueryResultRow::D1(d1_row) => Some(d1_row), + #[allow(unreachable_patterns)] + _ => None, + } + } } #[allow(unused_variables)] @@ -245,6 +265,8 @@ impl Debug for QueryResultRow { Self::Mock(row) => write!(f, "{row:?}"), #[cfg(feature = "proxy")] Self::Proxy(row) => write!(f, "{row:?}"), + #[cfg(feature = "d1")] + Self::D1(row) => write!(f, "{row:?}"), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -404,6 +426,19 @@ macro_rules! try_getable_all { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + let val = row.try_get_by(idx)?; + // Convert Value to the target type using D1ValueWrapper + let col_name = idx.as_str() + .ok_or_else(|| err_null_idx_col(idx))? + .to_string(); + let wrapper = crate::driver::d1::D1ValueWrapper::with_value( + col_name.clone(), + val, + ); + wrapper.try_get(&col_name).map_err(|e| e.into()) + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -448,6 +483,19 @@ macro_rules! try_getable_unsigned { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + let val = row.try_get_by(idx)?; + // Convert Value to the target type using D1ValueWrapper + let col_name = idx.as_str() + .ok_or_else(|| err_null_idx_col(idx))? + .to_string(); + let wrapper = crate::driver::d1::D1ValueWrapper::with_value( + col_name.clone(), + val, + ); + wrapper.try_get(&col_name).map_err(|e| e.into()) + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -495,6 +543,19 @@ macro_rules! try_getable_mysql { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + let val = row.try_get_by(idx)?; + // Convert Value to the target type using D1ValueWrapper + let col_name = idx.as_str() + .ok_or_else(|| err_null_idx_col(idx))? + .to_string(); + let wrapper = crate::driver::d1::D1ValueWrapper::with_value( + col_name.clone(), + val, + ); + wrapper.try_get(&col_name).map_err(|e| e.into()) + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -596,6 +657,26 @@ macro_rules! try_getable_date_time { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + // D1 returns datetime as string, parse it + use chrono::DateTime; + let val: crate::sea_query::Value = row.try_get_by(idx)?; + match val { + crate::sea_query::Value::String(Some(s)) => { + let dt = DateTime::parse_from_rfc3339(&s).map_err(|e| { + crate::error::type_err(format!( + "Failed to parse datetime from D1: {}", + e + )) + })?; + Ok(dt.into()) + } + _ => Err(TryGetError::DbErr( + crate::error::type_err("D1: Expected RFC3339 datetime string, got NULL or non-string value".to_string()) + )), + } + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -711,6 +792,27 @@ impl TryGetable for Decimal { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + // D1 returns numbers as JSON, parse from string representation + let val: crate::sea_query::Value = row.try_get_by(idx)?; + // Get as f64 then convert to Decimal + match val { + crate::sea_query::Value::Double(Some(f)) => { + Decimal::try_from(f).map_err(|e| { + DbErr::TryIntoErr { + from: "f64", + into: "Decimal", + source: Arc::new(e), + } + .into() + }) + } + _ => Err(TryGetError::DbErr( + crate::error::type_err("D1: Expected Decimal as f64, got NULL or non-numeric value".to_string()) + )), + } + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -779,6 +881,27 @@ impl TryGetable for BigDecimal { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + // D1 returns numbers as JSON, parse from string representation + let val: crate::sea_query::Value = row.try_get_by(idx)?; + // Get as f64 then convert to BigDecimal + match val { + crate::sea_query::Value::Double(Some(f)) => { + BigDecimal::try_from(f).map_err(|e| { + DbErr::TryIntoErr { + from: "f64", + into: "BigDecimal", + source: Arc::new(e), + } + .into() + }) + } + _ => Err(TryGetError::DbErr( + crate::error::type_err("D1: Expected BigDecimal as f64, got NULL or non-numeric value".to_string()) + )), + } + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -850,6 +973,21 @@ macro_rules! try_getable_uuid { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + // D1 stores UUIDs as strings + let val: crate::sea_query::Value = row.try_get_by(idx)?; + match val { + crate::sea_query::Value::String(Some(s)) => { + uuid::Uuid::parse_str(&s).map_err(|_| { + TryGetError::DbErr(crate::error::type_err("Invalid UUID".to_owned())) + }) + } + _ => Err(TryGetError::DbErr( + crate::error::type_err("D1: Expected UUID string, got NULL or non-string value".to_string()) + )), + } + } #[allow(unreachable_patterns)] _ => unreachable!(), }; @@ -875,7 +1013,62 @@ try_getable_uuid!(uuid::fmt::Simple, uuid::Uuid::simple); try_getable_uuid!(uuid::fmt::Urn, uuid::Uuid::urn); #[cfg(feature = "with-ipnetwork")] -try_getable_postgres!(ipnetwork::IpNetwork); +impl TryGetable for ipnetwork::IpNetwork { + #[allow(unused_variables)] + fn try_get_by(res: &QueryResult, idx: I) -> Result { + match &res.row { + #[cfg(feature = "sqlx-mysql")] + QueryResultRow::SqlxMySql(_) => { + Err(type_err("ipnetwork unsupported by sqlx-mysql").into()) + } + #[cfg(feature = "sqlx-postgres")] + QueryResultRow::SqlxPostgres(row) => row + .try_get::, _>(idx.as_sqlx_postgres_index()) + .map_err(|e| sqlx_error_to_query_err(e).into()) + .and_then(|opt| opt.ok_or_else(|| err_null_idx_col(idx))), + #[cfg(feature = "sqlx-sqlite")] + QueryResultRow::SqlxSqlite(_) => { + Err(type_err("ipnetwork unsupported by sqlx-sqlite").into()) + } + #[cfg(feature = "rusqlite")] + QueryResultRow::Rusqlite(_) => { + Err(type_err("ipnetwork unsupported by rusqlite").into()) + } + #[cfg(feature = "mock")] + #[allow(unused_variables)] + QueryResultRow::Mock(row) => row.try_get::(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => { + row.try_get::(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }) + } + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + // D1 stores IP networks as strings + let val: crate::sea_query::Value = row.try_get_by(idx)?; + match val { + crate::sea_query::Value::String(Some(s)) => { + use std::str::FromStr; + ipnetwork::IpNetwork::from_str(&s).map_err(|_| { + TryGetError::DbErr(DbErr::Type("Invalid IP network format in D1".to_owned())) + }) + } + _ => Err(TryGetError::DbErr( + crate::error::type_err("D1: Expected IP network string, got NULL or non-string value".to_string()) + )), + } + } + #[allow(unreachable_patterns)] + _ => unreachable!(), + } + } +} impl TryGetable for u32 { #[allow(unused_variables)] @@ -931,6 +1124,19 @@ impl TryGetable for u32 { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + let val = row.try_get_by(idx)?; + // Convert Value to the target type using D1ValueWrapper + let col_name = idx.as_str() + .ok_or_else(|| err_null_idx_col(idx))? + .to_string(); + let wrapper = crate::driver::d1::D1ValueWrapper::with_value( + col_name.clone(), + val, + ); + wrapper.try_get(&col_name).map_err(|e| e.into()) + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -980,6 +1186,19 @@ impl TryGetable for String { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + let val = row.try_get_by(idx)?; + // Convert Value to the target type using D1ValueWrapper + let col_name = idx.as_str() + .ok_or_else(|| err_null_idx_col(idx))? + .to_string(); + let wrapper = crate::driver::d1::D1ValueWrapper::with_value( + col_name.clone(), + val, + ); + wrapper.try_get(&col_name).map_err(|e| e.into()) + } #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -1037,6 +1256,12 @@ mod postgres_array { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(_) => Err(type_err(format!( + "{} unsupported by d1 (postgres arrays not supported)", + stringify!($type) + )) + .into()), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -1141,6 +1366,12 @@ mod postgres_array { err_null_idx_col(idx) }) } + #[cfg(feature = "d1")] + QueryResultRow::D1(_) => Err(type_err(format!( + "{} unsupported by d1 (postgres arrays not supported)", + stringify!($type) + )) + .into()), #[allow(unreachable_patterns)] _ => unreachable!(), }; @@ -1205,6 +1436,12 @@ mod postgres_array { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(_) => Err(type_err(format!( + "{} unsupported by d1 (postgres arrays not supported)", + stringify!($type) + )) + .into()), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -1242,6 +1479,8 @@ impl TryGetable for pgvector::Vector { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(_) => Err(type_err("Vector unsupported by d1").into()), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -1478,6 +1717,22 @@ where .and_then(|json| { serde_json::from_value(json).map_err(|e| crate::error::json_err(e).into()) }), + #[cfg(feature = "d1")] + QueryResultRow::D1(row) => { + // D1 returns JSON as sea_query::Value, convert to serde_json::Value + let val: crate::sea_query::Value = row.try_get_by(idx)?; + // Extract the inner serde_json::Value from the sea_query::Value + let json = match val { + crate::sea_query::Value::Json(Some(box_val)) => *box_val, + crate::sea_query::Value::Bool(Some(b)) => serde_json::Value::Bool(b), + crate::sea_query::Value::BigInt(Some(i)) => serde_json::Value::from(i), + crate::sea_query::Value::BigUnsigned(Some(u)) => serde_json::Value::from(u), + crate::sea_query::Value::Double(Some(f)) => serde_json::Value::from(f), + crate::sea_query::Value::String(Some(s)) => serde_json::Value::String(s), + _ => serde_json::Value::Null, + }; + serde_json::from_value(json).map_err(|e| crate::error::json_err(e).into()) + } #[allow(unreachable_patterns)] _ => unreachable!(), }