ZMQ_STREAM server cannot disconnect client when sndhwm reached #4828

@pijyoi

When a ZMQ_STREAM server wants to disconnect a client, it does so by sending the client's routing id followed by an empty payload frame.
If the send high-water mark (ZMQ_SNDHWM) has been reached for that client, the server cannot disconnect it, because the disconnection message itself fails to send with EAGAIN.
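
To make the mechanism concrete, here is a toy model of a bounded outbound pipe. It is only a sketch: `ToyPipe`, `toy_send`, `fill_until_blocked` and `demo` are hypothetical names invented for illustration, not libzmq code. It also ignores kernel socket buffers, which in the real library allow more than sndhwm messages to be accepted before sends start failing. The point it illustrates is that the empty "disconnect" payload passes through the same capacity check as ordinary data, so it is rejected once the queue is full.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Toy stand-in for one peer's outbound pipe, bounded by a high-water mark.
// Hypothetical type for illustration only; not part of libzmq.
struct ToyPipe {
    std::size_t hwm;                // maximum number of queued messages
    std::deque<std::string> queue;  // messages the peer has not drained yet

    bool can_write() const { return queue.size() < hwm; }
};

enum class SendResult { Queued, WouldBlock };

// Every outbound message, including the empty "disconnect" payload,
// goes through the same capacity check.
SendResult toy_send(ToyPipe &pipe, const std::string &payload) {
    if (!pipe.can_write())
        return SendResult::WouldBlock;  // plays the role of EAGAIN
    pipe.queue.push_back(payload);
    return SendResult::Queued;
}

// Queues data until the high-water mark rejects further writes and
// returns the number of messages that were accepted.
std::size_t fill_until_blocked(ToyPipe &pipe) {
    std::size_t accepted = 0;
    while (toy_send(pipe, "data") == SendResult::Queued)
        ++accepted;
    return accepted;
}

// Walks through the reported scenario with a high-water mark of 3.
void demo() {
    ToyPipe pipe{3, {}};
    assert(fill_until_blocked(pipe) == 3);

    // The peer is not draining, so the disconnect request is rejected too.
    assert(toy_send(pipe, "") == SendResult::WouldBlock);

    // Only after the peer drains one message can the request be queued.
    pipe.queue.pop_front();
    assert(toy_send(pipe, "") == SendResult::Queued);
}
```

In this model the server can only get rid of a stalled peer once that peer starts reading again, which is the situation the report describes.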

The code in the library implementing this behavior is:

int zmq::stream_t::xsend (msg_t *msg_)

A minimal working example (MWE) demonstrating the issue follows. The scenario is that of a client that has stopped draining its socket.

#include <string>

#include <cassert>
#include <cstdio>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

#include <zmq.h>

int main()
{
    void *zctx = zmq_ctx_new();
    void *zsock = zmq_socket(zctx, ZMQ_STREAM);
    int sndhwm = 3;     // set low sndhwm to trigger the issue earlier
    zmq_setsockopt(zsock, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm));
    zmq_bind(zsock, "tcp://127.0.0.1:8080");

    // connect to zmq stream socket
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in sa = {};
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    sa.sin_port = htons(8080);
    connect(sock, (sockaddr*)&sa, sizeof(sa));

    // retrieve routing id
    std::string peer_id;
    {
        zmq_msg_t zframe;
        zmq_msg_init(&zframe);
        zmq_msg_recv(&zframe, zsock, 0);
        peer_id.assign(static_cast<char*>(zmq_msg_data(&zframe)), zmq_msg_size(&zframe));
        zmq_msg_close(&zframe);
        zmq_msg_init(&zframe);
        zmq_msg_recv(&zframe, zsock, 0);    // empty connection frame
        assert(zmq_msg_size(&zframe) == 0);
        zmq_msg_close(&zframe);
    }

    // send data to client until sndhwm reached
    int cnt = 0;
    while (true)
    {
        int rc = zmq_send(zsock, peer_id.data(), peer_id.size(), ZMQ_DONTWAIT | ZMQ_SNDMORE);
        if (rc == -1) break;
        zmq_msg_t zframe;
        zmq_msg_init_size(&zframe, 262144);
        rc = zmq_msg_send(&zframe, zsock, ZMQ_DONTWAIT);
        zmq_msg_close(&zframe);
        if (rc == -1) break;
        cnt++;
        printf("cnt %d\n", cnt);
    }
    assert(zmq_errno() == EAGAIN);

    // try to disconnect client by sending its routing id followed by an empty frame
    int rc = zmq_send(zsock, peer_id.data(), peer_id.size(), ZMQ_DONTWAIT | ZMQ_SNDMORE);
    if (rc != -1) {
        // zmq_send returns the number of bytes sent on success, not 0
        rc = zmq_send(zsock, "", 0, ZMQ_DONTWAIT);
    }
    if (rc == -1) {
        printf("can't disconnect: %s\n", zmq_strerror(zmq_errno()));
    }

    close(sock);

    zmq_close(zsock);
    zmq_ctx_term(zctx);
}
