Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ fn main() {
.count()
.map(|i| format!("hello, world {:}", i))
.print()
.run(RunMode::RealTime, RunFor::Duration(period*3)
);
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(3))
.unwrap();
}
```
This output is produced:
Expand All @@ -58,8 +58,3 @@ We want to hear from you! Especially if you:
- have any feedback

Please email us at [hello@wingfoil.io](mailto:hello@wingfoil.io), submit an [issue](https://github.com/wingfoil-io/wingfoil/issues) or get involved in the [discussion](https://github.com/wingfoil-io/wingfoil/discussions/).





4 changes: 2 additions & 2 deletions wingfoil-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use std::time::Duration;

#[pyclass(unsendable, name = "Node")]
#[derive(Clone)]
struct PyNode(Rc<dyn Node>);
struct PyNode(Rc<dyn Node<'static> + 'static>);

impl PyNode {
fn new(node: Rc<dyn Node>) -> Self {
fn new(node: Rc<dyn Node<'static> + 'static>) -> Self {
Self(node)
}
}
Expand Down
20 changes: 13 additions & 7 deletions wingfoil-python/src/proxy_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use pyo3::prelude::*;
use crate::py_element::PyElement;
use crate::py_stream::PyStream;

use ::wingfoil::{GraphState, IntoNode, MutableNode, StreamPeekRef, UpStreams};
use ::wingfoil::{GraphState, IntoNode, MutableNode, StreamPeekRef, UpStreams, Node};
use std::rc::Rc;

/// This is used as inner class of python coded base class Stream
#[derive(Display)]
Expand All @@ -27,8 +28,8 @@ impl Clone for PyProxyStream {
}
}

impl MutableNode for PyProxyStream {
fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
impl<'a> MutableNode<'a> for PyProxyStream {
fn cycle(&mut self, _state: &mut GraphState<'a>) -> anyhow::Result<bool> {
Python::attach(|py| {
let this = self.0.bind(py);
let res = this
Expand All @@ -39,20 +40,25 @@ impl MutableNode for PyProxyStream {
})
}

fn upstreams(&self) -> UpStreams {
fn upstreams(&self) -> UpStreams<'a> {
let ups = Python::attach(|py| {
let this = self.0.bind(py);
let res = this.call_method0("upstreams").unwrap();
let res = res.extract::<Vec<Py<PyAny>>>().unwrap();
res.iter()
.map(|obj| {
let bound = obj.bind(py);
if let Ok(stream) = bound.extract::<PyStream>() {
let node_static = if let Ok(stream) = bound.extract::<PyStream>() {
stream.inner_stream().as_node()
} else if let Ok(stream) = bound.extract::<PyProxyStream>() {
stream.into_node()
} else {
panic!("Unexpected upstream type");
};
// SAFETY: All python-wrapped nodes are effectively 'static.
// We transmute from 'static to 'a to satisfy the return type constraint.
unsafe {
std::mem::transmute::<Rc<dyn Node<'static> + 'static>, Rc<dyn Node<'a> + 'a>>(node_static)
}
})
.collect::<Vec<_>>()
Expand All @@ -65,7 +71,7 @@ lazy_static! {
pub static ref DUMMY_PY_ELEMENT: PyElement = PyElement::none();
}

impl StreamPeekRef<PyElement> for PyProxyStream {
impl<'a> StreamPeekRef<'a, PyElement> for PyProxyStream {
// This is a bit hacky - we supply dummy value for peek ref
// but resolve it to real value in from_cell_ref.
// Currently peek_ref is only used directly in demux.
Expand All @@ -80,4 +86,4 @@ impl StreamPeekRef<PyElement> for PyProxyStream {
PyElement::new(res)
})
}
}
}
32 changes: 16 additions & 16 deletions wingfoil-python/src/py_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use log::Level;
use pyo3::BoundObject;
use std::any::type_name;

use ::wingfoil::{Element, IntoStream, NodeOperators, Stream, StreamOperators};
use ::wingfoil::{Element, IntoStream, NodeOperators, Stream, StreamOperators, NanoTime, GraphState};

use pyo3::conversion::IntoPyObject;
use pyo3::prelude::*;
Expand All @@ -16,12 +16,12 @@ use crate::*;

#[derive(Clone)]
#[pyclass(subclass, unsendable, name = "Stream")]
pub struct PyStream(pub Rc<dyn Stream<PyElement>>);
pub struct PyStream(pub Rc<dyn Stream<'static, PyElement> + 'static>);

impl PyStream {
fn extract<T>(&self) -> Rc<dyn Stream<T>>
fn extract<T>(&self) -> Rc<dyn Stream<'static, T> + 'static>
where
T: Element + for<'a, 'py> FromPyObject<'a, 'py>,
T: Element + for<'a, 'py> FromPyObject<'a, 'py> + 'static + Default,
{
self.0.map(move |x: PyElement| {
Python::attach(|py| match x.as_ref().extract::<T>(py) {
Expand All @@ -33,11 +33,11 @@ impl PyStream {
})
}

pub fn inner_stream(&self) -> Rc<dyn Stream<PyElement>> {
pub fn inner_stream(&self) -> Rc<dyn Stream<'static, PyElement> + 'static> {
self.0.clone()
}

pub fn from_inner(inner: Rc<dyn Stream<PyElement>>) -> Self {
pub fn from_inner(inner: Rc<dyn Stream<'static, PyElement> + 'static>) -> Self {
Self(inner)
}
}
Expand All @@ -58,14 +58,14 @@ pub fn vec_any_to_pyany(x: Vec<Py<PyAny>>) -> Py<PyAny> {

pub trait AsPyStream<T>
where
T: Element + for<'py> IntoPyObject<'py>,
T: Element + for<'py> IntoPyObject<'py> + 'static + Default,
{
fn as_py_stream(&self) -> PyStream;
}

impl<T> AsPyStream<T> for Rc<dyn Stream<T>>
impl<T> AsPyStream<T> for Rc<dyn Stream<'static, T> + 'static>
where
T: Element + for<'py> IntoPyObject<'py>,
T: Element + for<'py> IntoPyObject<'py> + 'static + Default,
{
fn as_py_stream(&self) -> PyStream {
let strm = self.map(|x| {
Expand Down Expand Up @@ -107,7 +107,7 @@ impl PyStream {
// begin StreamOperators

fn collect(&self) -> PyStream {
let strm = self.0.collect().map(|items| {
let strm = self.0.collect().map(|items: Vec<::wingfoil::ValueAt<PyElement>>| {
Python::attach(move |py| {
let items = items
.iter()
Expand All @@ -124,7 +124,7 @@ impl PyStream {
}

fn buffer(&self, capacity: usize) -> PyStream {
let strm = self.0.buffer(capacity).map(|items| {
let strm = self.0.buffer(capacity).map(|items: Vec<PyElement>| {
Python::attach(move |py| {
let items = items
.iter()
Expand All @@ -137,7 +137,7 @@ impl PyStream {
}

fn finally(&self, func: Py<PyAny>) -> PyNode {
let node = self.0.finally(|py_elmnt, _| {
let node = self.0.finally(|py_elmnt: PyElement, _: &GraphState<'static>| {
Python::attach(move |py| {
let res = py_elmnt.as_ref().clone_ref(py);
let args = (res,);
Expand All @@ -148,7 +148,7 @@ impl PyStream {
}

fn for_each(&self, func: Py<PyAny>) -> PyNode {
let node = self.0.for_each(move |py_elmnt, t| {
let node = self.0.for_each(move |py_elmnt: PyElement, t: NanoTime| {
Python::attach(|py| {
let res = py_elmnt.as_ref().clone_ref(py);
let t: f64 = t.into();
Expand Down Expand Up @@ -177,7 +177,7 @@ impl PyStream {

/// drops source contingent on supplied predicate (Python callable)
fn filter(&self, keep_func: Py<PyAny>) -> PyStream {
let keep = self.0.map(move |x| {
let keep = self.0.map(move |x: PyElement| {
Python::attach(|py| {
keep_func
.call1(py, (x.value(),))
Expand All @@ -201,7 +201,7 @@ impl PyStream {

/// Map’s its source into a new Stream using the supplied Python callable.
fn map(&self, func: Py<PyAny>) -> PyStream {
let stream = self.0.map(move |x| {
let stream = self.0.map(move |x: PyElement| {
Python::attach(|py| {
let res = func.call1(py, (x.value(),)).unwrap();
PyElement::new(res)
Expand Down Expand Up @@ -234,4 +234,4 @@ impl PyStream {
}

// end StreamOperators
}
}
16 changes: 7 additions & 9 deletions wingfoil/examples/async/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,21 @@ async oriented systems.
RUST_LOG=INFO cargo run --example async
```

