diff --git a/Cargo.toml b/Cargo.toml index 4996808dd..538c4eb60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] members = [ "clients/rust", + "clients/python", ] diff --git a/clients/python/.gitignore b/clients/python/.gitignore new file mode 100644 index 000000000..c8f044299 --- /dev/null +++ b/clients/python/.gitignore @@ -0,0 +1,72 @@ +/target + +# Byte-compiled / optimized / DLL files +__pycache__/ +.pytest_cache/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +.venv/ +env/ +bin/ +build/ +develop-eggs/ +dist/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +include/ +man/ +venv/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt +pip-selfcheck.json + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.cache +nosetests.xml +coverage.xml + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# Rope +.ropeproject + +# Django stuff: +*.log +*.pot + +.DS_Store + +# Sphinx documentation +docs/_build/ + +# PyCharm +.idea/ + +# VSCode +.vscode/ + +# Pyenv +.python-version diff --git a/clients/python/Cargo.toml b/clients/python/Cargo.toml new file mode 100644 index 000000000..4ab2fcae7 --- /dev/null +++ b/clients/python/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "python_actor_core_client" +version = "0.7.7" +edition = "2021" + +[lib] +name = "python_actor_core_client" +crate-type = ["cdylib"] + +[dependencies] +actor-core-client = { path = "../rust/", version = "0.7.7" } +futures-util = "0.3.31" +once_cell = "1.21.3" +pyo3 = { version = "0.24.0", features = ["extension-module"] } +pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] } +serde_json = "1.0.140" +tokio = "1.44.2" diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml new file mode 100644 index 000000000..0dae4efcd --- /dev/null +++ b/clients/python/pyproject.toml @@ -0,0 +1,15 @@ +[build-system] +requires = ["maturin>=1.8,<2.0"] +build-backend = "maturin" + +[project] +name = "python_actor_core_client" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Rust", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dynamic = ["version"] +[tool.maturin] +features = ["pyo3/extension-module"] diff --git a/clients/python/requirements.txt b/clients/python/requirements.txt new file mode 100644 index 000000000..a4aee91c2 --- /dev/null +++ b/clients/python/requirements.txt @@ -0,0 +1,3 @@ +-e . +maturin==1.8.3 +# source .venv/bin/activate \ No newline at end of file diff --git a/clients/python/src/events/async/client.rs b/clients/python/src/events/async/client.rs new file mode 100644 index 000000000..3096f3324 --- /dev/null +++ b/clients/python/src/events/async/client.rs @@ -0,0 +1,129 @@ +use std::sync::Arc; + +use actor_core_client::{self as actor_core_rs, CreateOptions, GetOptions, GetWithIdOptions}; +use pyo3::prelude::*; + +use crate::util::{try_opts_from_kwds, PyKwdArgs}; + +use super::handle::ActorHandle; + +#[pyclass(name = "AsyncClient")] +pub struct Client { + client: Arc, +} + +#[pymethods] +impl Client { + #[new] + #[pyo3(signature=( + endpoint, + transport_kind="websocket", + encoding_kind="json" + ))] + fn py_new( + endpoint: &str, + transport_kind: &str, + encoding_kind: &str, + ) -> PyResult { + let transport_kind = try_transport_kind_from_str(&transport_kind)?; + let encoding_kind = try_encoding_kind_from_str(&encoding_kind)?; + let client = actor_core_rs::Client::new( + endpoint.to_string(), + transport_kind, + encoding_kind + ); + + Ok(Client { + client: Arc::new(client) + }) + } + + #[pyo3(signature = (name, **kwds))] + fn get<'a>(&self, py: Python<'a>, name: &str, kwds: Option) -> PyResult> { + let opts = try_opts_from_kwds::(kwds)?; + let name = name.to_string(); + let client = self.client.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let handle = client.get(&name, opts).await; + + match handle { + Ok(handle) => Ok(ActorHandle { + handle + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + }) + } + + #[pyo3(signature = (id, **kwds))] + fn get_with_id<'a>(&self, py: Python<'a>, id: &str, kwds: Option) -> PyResult> { + let opts = try_opts_from_kwds::(kwds)?; + let id = id.to_string(); + let client = self.client.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let handle = client.get_with_id(&id, opts).await; + + match handle { + Ok(handle) => Ok(ActorHandle { + handle + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + }) + } + + #[pyo3(signature = (name, **kwds))] + fn create<'a>(&self, py: Python<'a>, name: &str, kwds: Option) -> PyResult> { + let opts = try_opts_from_kwds::(kwds)?; + let name = name.to_string(); + let client = self.client.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let handle = client.create(&name, opts).await; + + match handle { + Ok(handle) => Ok(ActorHandle { + handle + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + }) + } +} + +fn try_transport_kind_from_str( + transport_kind: &str +) -> PyResult { + match transport_kind { + "websocket" => Ok(actor_core_rs::TransportKind::WebSocket), + "sse" => Ok(actor_core_rs::TransportKind::Sse), + _ => Err(py_value_err!( + "Invalid transport kind: {}", + transport_kind + )), + } +} + +fn try_encoding_kind_from_str( + encoding_kind: &str +) -> PyResult { + match encoding_kind { + "json" => Ok(actor_core_rs::EncodingKind::Json), + "cbor" => Ok(actor_core_rs::EncodingKind::Cbor), + _ => Err(py_value_err!( + "Invalid encoding kind: {}", + encoding_kind + )), + } +} \ No newline at end of file diff --git a/clients/python/src/events/async/handle.rs b/clients/python/src/events/async/handle.rs new file mode 100644 index 000000000..2af619d28 --- /dev/null +++ b/clients/python/src/events/async/handle.rs @@ -0,0 +1,95 @@ + +use actor_core_client::{self as actor_core_rs}; +use pyo3::{prelude::*, types::PyTuple}; + +use crate::util; + +#[pyclass] +pub struct ActorHandle { + pub handle: actor_core_rs::handle::ActorHandle, +} + +#[pymethods] +impl ActorHandle { + #[new] + pub fn new() -> PyResult { + Err(py_runtime_err!( + "Actor handle cannot be instantiated directly", + )) + } + + pub fn action<'a>( + &self, + py: Python<'a>, + method: &str, + args: Vec + ) -> PyResult> { + let method = method.to_string(); + let handle = self.handle.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let args = Python::with_gil(|py| util::py_to_json_value(py, &args))?; + let result = handle.action(&method, args).await; + let Ok(result) = result else { + return Err(py_runtime_err!( + "Failed to call action: {:?}", + result.err() + )); + }; + let mut result = Python::with_gil(|py| { + match util::json_to_py_value(py, &vec![result]) { + Ok(value) => Ok( + value.iter() + .map(|x| x.clone().unbind()) + .collect::>() + ), + Err(e) => Err(e), + } + })?; + let Some(result) = result.drain(0..1).next() else { + return Err(py_runtime_err!( + "Expected one result, got {}", + result.len() + )); + }; + + Ok(result) + }) + } + + pub fn on_event<'a>( + &self, + py: Python<'a>, + event_name: &str, + callback: PyObject + ) -> PyResult> { + let event_name = event_name.to_string(); + let handle = self.handle.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + handle.on_event(&event_name, move |args| { + if let Err(e) = Python::with_gil(|py| -> PyResult<()> { + let args = util::json_to_py_value(py, args)?; + let args = PyTuple::new(py, args)?; + + callback.call(py, args, None)?; + + Ok(()) + }) { + eprintln!("Failed to call event callback: {}", e); + } + }).await; + + Ok(()) + }) + } + + pub fn disconnect<'a>(&self, py: Python<'a>) -> PyResult> { + let handle = self.handle.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + handle.disconnect().await; + + Ok(()) + }) + } +} \ No newline at end of file diff --git a/clients/python/src/events/async/mod.rs b/clients/python/src/events/async/mod.rs new file mode 100644 index 000000000..97a804cae --- /dev/null +++ b/clients/python/src/events/async/mod.rs @@ -0,0 +1,11 @@ +use pyo3::prelude::*; + +mod handle; +mod client; + +pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + + Ok(()) +} \ No newline at end of file diff --git a/clients/python/src/events/mod.rs b/clients/python/src/events/mod.rs new file mode 100644 index 000000000..a5d945c3d --- /dev/null +++ b/clients/python/src/events/mod.rs @@ -0,0 +1,12 @@ +use pyo3::prelude::*; + +mod sync; +mod r#async; + +pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + sync::init_module(m)?; + r#async::init_module(m)?; + + + Ok(()) +} \ No newline at end of file diff --git a/clients/python/src/events/sync/client.rs b/clients/python/src/events/sync/client.rs new file mode 100644 index 000000000..10065a713 --- /dev/null +++ b/clients/python/src/events/sync/client.rs @@ -0,0 +1,108 @@ +use actor_core_client::{self as actor_core_rs, CreateOptions, GetOptions, GetWithIdOptions}; +use pyo3::prelude::*; + +use super::handle::ActorHandle; +use crate::util::{try_opts_from_kwds, PyKwdArgs, SYNC_RUNTIME}; + +#[pyclass(name = "Client")] +pub struct Client { + client: actor_core_rs::Client, +} + +#[pymethods] +impl Client { + #[new] + #[pyo3(signature=( + endpoint, + transport_kind="websocket", + encoding_kind="json" + ))] + fn py_new( + endpoint: &str, + transport_kind: &str, + encoding_kind: &str, + ) -> PyResult { + let transport_kind = try_transport_kind_from_str(&transport_kind)?; + let encoding_kind = try_encoding_kind_from_str(&encoding_kind)?; + let client = actor_core_rs::Client::new( + endpoint.to_string(), + transport_kind, + encoding_kind + ); + + Ok(Client { + client + }) + } + + #[pyo3(signature = (name, **kwds))] + fn get(&self, name: &str, kwds: Option) -> PyResult { + let opts = try_opts_from_kwds::(kwds)?; + let handle = self.client.get(name, opts); + let handle = SYNC_RUNTIME.block_on(handle); + + match handle { + Ok(handle) => Ok(ActorHandle { handle }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + } + + #[pyo3(signature = (id, **kwds))] + fn get_with_id(&self, id: &str, kwds: Option) -> PyResult { + let opts = try_opts_from_kwds::(kwds)?; + let handle = self.client.get_with_id(id, opts); + let handle = SYNC_RUNTIME.block_on(handle); + + match handle { + Ok(handle) => Ok(ActorHandle { handle }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + } + + #[pyo3(signature = (name, **kwds))] + fn create(&self, name: &str, kwds: Option) -> PyResult { + let opts = try_opts_from_kwds::(kwds)?; + let handle = self.client.create(name, opts); + let handle = SYNC_RUNTIME.block_on(handle); + + match handle { + Ok(handle) => Ok(ActorHandle { handle }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + } +} + +fn try_transport_kind_from_str( + transport_kind: &str +) -> PyResult { + match transport_kind { + "websocket" => Ok(actor_core_rs::TransportKind::WebSocket), + "sse" => Ok(actor_core_rs::TransportKind::Sse), + _ => Err(py_value_err!( + "Invalid transport kind: {}", + transport_kind + )), + } +} + +fn try_encoding_kind_from_str( + encoding_kind: &str +) -> PyResult { + match encoding_kind { + "json" => Ok(actor_core_rs::EncodingKind::Json), + "cbor" => Ok(actor_core_rs::EncodingKind::Cbor), + _ => Err(py_value_err!( + "Invalid encoding kind: {}", + encoding_kind + )), + } +} \ No newline at end of file diff --git a/clients/python/src/events/sync/handle.rs b/clients/python/src/events/sync/handle.rs new file mode 100644 index 000000000..0bf9979ee --- /dev/null +++ b/clients/python/src/events/sync/handle.rs @@ -0,0 +1,69 @@ +use actor_core_client::{self as actor_core_rs}; +use pyo3::{prelude::*, types::PyTuple}; + +use crate::util::{self, SYNC_RUNTIME}; + +#[pyclass] +pub struct ActorHandle { + pub handle: actor_core_rs::handle::ActorHandle, +} + +#[pymethods] +impl ActorHandle { + #[new] + pub fn new() -> PyResult { + Err(py_runtime_err!( + "Actor handle cannot be instantiated directly", + )) + } + + pub fn action<'a>( + &self, + py: Python<'a>, + method: &str, + args: Vec + ) -> PyResult> { + let result = self.handle.action( + method, + util::py_to_json_value(py, &args)? + ); + let result = SYNC_RUNTIME.block_on(result); + let Ok(result) = result else { + return Err(py_runtime_err!( + "Failed to call action: {:?}", + result.err() + )); + }; + let mut result = util::json_to_py_value(py, &vec![result])?; + let Some(result) = result.drain(0..1).next() else { + return Err(py_runtime_err!( + "Expected one result, got {}", + result.len() + )); + }; + Ok(result) + } + + pub fn on_event(&self, event_name: &str, callback: PyObject) { + SYNC_RUNTIME.block_on( + self.handle.on_event(event_name, move |args| { + if let Err(e) = Python::with_gil(|py| -> PyResult<()> { + let args = util::json_to_py_value(py, args)?; + let args = PyTuple::new(py, args)?; + + callback.call(py, args, None)?; + + Ok(()) + }) { + eprintln!("Failed to call event callback: {}", e); + } + }) + ); + } + + pub fn disconnect(&self) { + SYNC_RUNTIME.block_on( + self.handle.disconnect() + ) + } +} \ No newline at end of file diff --git a/clients/python/src/events/sync/mod.rs b/clients/python/src/events/sync/mod.rs new file mode 100644 index 000000000..97a804cae --- /dev/null +++ b/clients/python/src/events/sync/mod.rs @@ -0,0 +1,11 @@ +use pyo3::prelude::*; + +mod handle; +mod client; + +pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + + Ok(()) +} \ No newline at end of file diff --git a/clients/python/src/lib.rs b/clients/python/src/lib.rs new file mode 100644 index 000000000..e708fb6a2 --- /dev/null +++ b/clients/python/src/lib.rs @@ -0,0 +1,15 @@ +use pyo3::prelude::*; + +#[macro_use] +mod util; +mod simple; +mod events; + +#[pymodule] +fn python_actor_core_client(m: &Bound<'_, PyModule>) -> PyResult<()> { + simple::init_module(m)?; + events::init_module(m)?; + + + Ok(()) +} diff --git a/clients/python/src/simple/async/client.rs b/clients/python/src/simple/async/client.rs new file mode 100644 index 000000000..e85cbdacf --- /dev/null +++ b/clients/python/src/simple/async/client.rs @@ -0,0 +1,132 @@ +use std::sync::Arc; + +use actor_core_client::{self as actor_core_rs, CreateOptions, GetOptions, GetWithIdOptions}; +use pyo3::prelude::*; + +use crate::util::{try_opts_from_kwds, PyKwdArgs}; + +use super::handle::{ActorHandle, InnerActorData}; + +#[pyclass(name = "AsyncSimpleClient")] +pub struct Client { + client: Arc, +} + +#[pymethods] +impl Client { + #[new] + #[pyo3(signature=( + endpoint, + transport_kind="websocket", + encoding_kind="json" + ))] + fn py_new( + endpoint: &str, + transport_kind: &str, + encoding_kind: &str, + ) -> PyResult { + let transport_kind = try_transport_kind_from_str(&transport_kind)?; + let encoding_kind = try_encoding_kind_from_str(&encoding_kind)?; + let client = actor_core_rs::Client::new( + endpoint.to_string(), + transport_kind, + encoding_kind + ); + + Ok(Client { + client: Arc::new(client) + }) + } + + #[pyo3(signature = (name, **kwds))] + fn get<'a>(&self, py: Python<'a>, name: &str, kwds: Option) -> PyResult> { + let opts = try_opts_from_kwds::(kwds)?; + let name = name.to_string(); + let client = self.client.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let handle = client.get(&name, opts).await; + + match handle { + Ok(handle) => Ok(ActorHandle { + handle, + data: InnerActorData::new(), + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + }) + } + + #[pyo3(signature = (id, **kwds))] + fn get_with_id<'a>(&self, py: Python<'a>, id: &str, kwds: Option) -> PyResult> { + let opts = try_opts_from_kwds::(kwds)?; + let id = id.to_string(); + let client = self.client.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let handle = client.get_with_id(&id, opts).await; + + match handle { + Ok(handle) => Ok(ActorHandle { + handle, + data: InnerActorData::new(), + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + }) + } + + #[pyo3(signature = (name, **kwds))] + fn create<'a>(&self, py: Python<'a>, name: &str, kwds: Option) -> PyResult> { + let opts = try_opts_from_kwds::(kwds)?; + let name = name.to_string(); + let client = self.client.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let handle = client.create(&name, opts).await; + + match handle { + Ok(handle) => Ok(ActorHandle { + handle, + data: InnerActorData::new(), + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + }) + } +} + +fn try_transport_kind_from_str( + transport_kind: &str +) -> PyResult { + match transport_kind { + "websocket" => Ok(actor_core_rs::TransportKind::WebSocket), + "sse" => Ok(actor_core_rs::TransportKind::Sse), + _ => Err(py_value_err!( + "Invalid transport kind: {}", + transport_kind + )), + } +} + +fn try_encoding_kind_from_str( + encoding_kind: &str +) -> PyResult { + match encoding_kind { + "json" => Ok(actor_core_rs::EncodingKind::Json), + "cbor" => Ok(actor_core_rs::EncodingKind::Cbor), + _ => Err(py_value_err!( + "Invalid encoding kind: {}", + encoding_kind + )), + } +} \ No newline at end of file diff --git a/clients/python/src/simple/async/handle.rs b/clients/python/src/simple/async/handle.rs new file mode 100644 index 000000000..096ec8827 --- /dev/null +++ b/clients/python/src/simple/async/handle.rs @@ -0,0 +1,199 @@ +use std::sync::Arc; +use actor_core_client::{self as actor_core_rs}; +use futures_util::FutureExt; +use pyo3::{prelude::*, types::{PyList, PyString, PyTuple}}; +use tokio::sync::{mpsc, Mutex}; + +use crate::util; + +struct ActorEvent { + name: String, + args: Vec, +} + +pub struct InnerActorData { + event_tx: Mutex>>, +} + +impl InnerActorData { + pub fn new() -> Arc { + Arc::new(Self { + event_tx: Mutex::new(None), + }) + } +} + +impl InnerActorData { + pub async fn on_event( + &self, + event_name: String, + args: &Vec + ) { + let tx = &self.event_tx.lock().await; + let Some(tx) = tx.as_ref() else { + return; + }; + + tx.send(ActorEvent { + name: event_name, + args: args.clone(), + }).await.map_err(|e| { + py_runtime_err!( + "Failed to send via inner tx: {}", + e + ) + }).ok(); + } +} + +#[pyclass] +pub struct ActorHandle { + pub handle: actor_core_rs::handle::ActorHandle, + pub data: Arc, +} + +#[pymethods] +impl ActorHandle { + #[new] + pub fn new() -> PyResult { + Err(py_runtime_err!( + "Actor handle cannot be instantiated directly", + )) + } + + pub fn action<'a>( + &self, + py: Python<'a>, + method: &str, + args: Vec + ) -> PyResult> { + let method = method.to_string(); + let handle = self.handle.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let args = Python::with_gil(|py| util::py_to_json_value(py, &args))?; + let result = handle.action(&method, args).await; + let Ok(result) = result else { + return Err(py_runtime_err!( + "Failed to call action: {:?}", + result.err() + )); + }; + let mut result = Python::with_gil(|py| { + match util::json_to_py_value(py, &vec![result]) { + Ok(value) => Ok( + value.iter() + .map(|x| x.clone().unbind()) + .collect::>() + ), + Err(e) => Err(e), + } + })?; + let Some(result) = result.drain(0..1).next() else { + return Err(py_runtime_err!( + "Expected one result, got {}", + result.len() + )); + }; + + Ok(result) + }) + } + + pub fn subscribe<'a>( + &self, + py: Python<'a>, + event_name: &str + ) -> PyResult> { + let event_name = event_name.to_string(); + let data = self.data.clone(); + let handle = self.handle.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + handle.on_event(&event_name.clone(), move |args| { + let event_name = event_name.clone(); + let args = args.clone(); + let data = data.clone(); + + tokio::spawn(async move { + data.on_event(event_name, &args).await; + }); + }).await; + + Ok(()) + }) + } + + #[pyo3(signature=(count, timeout=None))] + pub fn receive<'a>( + &self, + py: Python<'a>, + count: u32, + timeout: Option + ) -> PyResult> { + let (tx, mut rx) = mpsc::channel(count as usize); + + let data = self.data.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + data.event_tx.lock().await.replace(tx); + + let result: Vec = { + let mut events: Vec = Vec::new(); + + loop { + if events.len() >= count as usize { + break; + } + + let timeout_rx_future = match timeout { + Some(timeout) => { + let timeout = std::time::Duration::from_secs_f64(timeout); + tokio::time::timeout(timeout, rx.recv()) + .map(|x| x.unwrap_or(None)).boxed() + }, + None => rx.recv().boxed() + }; + + tokio::select! { + result = timeout_rx_future => { + match result { + Some(event) => events.push(event), + None => break, + } + }, + // TODO: Add more signal support + _ = tokio::signal::ctrl_c() => { + Python::with_gil(|py| py.check_signals())?; + } + }; + } + + Ok::<_, PyErr>(events) + }?; + + // Convert events to Python objects + Python::with_gil(|py| { + let py_events = PyList::empty(py); + for event in result { + let event = PyTuple::new(py, &[ + PyString::new(py, &event.name).as_any(), + PyList::new(py, &util::json_to_py_value(py, &event.args)?)?.as_any(), + ])?; + py_events.append(event)?; + } + + Ok(py_events.unbind()) + }) + }) + } + + pub fn disconnect<'a>(&self, py: Python<'a>) -> PyResult> { + let handle = self.handle.clone(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + handle.disconnect().await; + + Ok(()) + }) + } +} \ No newline at end of file diff --git a/clients/python/src/simple/async/mod.rs b/clients/python/src/simple/async/mod.rs new file mode 100644 index 000000000..97a804cae --- /dev/null +++ b/clients/python/src/simple/async/mod.rs @@ -0,0 +1,11 @@ +use pyo3::prelude::*; + +mod handle; +mod client; + +pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + + Ok(()) +} \ No newline at end of file diff --git a/clients/python/src/simple/mod.rs b/clients/python/src/simple/mod.rs new file mode 100644 index 000000000..a5d945c3d --- /dev/null +++ b/clients/python/src/simple/mod.rs @@ -0,0 +1,12 @@ +use pyo3::prelude::*; + +mod sync; +mod r#async; + +pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + sync::init_module(m)?; + r#async::init_module(m)?; + + + Ok(()) +} \ No newline at end of file diff --git a/clients/python/src/simple/sync/client.rs b/clients/python/src/simple/sync/client.rs new file mode 100644 index 000000000..7206e8262 --- /dev/null +++ b/clients/python/src/simple/sync/client.rs @@ -0,0 +1,118 @@ +use actor_core_client::{self as actor_core_rs, CreateOptions, GetOptions, GetWithIdOptions}; +use pyo3::prelude::*; + +use super::handle::{ActorHandle, InnerActorData}; +use crate::util::{try_opts_from_kwds, PyKwdArgs, SYNC_RUNTIME}; + +#[pyclass(name = "SimpleClient")] +pub struct Client { + client: actor_core_rs::Client, +} + +#[pymethods] +impl Client { + #[new] + #[pyo3(signature=( + endpoint, + transport_kind="websocket", + encoding_kind="json" + ))] + fn py_new( + endpoint: &str, + transport_kind: &str, + encoding_kind: &str, + ) -> PyResult { + let transport_kind = try_transport_kind_from_str(&transport_kind)?; + let encoding_kind = try_encoding_kind_from_str(&encoding_kind)?; + let client = actor_core_rs::Client::new( + endpoint.to_string(), + transport_kind, + encoding_kind + ); + + Ok(Client { + client + }) + } + + #[pyo3(signature = (name, **kwds))] + fn get(&self, name: &str, kwds: Option) -> PyResult { + let opts = try_opts_from_kwds::(kwds)?; + + let handle = self.client.get(name, opts); + let handle = SYNC_RUNTIME.block_on(handle); + + match handle { + Ok(handle) => Ok(ActorHandle { + handle, + data: InnerActorData::new(), + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + } + + #[pyo3(signature = (id, **kwds))] + fn get_with_id(&self, id: &str, kwds: Option) -> PyResult { + let opts = try_opts_from_kwds::(kwds)?; + let handle = self.client.get_with_id(id, opts); + let handle = SYNC_RUNTIME.block_on(handle); + + match handle { + Ok(handle) => Ok(ActorHandle { + handle, + data: InnerActorData::new(), + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + } + + #[pyo3(signature = (name, **kwds))] + fn create(&self, name: &str, kwds: Option) -> PyResult { + let opts = try_opts_from_kwds::(kwds)?; + let handle = self.client.create(name, opts); + let handle = SYNC_RUNTIME.block_on(handle); + + match handle { + Ok(handle) => Ok(ActorHandle { + handle, + data: InnerActorData::new(), + }), + Err(e) => Err(py_runtime_err!( + "Failed to get actor: {}", + e + )) + } + } +} + +fn try_transport_kind_from_str( + transport_kind: &str +) -> PyResult { + match transport_kind { + "websocket" => Ok(actor_core_rs::TransportKind::WebSocket), + "sse" => Ok(actor_core_rs::TransportKind::Sse), + _ => Err(py_value_err!( + "Invalid transport kind: {}", + transport_kind + )), + } +} + +fn try_encoding_kind_from_str( + encoding_kind: &str +) -> PyResult { + match encoding_kind { + "json" => Ok(actor_core_rs::EncodingKind::Json), + "cbor" => Ok(actor_core_rs::EncodingKind::Cbor), + _ => Err(py_value_err!( + "Invalid encoding kind: {}", + encoding_kind + )), + } +} \ No newline at end of file diff --git a/clients/python/src/simple/sync/handle.rs b/clients/python/src/simple/sync/handle.rs new file mode 100644 index 000000000..b79dabb10 --- /dev/null +++ b/clients/python/src/simple/sync/handle.rs @@ -0,0 +1,179 @@ +use std::sync::Arc; +use actor_core_client::{self as actor_core_rs}; +use futures_util::FutureExt; +use pyo3::{prelude::*, types::{PyList, PyString, PyTuple}}; +use tokio::sync::{mpsc, Mutex}; + +use crate::util::{self, SYNC_RUNTIME}; + +struct ActorEvent { + name: String, + args: Vec, +} + +pub struct InnerActorData { + event_tx: Mutex>>, +} + +impl InnerActorData { + pub fn new() -> Arc { + Arc::new(Self { + event_tx: Mutex::new(None), + }) + } +} + +impl InnerActorData { + pub async fn on_event( + &self, + event_name: String, + args: &Vec + ) { + let tx = &self.event_tx.lock().await; + let Some(tx) = tx.as_ref() else { + return; + }; + + tx.send(ActorEvent { + name: event_name, + args: args.clone(), + }).await.map_err(|e| { + py_runtime_err!( + "Failed to send via inner tx: {}", + e + ) + }).ok(); + } +} + +#[pyclass] +pub struct ActorHandle { + pub handle: actor_core_rs::handle::ActorHandle, + pub data: Arc, +} + +#[pymethods] +impl ActorHandle { + #[new] + pub fn new() -> PyResult { + Err(py_runtime_err!("Actor handle cannot be instantiated directly")) + } + + pub fn action<'a>( + &self, + py: Python<'a>, + method: &str, + args: Vec + ) -> PyResult> { + let result = self.handle.action( + method, + util::py_to_json_value(py, &args)? + ); + let result = SYNC_RUNTIME.block_on(result); + + let Ok(result) = result else { + return Err(py_runtime_err!( + "Failed to call action: {:?}", + result.err() + )); + }; + + let mut result = util::json_to_py_value(py, &vec![result])?; + let Some(result) = result.drain(0..1).next() else { + return Err(py_runtime_err!( + "Expected one result, got {}", + result.len() + )); + }; + + Ok(result) + } + + pub fn subscribe( + &self, + event_name: &str, + ) -> PyResult<()> { + let event_name = event_name.to_string(); + let data = self.data.clone(); + + SYNC_RUNTIME.block_on( + self.handle.on_event(&event_name.clone(), move |args| { + let event_name = event_name.clone(); + let args = args.clone(); + let data = data.clone(); + + tokio::spawn(async move { + data.on_event(event_name, &args).await; + }); + }) + ); + + Ok(()) + } + + #[pyo3(signature=(count, timeout=None))] + pub fn receive<'a>( + &self, + py: Python<'a>, + count: u32, + timeout: Option + ) -> PyResult> { + let (tx, mut rx) = mpsc::channel(count as usize); + + SYNC_RUNTIME.block_on(async { + self.data.event_tx.lock().await.replace(tx); + }); + + let result: Vec = SYNC_RUNTIME.block_on(async { + let mut events: Vec = Vec::new(); + + loop { + if events.len() >= count as usize { + break; + } + + let timeout_rx_future = match timeout { + Some(timeout) => { + let timeout = std::time::Duration::from_secs_f64(timeout); + tokio::time::timeout(timeout, rx.recv()) + .map(|x| x.unwrap_or(None)).boxed() + }, + None => rx.recv().boxed() + }; + + tokio::select! { + result = timeout_rx_future => { + match result { + Some(event) => events.push(event), + None => break, + } + }, + // TODO: Add more signal support + _ = tokio::signal::ctrl_c() => { + py.check_signals()?; + } + }; + } + + Ok::<_, PyErr>(events) + })?; + + // Convert events to Python objects + let py_events = PyList::empty(py); + for event in result { + let event = PyTuple::new(py, &[ + PyString::new(py, &event.name).as_any(), + PyList::new(py, &util::json_to_py_value(py, &event.args)?)?.as_any(), + ])?; + py_events.append(event)?; + } + + Ok(py_events) + } + + pub fn disconnect(&self) { + SYNC_RUNTIME.block_on( + self.handle.disconnect() + ) + } +} \ No newline at end of file diff --git a/clients/python/src/simple/sync/mod.rs b/clients/python/src/simple/sync/mod.rs new file mode 100644 index 000000000..97a804cae --- /dev/null +++ b/clients/python/src/simple/sync/mod.rs @@ -0,0 +1,11 @@ +use pyo3::prelude::*; + +mod handle; +mod client; + +pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + + Ok(()) +} \ No newline at end of file diff --git a/clients/python/src/util.rs b/clients/python/src/util.rs new file mode 100644 index 000000000..24fc3c0a9 --- /dev/null +++ b/clients/python/src/util.rs @@ -0,0 +1,252 @@ +use actor_core_client::{ + client, CreateOptions, GetOptions, GetWithIdOptions +}; +use once_cell::sync::Lazy; +use pyo3::{prelude::*, types::PyDict}; +use tokio::runtime::{self, Runtime}; + +pub static SYNC_RUNTIME: Lazy = Lazy::new(|| { + runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap() +}); + +macro_rules! py_runtime_err { + ($msg:expr) => { + pyo3::exceptions::PyRuntimeError::new_err($msg) + }; + ($msg:expr, $($arg:tt)*) => { + pyo3::exceptions::PyRuntimeError::new_err(format!( + $msg, + $($arg)* + )) + }; +} + +macro_rules! py_value_err { + ($msg:expr) => { + pyo3::exceptions::PyValueError::new_err($msg) + }; + ($msg:expr, $($arg:tt)*) => { + pyo3::exceptions::PyValueError::new_err(format!( + $msg, + $($arg)* + )) + }; +} + +// See ACTR-96 +pub fn py_to_json_value( + py: Python<'_>, + py_obj: &Vec +) -> PyResult> { + let py_json = py.import("json")?; + + let obj_strs: Vec = py_obj + .into_iter() + .map(|obj| { + let obj_str: String = py_json + .call_method("dumps", (obj,), None)? + .extract::()?; + + Ok(obj_str) + }) + .collect::>>()?; + + let json_value = obj_strs + .into_iter() + .map(|s| { + match serde_json::from_str(&s) { + Ok(value) => Ok(value), + Err(e) => Err(py_value_err!( + "Failed to parse JSON: {}", + e + )) + } + }) + .collect::>>()?; + + Ok(json_value) +} + +pub fn json_to_py_value<'a>( + py: Python<'a>, + val: &Vec +) -> PyResult>> { + let py_json = py.import("json")?; + + val.into_iter() + .map(|f| { + let str = serde_json::to_string(f) + .map_err(|e| py_value_err!( + "Failed to serialize JSON value: {}", + e + ))?; + + py_json + .call_method("loads", (str,), None) + .map_err(|e| py_value_err!( + "Failed to load JSON value: {}", + e + )) + }) + .collect() +} + +fn extract_tags(tags: Option>) -> PyResult>> { + let Some(tags) = tags else { + return Ok(None) + }; + + // tags should be a Tuple> + // or a List> + // Convert it to a Vec<(String, String)> + let tags: Vec<(String, String)> = tags + .extract::>() + .map_err(|_| { + py_value_err!( + "Invalid tags format. Expected a list of tuples, ex: [(\"key\", \"value\")]" + ) + })?; + + Ok(Some(tags)) +} + +fn extract_params(params: Option>) -> PyResult> { + let Some(params) = params else { + return Ok(None) + }; + + let value = Python::with_gil(|py| py_to_json_value(py, &vec![params.unbind()]))?; + let Some(value) = value.first() else { + return Err(py_runtime_err!("Failed to convert params to JSON value")); + }; + + Ok(Some(value.clone())) +} + + +pub type PyKwdArgs<'a> = Bound<'a, PyDict>; +pub struct PyKwdArgsWrapper<'a>(pub PyKwdArgs<'a>); +pub fn try_opts_from_kwds<'a, T>(kwds: Option>) -> PyResult +where + T: TryFrom, Error = PyErr> + Default, +{ + let opts = kwds.map_or(Ok(T::default()), |kwds| { + T::try_from(PyKwdArgsWrapper(kwds)) + })?; + + Ok(opts) +} + +impl TryFrom::> for GetOptions { + type Error = PyErr; + + fn try_from(kwds: PyKwdArgsWrapper) -> PyResult { + let tags = extract_tags(kwds.0.get_item("tags")?)?; + + let params = match kwds.0.get_item("params")? { + Some(params) => { + let value = Python::with_gil(|py| py_to_json_value(py, &vec![params.unbind()]))?; + let Some(value) = value.first() else { + return Err(py_runtime_err!("Failed to convert params to JSON value")); + }; + + Some(value.clone()) + }, + None => None + }; + + let no_create = match kwds.0.get_item("no_create")? { + Some(no_create) => { + Some(no_create.extract::().map_err(|_| { + py_value_err!( + "Invalid no_create format. Expected a boolean" + ) + })?) + }, + None => None + }; + + let create = match kwds.0.get_item("create")? { + Some(create) => { + let create_req_metadata = create.downcast::().map_err(|_| { + py_value_err!( + "Invalid create format. Expected a dictionary" + ) + })?; + + let tags = extract_tags(create_req_metadata.get_item("tags")?)?; + + let region = create_req_metadata + .get_item("region")? + .map(|v| v.extract::()) + .transpose()?; + + Some(client::PartialCreateRequestMetadata { + tags, + region + }) + }, + None => None + }; + + Ok(GetOptions { + tags, + params, + no_create, + create + }) + } +} + +impl TryFrom::> for CreateOptions { + type Error = PyErr; + + fn try_from(kwds: PyKwdArgsWrapper) -> PyResult { + let params = extract_params(kwds.0.get_item("params")?)?; + + let create = match kwds.0.get_item("create")? { + Some(create) => { + let create_req_metadata = create.downcast::().map_err(|_| { + py_value_err!( + "Invalid create format. Expected a dictionary" + ) + })?; + + let tags = extract_tags(create_req_metadata.get_item("tags")?)? + .unwrap_or_default(); + + let region = create_req_metadata + .get_item("region")? + .map(|v| v.extract::()) + .transpose()?; + + client::CreateRequestMetadata { + tags, + region + } + }, + None => client::CreateRequestMetadata::default() + }; + + Ok(CreateOptions { + params, + create, + }) + } +} + +impl TryFrom::> for GetWithIdOptions { + type Error = PyErr; + + fn try_from(kwds: PyKwdArgsWrapper) -> PyResult { + let params = extract_params(kwds.0.get_item("params")?)?; + + Ok(GetWithIdOptions { + params + }) + } +} \ No newline at end of file diff --git a/clients/python/tests/async.py b/clients/python/tests/async.py new file mode 100644 index 000000000..12548f469 --- /dev/null +++ b/clients/python/tests/async.py @@ -0,0 +1,19 @@ +import asyncio +from python_actor_core_client import AsyncClient as ActorClient + +async def main(): + client = ActorClient("http://127.0.0.1:6420") + + handle = await client.get("counter", tags=[("tag", "valu3")]) + print("Actor handle:", handle) + + print("Subscribed to newCount") + handle.on_event("newCount", lambda msg: print("Received msg:", msg)) + + print("Sending action") + print(await handle.action("increment", [1])) + + await asyncio.sleep(2) + + +asyncio.run(main()) \ No newline at end of file diff --git a/clients/python/tests/simple_async.py b/clients/python/tests/simple_async.py new file mode 100644 index 000000000..6bbe6bb83 --- /dev/null +++ b/clients/python/tests/simple_async.py @@ -0,0 +1,20 @@ +import asyncio +from python_actor_core_client import AsyncSimpleClient as ActorClient + +async def main(): + client = ActorClient("http://127.0.0.1:6420") + + handle = await client.get("counter") + print("Actor handle:", handle) + + print("Subscribing to newCount") + await handle.subscribe("newCount") + + print("Receiving msgs") + print(await handle.receive(1)) + + print("Sending action") + print(await handle.action("increment", [1])) + + +asyncio.run(main()) \ No newline at end of file diff --git a/clients/python/tests/simple_sync.py b/clients/python/tests/simple_sync.py new file mode 100644 index 000000000..525c05bd4 --- /dev/null +++ b/clients/python/tests/simple_sync.py @@ -0,0 +1,15 @@ +from python_actor_core_client import SimpleClient as ActorClient + +client = ActorClient("http://127.0.0.1:6420") + +handle = client.get("counter") +print("Actor handle:", handle) + +print("Subscribing to newCount") +handle.subscribe("newCount") + +print("Receiving msgs") +print(handle.receive(1)) + +print("Sending action") +print(handle.action("increment", [1])) diff --git a/clients/python/tests/sync.py b/clients/python/tests/sync.py new file mode 100644 index 000000000..5bb6c372d --- /dev/null +++ b/clients/python/tests/sync.py @@ -0,0 +1,12 @@ +from python_actor_core_client import Client as ActorClient + +client = ActorClient("http://127.0.0.1:6420") + +handle = client.get("counter") +print("Actor handle:", handle) + +print("Subscribing to newCount") +handle.on_event("newCount", lambda msg: print("Received msg:", msg)) + +print("Sending action") +print(handle.action("increment", [1])) diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 95f8908ab..7f1ec24ea 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -12,6 +12,15 @@ pub struct CreateRequestMetadata { pub region: Option, } +impl Default for CreateRequestMetadata { + fn default() -> Self { + Self { + tags: vec![], + region: None, + } + } +} + pub struct PartialCreateRequestMetadata { pub tags: Option, pub region: Option, @@ -54,10 +63,7 @@ impl Default for CreateOptions { fn default() -> Self { Self { params: None, - create: CreateRequestMetadata { - tags: vec![], - region: None, - }, + create: CreateRequestMetadata::default() } } } diff --git a/clients/rust/src/drivers/mod.rs b/clients/rust/src/drivers/mod.rs index 560639529..5c9fe83ca 100644 --- a/clients/rust/src/drivers/mod.rs +++ b/clients/rust/src/drivers/mod.rs @@ -25,6 +25,7 @@ pub enum DriverStopReason { pub(crate) type MessageToClient = Arc; pub(crate) type MessageToServer = Arc; +#[derive(Debug)] pub(crate) struct DriverHandle { abort_handle: AbortHandle, sender: mpsc::Sender, diff --git a/clients/rust/src/handle.rs b/clients/rust/src/handle.rs index 2285629b8..3765ded57 100644 --- a/clients/rust/src/handle.rs +++ b/clients/rust/src/handle.rs @@ -1,6 +1,7 @@ use anyhow::Result; use futures_util::FutureExt; use serde_json::Value; +use std::fmt::Debug; use std::ops::Deref; use std::sync::atomic::{AtomicI64, Ordering}; use std::time::Duration; @@ -401,3 +402,13 @@ impl ActorHandleInner { rx.await.ok(); } } + +impl Debug for ActorHandleInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ActorHandle") + .field("endpoint", &self.endpoint) + .field("transport_kind", &self.transport_kind) + .field("encoding_kind", &self.encoding_kind) + .finish() + } +} \ No newline at end of file diff --git a/clients/rust/tests/e2e.rs b/clients/rust/tests/e2e.rs index 7b695b50e..1ca48f135 100644 --- a/clients/rust/tests/e2e.rs +++ b/clients/rust/tests/e2e.rs @@ -3,8 +3,6 @@ use fs_extra; use portpicker; use serde_json::json; use std::process::{Child, Command}; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::Arc; use std::time::Duration; use tempfile; use tokio::time::sleep;