Skip to content

Commit 212b84d

Browse files
authored
wait for a connection before sending command in the rodbus-client app (#148)
1 parent 92cca0c commit 212b84d

File tree

1 file changed

+38
-3
lines changed

1 file changed

+38
-3
lines changed

rodbus-client/src/main.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,27 @@ struct Args {
4242
period: Option<Duration>,
4343
}
4444

45+
struct ConnectionListener {
46+
tx: tokio::sync::mpsc::Sender<ClientState>,
47+
}
48+
49+
impl ConnectionListener {
50+
fn create() -> (Self, tokio::sync::mpsc::Receiver<ClientState>) {
51+
let (tx, rx) = tokio::sync::mpsc::channel(32);
52+
(Self { tx }, rx)
53+
}
54+
}
55+
56+
impl Listener<ClientState> for ConnectionListener {
57+
fn update(&mut self, state: ClientState) -> MaybeAsync<()> {
58+
let tx = self.tx.clone();
59+
let future = async move {
60+
let _ = tx.try_send(state);
61+
};
62+
MaybeAsync::asynchronous(future)
63+
}
64+
}
65+
4566
impl Args {
4667
fn new(address: SocketAddr, id: UnitId, command: Command, period: Option<Duration>) -> Self {
4768
Self {
@@ -68,16 +89,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6889
Ok(())
6990
}
7091

71-
async fn run() -> Result<(), Error> {
92+
async fn run() -> Result<(), Box<dyn std::error::Error>> {
7293
let args = parse_args()?;
94+
95+
let (listener, mut rx) = ConnectionListener::create();
96+
7397
let mut channel = spawn_tcp_client_task(
7498
HostAddr::ip(args.address.ip(), args.address.port()),
7599
1,
76100
default_retry_strategy(),
77101
AppDecodeLevel::DataValues.into(),
78-
None,
102+
Some(Box::new(listener)),
79103
);
80104
channel.enable().await?;
105+
106+
'connect: loop {
107+
let state = rx.recv().await.expect("should never be empty");
108+
tracing::info!("state: {state:?}");
109+
match state {
110+
ClientState::Disabled | ClientState::Connecting => {}
111+
ClientState::Connected => break 'connect,
112+
_ => return Err("unable to connect".into()),
113+
}
114+
}
115+
81116
let params = RequestParam::new(args.id, Duration::from_secs(1));
82117

83118
match args.period {
@@ -93,7 +128,7 @@ async fn run_command(
93128
command: &Command,
94129
channel: &mut Channel,
95130
params: RequestParam,
96-
) -> Result<(), Error> {
131+
) -> Result<(), Box<dyn std::error::Error>> {
97132
match command {
98133
Command::ReadCoils(range) => {
99134
for x in channel.read_coils(params, *range).await? {

0 commit comments

Comments
 (0)