```rust
```rust, no_run
use async_stream::stream;
use std::time::Duration;
use std::pin::Pin;
use futures::StreamExt;
use wingfoil::*;

let period = Duration::from_millis(10);
let run_for = RunFor::Duration(period * 5);
let run_mode = RunMode::RealTime;
let run_for = RunFor::Cycles(5);
let run_mode = RunMode::HistoricalFrom(NanoTime::ZERO);
let producer = async move || {
stream! {
for i in 0.. {
tokio::time::sleep(period).await; // simulate waiting IO
let time = NanoTime::now();
yield (time, i * 10);
let time = NanoTime::new(i * 10_000_000);
yield (time, i as u32 * 10);
}
}
};
Expand All @@ -58,6 +57,5 @@ produce_async(producer)
.logged("on-graph", log::Level::Info)
.collapse()
.consume_async(Box::new(consumer))
.run(run_mode, run_for);
```

.run(run_mode, run_for).unwrap();
```
2 changes: 1 addition & 1 deletion wingfoil/examples/breadth_first/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ and [Wikipedia](https://en.wikipedia.org/wiki/Reactive_programming#Glitches)
for more details.

In wingfoil we build an example with a depth of 127 branch / recombine operations:
```rust
```rust, no_run
use wingfoil::*;

fn main(){
Expand Down
2 changes: 1 addition & 1 deletion wingfoil/examples/order_book/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Completed 91998 cycles in 287.125397ms. 3.12µs average.

<div style="page-break-after: always;"></div>

```rust, ignore
```rust, no_run

pub fn main() {
env_logger::init();
Expand Down
48 changes: 24 additions & 24 deletions wingfoil/examples/rfq/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@ fn main() {
let run_for = RunFor::Forever;
let n_rfqs = 100; // max number of concurrent rfqs
let n_msgs_historical = 1_000_000;

// We declare everything outside the match to extend lifetimes
let env = Environment::Test;
let rt_market_data = RealTimeMarketDataProvider::new(env);
let hist_market_data = HistoricalMarketDataProvider::new(n_msgs_historical);
let gateway = RealTimeOrderGateway::new();

let rt_responder = RfqResponder::new(n_rfqs, rt_market_data, gateway.clone());
let hist_responder = RfqResponder::new(n_rfqs, hist_market_data, gateway);

let nodes = match run_mode {
RunMode::RealTime => {
let env = Environment::Test;
let responder = RfqResponder::new(
n_rfqs,
RealTimeMarketDataProvider::new(env),
RealTimeOrderGateway::new(),
);
responder.build()
rt_responder.build()
}
RunMode::HistoricalFrom(_) => {
let responder = RfqResponder::new(
n_rfqs,
HistoricalMarketDataProvider::new(n_msgs_historical),
RealTimeOrderGateway::new(),
);
responder.build()
hist_responder.build()
}
};

let mut graph = Graph::new(nodes, run_mode, run_for);
let t0 = std::time::Instant::now();
graph.run().unwrap();
Expand Down Expand Up @@ -71,23 +71,23 @@ where
O: OrderGateway + 'static,
{
/// input streams
fn source(&self) -> Rc<dyn Stream<TinyVec<[MarketData; 1]>>> {
fn source<'a>(&self) -> Rc<dyn Stream<'a, TinyVec<[MarketData; 1]>> + 'a> {
// async is abstracted away
self.market_data_provider.notifications()
}

/// output node
fn send(&self, orders: Rc<dyn Stream<TinyVec<[Order; 1]>>>) -> Rc<dyn Node> {
fn send<'a>(&self, orders: Rc<dyn Stream<'a, TinyVec<[Order; 1]>> + 'a>) -> Rc<dyn Node<'a> + 'a> {
// async is abstracted away
self.order_gateway.send(orders)
}

/// demuxed sources
fn sources(
fn sources<'a>(
&self,
) -> (
Vec<Rc<dyn Stream<TinyVec<[MarketData; 1]>>>>,
Overflow<TinyVec<[MarketData; 1]>>,
Vec<Rc<dyn Stream<'a, TinyVec<[MarketData; 1]>> + 'a>>,
Overflow<'a, TinyVec<[MarketData; 1]>>,
) {
self.source().demux_it(
self.n_rfqs, // max num of current rfqs
Expand All @@ -96,7 +96,7 @@ where
}

/// main entry point to build the nodes
pub fn build(&self) -> Vec<Rc<dyn Node>> {
pub fn build<'a>(&'a self) -> Vec<Rc<dyn Node<'a> + 'a>> {
let (sources, overflow) = self.sources();
let order_streams = sources
.iter()
Expand All @@ -110,12 +110,12 @@ where

/// The main rfq circuit.
/// Each circuit can easily be set up to run on worker a thread.
fn rfq_circuit(
fn rfq_circuit<'a>(
subcircuit_id: usize,
market_data: Rc<dyn Stream<TinyVec<[MarketData; 1]>>>,
) -> Rc<dyn Stream<Order>> {
let label = format!("subcircuit {subcircuit_id} received");
market_data.logged(&label, Info).map(|_mkt_data_burst| {
market_data: Rc<dyn Stream<'a, TinyVec<[MarketData; 1]>> + 'a>,
) -> Rc<dyn Stream<'a, Order> + 'a> {
let label = format!("subcircuit {} received", subcircuit_id);
market_data.logged(&label, Info).map(move |_mkt_data_burst| {
//println!("{:?}", mkt_data_burst.len());
Order::new()
})
Expand Down
Loading
Loading