
Req-Rep threaded message lost with HWM=1 #4750

Open
@ericjmcd

Description


Issue description

With a REQ socket and a REP socket running in a separate thread, sending request/reply pairs over and over with SNDHWM=1 on the REP socket eventually causes a REP reply to be lost. The REP send() raises neither zmq.Again nor any other exception, yet the reply never reaches the REQ socket. In the example below, the REQ side times out waiting for the reply and sets a global error flag. The REP thread then prints a message showing that it has already moved past the send() (where the reply was lost) and is waiting for the next request. This happens with both IPC and TCP transports.

SNDHWM does not really make sense for a REQ-REP pair, but we inadvertently set HWM=1 on all of our sockets and occasionally hit this failure, so we thought it should be reviewed. Setting HWM=2 appears to resolve the issue.
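If HWM=1 is not actually required anywhere, one low-risk mitigation consistent with the workaround above is to clamp every configured HWM to at least 2 before the socket binds or connects. This is a minimal sketch assuming pyzmq; `make_socket` and `MIN_HWM` are hypothetical names, and the floor of 2 comes only from the observation in this report, not from any documented libzmq limit.

```python
import zmq

# Assumption: floor taken from the HWM=2 workaround observed in this report,
# not from a documented libzmq requirement.
MIN_HWM = 2


def make_socket(ctx: zmq.Context, sock_type: int, hwm: int) -> zmq.Socket:
    """Hypothetical helper: create a socket whose HWMs are never below MIN_HWM."""
    sock = ctx.socket(sock_type)
    effective = max(hwm, MIN_HWM)
    # Set HWMs before bind()/connect(), as the repro below does.
    sock.setsockopt(zmq.SNDHWM, effective)
    sock.setsockopt(zmq.RCVHWM, effective)
    sock.setsockopt(zmq.LINGER, 0)
    return sock


if __name__ == "__main__":
    ctx = zmq.Context.instance()
    rep = make_socket(ctx, zmq.REP, hwm=1)
    print(rep.getsockopt(zmq.SNDHWM))  # 2
    rep.close()
```

Clamping in one helper keeps the rest of the code free to request HWM=1 without reaching the configuration that loses replies.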

This was initially submitted as a pyzmq bug, but the pyzmq maintainers indicated it was likely a libzmq bug:
zeromq/pyzmq#2040

Environment

  • Python version: 3.12

  • pyzmq version: 26.0.3

  • libzmq version: 4.3.5

  • OS: Ubuntu 20.04 Docker image running on an Ubuntu 20.04 host
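The version numbers above can be read from the interpreter that runs the test. `zmq.zmq_version()` reports the libzmq that pyzmq is actually linked against, which for pyzmq wheels is usually a bundled copy rather than the system library.

```python
import sys

import zmq

# Print the versions listed in the Environment section.
print("Python:", sys.version.split()[0])
print("pyzmq:", zmq.pyzmq_version())
print("libzmq:", zmq.zmq_version())  # the libzmq linked into this pyzmq build
```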

Minimal test code / Steps to reproduce the issue

"""Test ZMQ REQ-REP with REP HWM=1
Example output:
# HWM=1 fails at ~ 2.5M messages and the REQ receive times out
# and the REP thread shows it proceeded past the send() w/o error
# and is waiting for the next receive().
Rep HWM=1
Req timed out receiving response 2659829
Waiting for request after error
Rep thread exiting
# HWM=2 does not fail and all messages are accounted for.
Rep HWM=2
Rep thread exiting
"""
import zmq
import time
import threading

URL = 'ipc:///tmp/zmq_test_pipe'

# Module-level flags shared by the REQ (main) thread and the REP thread
error = False
running = True

# REP socket to run as a thread until running=False
# and will use global 'error' to show that the failure
# does not occur at the REP.send() call.
def reply_sock(hwm):
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REP)
    print(f'Rep HWM={hwm}')
    sock.SNDHWM=hwm  # HWM=1 fails, HWM=2 passes
    sock.RCVTIMEO=100
    sock.SNDTIMEO=1
    sock.LINGER=0
    sock.bind(URL)
        
    resp = 'x'*100  # Arbitrary response
    resp_bin = resp.encode('utf-8')

    global running, error
    # Loop with responses for every request. 
    while running:
        try:
            if error:
                print('Waiting for request after error')
                error = False # We caught the error, so clear flag
            rx = sock.recv()
        except zmq.Again:
            continue
        try:
            sock.send(resp_bin)
        except zmq.Again:
            print('Reply timed out sending response')
            break
        except Exception as ex:  # Catch any other error
            print(f'Unexpected exception: {ex}')
    sock.close()
    print('Rep thread exiting')  # Verify we cleanly exit and are not hung somewhere

def req_sock(count):
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REQ)
    sock.RCVTIMEO=100
    sock.REQ_RELAXED=1  # Does not seem to matter
    sock.LINGER=0
    sock.connect(URL)
        
    i = 0
    global running, error
    # Loop sending requests and receiving responses
    while running:
        i += 1
        try:
            msg = 'test'.encode('utf-8')  # Arbitrary request data
            sock.send(msg)
        except zmq.Again:
            print('Req timed out sending')
        try:
            resp = sock.recv()
        except zmq.Again:
            print(f'Req timed out receiving response {i}')
            error = True  # Flag to have other thread show it's still running after lost message
            time.sleep(1) # Let Rep thread print showing that it's alive post-error
            break
        except Exception as ex:
            print(f'Unexpected exception: {ex}')
        if i > count:  # Unclear how many are needed for 100% prob of failure.  1M was not enough sometimes.
            break
    running = False
    sock.close()

def run_test(hwm, N):
    global running, error
    running = True
    error = False
    rep_thread = threading.Thread(target=reply_sock, daemon=True, args=(hwm,))
    rep_thread.start()
    time.sleep(0.1)  # Give the REP thread time to bind before the REQ connects
    req_sock(N)
    rep_thread.join(timeout=3)
    if rep_thread.is_alive():
        print('Rep thread did not join')
     
if __name__ == "__main__":
    N = 5000000  # Number of messages to send
    run_test(hwm=1, N=N)  # Will usually fail < 5M msgs
    run_test(hwm=2, N=N)  # Never fails

What's the actual result?

With HWM=1, the requester eventually times out waiting for a response because the reply is lost.
With HWM=2, the requester never times out.

What's the expected result?

The requester never times out with either HWM setting; every reply is delivered.
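Until the root cause is found, a REQ client can at least recover from a lost reply instead of stalling. The sketch below adapts the client-side "Lazy Pirate" pattern from the ZeroMQ Guide: on timeout, discard the REQ socket (which cannot send again until it receives), open a fresh one, and resend. It uses RCVTIMEO/SNDTIMEO rather than a poller for brevity and assumes the request is safe to resend. `request_with_retry`, `echo_once`, the endpoint names, and the retry count are hypothetical. This works around the symptom; it does not explain why the reply is lost.

```python
import threading

import zmq

REQUEST_TIMEOUT_MS = 100  # same receive timeout as the repro above
MAX_RETRIES = 3           # hypothetical retry budget


def request_with_retry(ctx, url, payload, retries=MAX_RETRIES):
    """Send `payload` using a fresh REQ socket per attempt.

    Returns the reply bytes, or None if every attempt timed out.
    Assumes the request is idempotent, so resending it is harmless.
    """
    for _ in range(retries):
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.RCVTIMEO, REQUEST_TIMEOUT_MS)
        sock.setsockopt(zmq.SNDTIMEO, REQUEST_TIMEOUT_MS)
        sock.setsockopt(zmq.LINGER, 0)  # discard queued data when the socket is abandoned
        sock.connect(url)
        try:
            sock.send(payload)
            return sock.recv()
        except zmq.Again:
            # No reply in time. A REQ socket cannot send again until it has
            # received, so drop this socket and retry with a new one.
            continue
        finally:
            sock.close()
    return None


def echo_once(ctx, url, ready):
    """Hypothetical test server: answer exactly one request, then exit."""
    rep = ctx.socket(zmq.REP)
    rep.bind(url)
    ready.set()
    rep.send(rep.recv())
    rep.close()


if __name__ == "__main__":
    ctx = zmq.Context.instance()
    url = "inproc://lazy-pirate-demo"
    ready = threading.Event()
    server = threading.Thread(target=echo_once, args=(ctx, url, ready))
    server.start()
    ready.wait()
    print(request_with_retry(ctx, url, b"test"))  # b'test'
    server.join()
```

Setting REQ_RELAXED would also let the same socket resend, but recreating the socket discards any stale state from the attempt whose reply went missing.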
