Skip to content

ROUTER/DEALER mode may cause deadlock when using the same identity #210

@Remalloc

Description

@Remalloc

change src/router.rs

#[async_trait]
impl SocketSend for RouterSocket {
    async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
        assert!(message.len() > 1);
        let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?;
        match self.backend.peers.get_mut(&peer_id) {
            Some(mut peer) => {
                println!("sleep 5"); 
                tokio::time::sleep(std::time::Duration::from_secs(5)).await; // pause for 5 seconds to simulate transmission in progress
                peer.send_queue.send(Message::Message(message)).await?;
                Ok(())
            }
            None => Err(ZmqError::Other("Destination client not found by identity")),
        }
    }
}

server.rs

use bytes::Bytes;
use tokio;
use zeromq::prelude::*;
use zeromq::ZmqMessage;

struct Server {
    router: zeromq::RouterSocket,
    identity: Bytes,
}

impl Server {
    async fn new(url: &str, identity: Bytes) -> Self {
        let mut router = zeromq::RouterSocket::new();
        router.bind(url).await.unwrap();
        Self { router, identity }
    }

    async fn run(&mut self) {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
        let mut cnt = 0;
        loop {
            tokio::select! {
                msg = self.router.recv() => {
                    println!("Server recv {:?}", msg);
                }
                _ = ticker.tick() => {
                    let content = Bytes::from(format!("server-{}",cnt));
                    let msg = ZmqMessage::try_from(vec![self.identity.clone(), content]).unwrap();
                    match self.router.send(msg).await{
                        Ok(_) => {}
                        Err(e) => {
                            println!("Server send error {:?}", e);
                        }
                    }
                    cnt += 1;

                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let url = "tcp://0.0.0.0:5559";
    let identity = Bytes::from("identity");
    let mut server = Server::new(url, identity.clone()).await;

    tokio::spawn(async move {
        server.run().await;
        println!("server exit");
    });
    let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
    loop {
        ticker.tick().await;
        println!("I'm alive");
    }
}

client.py

import os
import time

import zmq


def get_socket():
    context = zmq.Context()
    s = context.socket(zmq.DEALER)
    s.setsockopt_string(zmq.IDENTITY, "identity")
    s.connect("tcp://127.0.0.1:5559")
    return s


def main():
    s = get_socket()
    cnt = 0
    while True:
        msg = f"Client send {cnt}"
        print(msg)
        s.send(msg.encode(), zmq.NOBLOCK)
        if cnt == 10:
            os._exit(0)
        cnt += 1
        time.sleep(1)


if __name__ == "__main__":
    main()

Reproduce steps:

  1. run server.rs and client.py
  2. restart client.py when it shutdown
  3. the server.rs is currently not printing anything due to a deadlock

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions