
Transport Layer Design #104

@magniloquency

Description

Research:

We're trying to replace ZMQ
Fact: ZMQ is very complex
ZMQ sockets are a very powerful abstraction:

  • guarantee delivery, even through network failures
  • ZMQ sockets are not like regular sockets: a single socket can represent any number of connections / peers
  • ZMQ sockets abstract over a wide array of transport types: TCP, Unix domain, intraprocess (which isn't even a socket)
  • sockets have different types: router, dealer, pub, sub, etc.
  • support both sync and async interfaces

Scaler has 4+ different classes that use ZMQ:

  • async binder
  • async connector
  • sync binder
  • sync subscriber
  • ???

what's the difference between these 4+? Actually very little: they all use a single ZMQ socket and differ only in transport type and socket type (router, dealer, etc.).

Sessions -> thread pool, also the context through which intraprocess connectors communicate (because intraprocess transports aren't real sockets)

  • very similar in concept to ZMQ contexts

connectors -> very similar to a ZMQ socket; the underlying concept behind the async binder, async connector, etc. -> has peers

what is a peer: ZMQ sockets are an abstraction; they can be connected to multiple other sockets at a time, and we call these peers
peer: represents a single side of a connection between two connectors (sockets)

socket a: pub, bound
socket b: connected to a
socket c: connected to a

we have two sockets connected to a

a has 3 sockets: the bound tcp socket for listening, and a socket per peer (2 of them)

peer_sock <- accept(bound_sock)

in this situation there are at least 4 things to epoll:

  • poll the bound socket <- returns when there is a connection to accept
  • poll the peer sockets <- might be readable (incoming message), might be writable (we can write a message)
  • do we need to send a message to a peer? <- comes in on a concurrent queue
  • did the client issue a read? <- comes in on a concurrent queue

router is special: when you send a message on a router, you must include an address. the router will then send it to the specific peer with that address.

assume a is a router: a.send(b, msg) <-- send the message to peer b
also: when you receive a message, it will tell you which peer it came from <-- for simplicity, in our impl all of the socket types do this

msg <- a.recv()
msg.address -> b or c

connector types: how the peer to send to is determined

  • router: send() must include destination address
  • pub: send to all peers
  • dealer: round-robin the peers e.g. send to b, then send to c, then send to b, ...
  • sub: does not support send
  • pair: send to the ONLY peer

peers are not directly exposed by the library to the caller/client; clients only work with connectors

Python creates future -> send_async() -> pass to io thread -> epoll() returns -> the send is queued for the peer* -> when the peer becomes writable, epoll() returns** -> write the message to the peer's socket*** -> complete -> future_set_result()

*which peer depends on the connector type, e.g. router, pair, etc.
**: we don't know when the peer will become writable, just at SOME point in the future
***: this could take multiple returns from epoll() if, e.g., the TCP buffer fills up, or the peer's connection fails and has to be re-established. we guarantee sends, so it's possible that the message is mid-write to the peer when the conn fails; we re-establish, try sending it again, and only THEN is the future resolved <-- this could take a "long" time
async: we don't know when the future is going to resolve

problem: how to communicate from python thread to io threads?
solution: use a concurrent queue

python thread can put messages, etc. in queue without needing extra sync with io threads

python thread puts things in the queue
io threads remove things from the queue
^ it's possible for these to happen at the exact same time

python thread:
queue.enqueue(...)
eventfd_signal(queue_signal)

io thread:
epoll() <- wait for queue_signal
queue.dequeue()

Q: is it possible for dequeue and enqueue to race?
A: maybe?

do we need a lock-free concurrent queue, or is a mutex-protected queue enough?

template <typename T>
class MQueue {
    std::mutex m_lock;   // <mutex>
    std::queue<T> q;     // <queue>

public:
    void send(T item) {
        std::lock_guard<std::mutex> lock(m_lock);  // lock
        q.push(std::move(item));                   // send
    }                                              // unlock (RAII)

    bool recv(T &out) {
        std::lock_guard<std::mutex> lock(m_lock);  // lock
        if (q.empty())
            return false;                          // nothing to recv
        out = std::move(q.front());                // recv
        q.pop();
        return true;
    }                                              // unlock (RAII)
};

epoll only knows about fds
std::mutex is not an fd.

various kinds of fds:

  • sockets
  • signal fds
  • event fds (can work as semaphore)
  • timer fds
  • ?

Q: what if we don't block on epoll -- what if we block on the queue instead?
A: this won't work: the io thread would then only do work when something arrives in the queue. what if a peer had an incoming message, disconnected, or became writable in the meantime?
