Transport Layer Design #295
Replies: 28 comments
-
You only need two types of sockets:
Please control yourself, and DO NOT think about the publisher/subscriber pattern; that can be implemented at the application level as long as we have the above sender and router.
-
This comment contains the design interface. Once it is updated, a reply is sent to this issue to explain what has been done.
// -*- C++ -*-
#include <sys/socket.h>
#include <atomic>
#include <functional>
#include <map>
#include <memory>   // std::unique_ptr, used below
#include <optional>
#include <string>   // identities
#include <thread>   // EventLoopThread
#include <vector>   // Buffer
/* NOTE:
 * - This file is organized bottom-up: internal structures are listed before interface classes.
* - This file is also separated by abstraction layers. There are three layers,
* - Network layer,
* - Eventloop layer,
* - API layer.
 * - These layers appear in this file in the above order.
* - Do NOT panic when seeing structures that are not defined. They will be defined and explained later.
* - There are some resources online that are useful for implementing a network lib. They are listed below.
* - (1). https://github.com/yugabyte/libev/blob/master/ev.pod
* - (2). https://cvs.schmorp.de/libev/ev.html#ANATOMY_OF_A_WATCHER
* */
// NOTE: NETWORK LAYER
// This class is an RAII class that holds a file descriptor.
// It should,
// - Hold an OPENED file descriptor.
// - Automatically close the file descriptor when destructed.
// - When destructed, or moved, set internal fd to invalid val.
//
// It should NOT,
// - be copyable.
// - provide method that operates the internal fd.
struct FileDescriptor {
int fd; // <-- public
FileDescriptor(int fd);
// int accept() { return accept(fd, ...); }
FileDescriptor(const FileDescriptor&) = delete;
FileDescriptor& operator=(const FileDescriptor&) = delete;
FileDescriptor(FileDescriptor&&);
FileDescriptor& operator=(FileDescriptor&&);
~FileDescriptor();
};
// This class, as the name suggests, is responsible for accepting
// new connections.
// It should,
// - Hold a socket that is already listening.
// - Accept new connections in a callback.
// - Handle the typical error of accept, see (1).
// - Notify the event loop when destructed.
//
// It should NOT,
// - Have a USER PROVIDED callback, as there isn't much to do besides accepting.
// - be copyable.
class TcpServer {
EventLoopThread& eventLoop; // eventLoop thread will call onRead that is associated w/ the eventManager
std::unique_ptr<EventManager> eventManager; // will copy the `onRead()` to itself
FileDescriptor fd;
// Implementation defined method. accept(3) should happen here.
// This function will call user defined onAcceptReturn()
// It will handle error it can handle. If it is unreasonable to
// handle the error here, pass it to onAcceptReturn()
void onRead();
public:
TcpServer(const TcpServer&) = delete;
TcpServer& operator=(const TcpServer&) = delete;
using AcceptReturnCallback = std::function<void(FileDescriptor, sockaddr, int)>;
AcceptReturnCallback onAcceptReturn;
};
// This class, as the name suggests, is responsible for initiating
// new connections.
// It should do similar things as the TcpServer does, and
// - Provide retry functionality.
// - remember the current state of the fd (whether it is connected)
class TcpClient {
EventLoopThread& eventLoop; /* shared ownership */
std::unique_ptr<EventManager> eventManager;
// Implementation defined method. connect(3) should happen here.
// This function will call user defined onConnectReturn()
// It will handle error it can handle. If it is unreasonable to
// handle the error here, pass it to onConnectReturn()
void onWrite();
public:
TcpClient(const TcpClient&) = delete;
TcpClient& operator=(const TcpClient&) = delete;
using ConnectReturnCallback = std::function<void(FileDescriptor, sockaddr, int)>;
ConnectReturnCallback onConnectReturn;
void retry(/* Arguments */);
};
// This class wraps a tcp-connection. It does not know anything that behaves like ZMQ.
// It should,
// - remember its current state (whether it is connected).
// - remember its local and remote address.
// - read and write BYTES.
// - *NEW* remember the length of the last send().
// - maintain internal buffers for send and recv.
// - close reading to the remote end.
//
// It should NOT,
// - be copyable.
// - do actual send. send in this class copies msg to internal buffer.
// Rename to MessageConnectionTCP, later on be MessageConnectionIPC
class MessageConnectionTCP {
EventLoopThread& eventLoop; /* shared ownership */
std::unique_ptr<EventManager> eventManager;
std::string remoteIdentity;
using Buffer = std::vector<unsigned char>; // placeholder, detail TBD
// recvBuffer moved to the MessageCallback
Buffer sendBuffer; // <-- powerful buffer that maintains a cursor
Buffer recvBuffer;
sockaddr_storage localAddr;
sockaddr_storage remoteAddr;
// - Short read disconnect:
// We need to give the user back the rest of the buffer
// - Short write disconnect:
// We need to give the user back the rest of the buffer
// Put in the EventManager
void onRead();
void onWrite();
void onClose();
void onError();
// Deal with internal Buffer
// Message de-frame should happen here
void onReceiveBytes();
void onSendBytes();
void sendBytes(char* msg, size_t len);
bool reading; // Are we going to read? (maybe helper functions) (maybe atomic)
enum MessageConnectionTCPState { Disconnected, Connecting, Connected } tcpConnectionState;
// ^-- perhaps provide functions like bool disconnected()?
using SendMessageContinuation = std::function<void()>; // placeholder, detail TBD
using RecvMessageContinuation = std::function<void()>; // placeholder, detail TBD
std::vector<size_t> sendMessageLength;
std::vector<size_t> recvMessageDesiredLength;
public:
MessageConnectionTCP(const MessageConnectionTCP&) = delete;
MessageConnectionTCP& operator=(const MessageConnectionTCP&) = delete;
using ConnectionCallback = std::function<void(sockaddr, sockaddr)>;
using RemoteCloseCallback = std::function<void()>; // placeholder, detail TBD
void sendMessage(Message* msg, SendMessageContinuation cont);
void recvMessage(Message* msg, RecvMessageContinuation cont);
void shutdown();
void forceClose();
ConnectionCallback onConnected;
RemoteCloseCallback onRemoteClose;
};
// NOTE: EVENTLOOP LAYER
// This class is a unified event manager. This will be the user data we passed in to
// BackendContext(e.g. epoll_ctl). TcpServer, TcpClient, and MessageConnectionTCP use this class.
//
// It should,
// - provide a unified interface to io_uring and epoll. This means we cannot use events
// directly. An ad-hoc way is to replace `int events` with `uint64_t custom_events` and
// parse it ourselves in the corresponding BackendContext. For now, let's use `int events`.
// - Hold reference to the buffer, instead of actual ones.
// - Let the user define eventCallback
//
// It should NOT,
// - know about zmq-like handle's internal. (this logic should be placed in eventCallback)
class EventManager {
EventLoopThread& eventLoop;
const int fd;
// Implementation defined method, will call onRead, onWrite etc based on events
void onEvents();
public:
int events;
int revents;
void updateEvents();
// User that registered them should have everything they need
// In the future, we might add more onXX() methods, for now these are all we need.
using OnEventCallback = std::function<void()>;
OnEventCallback onRead;
OnEventCallback onWrite;
OnEventCallback onClose;
OnEventCallback onError;
};
// This class defines the interface of an event loop. It allows the user to swap its
// backend context as long as they are equipped with the correct interface.
//
// Below interface will dispatch the call to its backend.
// - loop() will start looping until stopped. Imagine the main logic of event loop goes
// here.
// - stop() will send a stop signal (not necessarily a `signal(2)`) to the thread it bundled
// with. The event loop will quit when receiving this signal.
// - execute_now() will interrupt the current wait, and execute a function.
// - execute_later() will remember a function, which will be executed after a round of wait.
// - execute_at() will remember a function, and execute them when time arrives.
// - cancel_execution() will cancel a function execution request, if that request is not in
// execution yet.
// - registerCallbackBeforeLoop will register an event with the backend before the eventloop starts
// to loop(). This comes in handy when we want to add an accept request, for example, to the loop.
//
// There will be three channels of communication from possibly another thread (aka Python thread).
// - immediateExecutionQueue, functors pushed to this queue will wake up the wait, and let the
// eventloop execute the functor.
// - timedExecutionQueue, functors pushed to this queue will have a timer attached. They will wake up the
// eventloop when the time is right, and execute that functor. This comes in handy for reconnect().
// - delayedExecutionQueue, functors pushed to this queue will be executed once when wait ends.
//
// From the backend's perspective, these three queues do not have to be concurrent queues, as the
// EventLoop already sorts the problem out for them. The backend's
// - immediateExecutionQueue will possibly be a std::queue
// - timedExecutionQueue will possibly be a priority queue ordered by the timers.
// - delayedExecutionQueue will possibly be a std::queue as well.
//
// We impose no constraints on what data structure the backend will be using.
template <class EventLoopBackend = EpollContext>
struct EventLoop {
using Function = std::function<void()>; // TBD
using TimeStamp = int; // TBD
using Identifier = int; // TBD
void loop();
void stop();
// send_to_connector() C API
// []() {
// binding a ref to the ioSocket; (B)
// auto tcpconn = B.find_tcp_connection[A]
// tcpconn.send(message);
// tcpconn.setWriteCompleteCallback(resolve future);
// };
void executeNow(Function func);
void executeLater(Function func, Identifier identifier);
void executeAt(TimeStamp, Function, Identifier identifier);
void cancelExecution(Identifier identifier);
void registerCallbackBeforeLoop(EventManager*);
InterruptiveConcurrentQueue<Function> immediateExecutionQueue;
TimedConcurrentQueue<Function> timedExecutionQueue;
ConcurrentQueue<Function> delayedExecutionQueue;
EventLoopBackend eventLoopBackend;
};
// EpollContext, passed in as template argument of a EventLoop
struct EpollContext {
using Function = std::function<void()>; // TBD
using TimeStamp = int; // TBD
using Identifier = int; // TBD
void registerCallbackBeforeLoop(EventManager*);
void loop() {
for (;;) {
// EXAMPLE
// epoll_wait;
// for each event that is returned to the caller {
// cast the event back to EventManager
// if this event is an eventfd {
// func = queue front
// func()
// } else if this event is an timerfd {
// if it is oneshot then execute once
// if it is multishot then execute and rearm timer
// } else {
// eventmanager.revents = events returned by epoll
// eventmanager.on_event(),
// where on_event is set differently for tcpserver, tcpclient, and tcpconn
// they are defined something like:
// tcpserver.on_event() {
// accept the socket and generate a new tcpConn or handle err
// this.ioSocket.addNewConn(tcpConn)
// }
// tcpclient.on_event() {
// connect the socket and generate a new tcpConn
// this.ioSocket.addNewConn(tcpConn)
// if there need retry {
// close this socket
// this.eventloop.executeAfter(the time you want from now)
// }
// }
// tcpConn.on_event() {
// read everything you can to the buffer
// write everything you can to write the remote end
// if tcpconn.ioSocket is something special, for example dealer
// tcpConn.ioSocket.route to corresponding tcpConn
// }
//
// }
// }
}
}
void stop();
void executeNow(Function func);
void executeLater(Function func, Identifier identifier);
void executeAt(TimeStamp, Function, Identifier identifier);
void cancelExecution(Identifier identifier);
// int epoll_fd;
// int connect_timer_tfd;
// std::map<int, EventManager*> monitoringEvent;
// bool timer_armed;
// // NOTE: Utility functions, may be defined otherwise
// void ensure_timer_armed();
// void add_epoll(int fd, uint32_t flags, EpollType type, void* data);
// void remove_epoll(int fd);
// EpollData* epoll_by_fd(int fd);
};
// Types are configured here
struct configuration {
using polling_context_t = EpollContext;
};
// NOTE: API LAYER
// - This class wraps an IOSocket, which will be the primary interface user operates on.
// - User may want to call ioSocket.send(...) from python, for example.
//
// - Every instance of this class is bundled with a thread. This is a temporary solution.
//
// It should,
// - Remember the thread it is bundled with.
// - Own the pollingContext.
// - Maintain a bunch of connections, an acceptor, and a connector. The last two are optional.
// - Remember what kind of socket it is.
// - Provide APIs for sending and receiving messages.
class IOSocket {
EventLoopThread& eventLoopThread;
enum SocketTypes { Binder, Sub, Pub, Dealer, Router, Pair /* etc. */ };
SocketTypes socketTypes;
std::optional<TcpServer> tcpServer;
std::optional<TcpClient> tcpClient;
std::map<int /* class FileDescriptor */, MessageConnectionTCP*> fdToConnection;
std::map<std::string, MessageConnectionTCP*> identityToConnection;
public:
IOSocket(const IOSocket&) = delete;
IOSocket& operator=(const IOSocket&) = delete;
IOSocket(IOSocket&&) = delete;
IOSocket& operator=(IOSocket&&) = delete;
const std::string identity;
// string -> connection mapping
// and connection->string mapping
// put it into the concurrent q, which is execute_now
void sendMessage(Message* msg, Continuation cont) {
// EXAMPLE
// execute_now(
// switch (socketTypes) {
// case Pub:
// for (auto& [fd, conn] : fdToConnection) {
// conn.send(msg.len, msg.size);
// conn.setWriteCompleteCallback(cont);
// eventLoopThread.getEventLoop().update_events(turn write on for this fd);
// }
// break;
// }
// )
}
void recvMessage(Message* msg);
};
class EventLoopThread {
using PollingContext = configuration::polling_context_t;
std::thread thread;
std::map<std::string /* type of IOSocket's identity */, IOSocket> identityToIOSocket;
EventLoop<PollingContext> eventLoop;
public:
// Why not make this class a friend class of IOContext?
// Because the removeIOSocket method is a bit trickier than addIOSocket:
// the IOSocket being removed will first remove every MessageConnectionTCP
// managed by it from the EventLoop, before removing itself from ioSockets.
IOSocket* addIOSocket(/* args */);
bool removeIOSocket(IOSocket*);
EventLoop<PollingContext>& getEventLoop();
IOSocket* getIOSocketByIdentity(const std::string& identity);
EventLoopThread(const EventLoopThread&) = delete;
EventLoopThread& operator=(const EventLoopThread&) = delete;
};
class IOContext {
std::vector<EventLoopThread> threads;
// std::vector<IntraProcessTcpClient*> inprocs;
// std::shared_mutex intra_process_mutex;
public:
IOContext(const IOContext&) = delete;
IOContext& operator=(const IOContext&) = delete;
IOContext(IOContext&&) = delete;
IOContext& operator=(IOContext&&) = delete;
// These methods need to be thread-safe.
IOSocket* addIOSocket(/* args */);
// ioSocket.getEventLoop().removeIOSocket(&ioSocket);
bool removeIOSocket(IOSocket*);
};
// NOTE: There are some structures that need to be defined. These are,
// - Buffer. The Buffer should be a vector equipped with a cursor. It should remember
// the user's message len and call writeCompleteCallback when it finishes writing.
// - TimedConcurrentQueue. A priority queue container that is concurrent.
// NOTE:
// Major changes since last revision:
// - It used to be the case that each IOSocket maintained one thread. This is not feasible
// because the Object Storage Server will also use this code. Since it tends to maintain a
// large number of IOSockets, where each IOSocket contains only one MessageConnectionTCP, the
// overhead would be too heavy. Plus, with that design the user would not be able to control
// the number of threads being spawned. NOW, multiple IOSockets share the same event loop
// thread, and the number of event loop threads is specified by the user.
// With respect to last week's discussion:
// - Discussion result, renaming Acceptor -> TcpServer, Connector -> TcpClient.
// - FileDescriptor should not have methods FOR NOW. This is because we are currently in
// design phase, the decision should be postponed to the implementation phase.
// - Bundled some arguments together. EventLoopThread will maintain identity-to-IOSocket
// mappings, which must be present due to the need to support guaranteed message delivery.
// Information that used to be retrieved from IOSocket is now retrieved indirectly from
// EventLoopThread.
// - Do not fill in the function call back type FOR NOW. This will not affect the design
// in a significant way.
// - Do NOT remove the EventManager type. This is because we need to remember its members'
// states, especially which events the user cares about. There's no other
// good place to store, for example, `EventManager::events` if we turn EventManager
// into a std::function. The proposed "capture this" requires every component that interacts
// with epoll to know about `events` and `revents`. Secondly, we sometimes need to
// retrieve this information outside the callback. For example, we might be interested
// in turning off reading/writing on a connection. If we store the information that used
// to be stored in EventManager in components like MessageConnectionTCP, then we need to
// find the actual object instead of simply getting it from the EventManager. This essentially
// makes the epoll context know each and every component and breaks encapsulation. What we
// can do, however, is to put the common part in EventManager, store a std::function
// in the EventManager, and let components interact with that.
-
I am okay to remove the concurrent queue if we use asyncio (basically moving the concurrent queue into the async loop), but I would like to have multiple-producer, single-consumer capacity, as multiple clients can share the same connection (make pushes to the queue thread safe). I still don't like the name Session; can we have a terminology/naming post above?
-
@sharpener6, we wrote some documentation here: https://github.com/Citi/scaler/pull/54/files#diff-65f6678f6bb355538db317b3c40483046c108d3b5a6ede3bae5733ae7ffb3844 It's incomplete for now; there is more to do.
-
// delete copy constructor
struct ThreadContext {
const size_t id; // is this going to change?
IoContext* ioctx;
std::thread thread;
std::deque<RawPeer*> connecting;
// This should contain a union of epoll_context, io_uring_context, etc.
// This will be our switching point
// Everything epoll-related is delegated here
BackendContext backend_context;
std::vector<EpollData*> io_cache;
ConcurrentQueue<ControlRequest> control_queue;
// Everything epoll-related should be delegated to the backend context
// int control_efd;
// int epoll_fd;
// int epoll_close_efd;
// int connect_timer_tfd;
bool timer_armed;
void ensure_timer_armed();
// These two should be combined into one, with template/overload
// Eventually call add_epoll in backend context
void add_connector(NetworkConnector* connector);
void add_peer(RawPeer* peer);
// add_epoll is deleted
// // must be called on io-thread
// void add_epoll(int fd, uint32_t flags, EpollType type, void* data);
// These two should be combined into one, with template/overload
// Eventually call remove_epoll in backend context
void remove_connector(NetworkConnector* connector);
void remove_peer(RawPeer* peer);
// remove_epoll is deleted
// void remove_epoll(int fd);
// must be called on io-thread
// This should probably go into the backend context as well
EpollData* epoll_by_fd(int fd);
void control(ControlRequest request);
void start();
};
// epoll handlers
void network_connector_send_event(NetworkConnector* connector) {
for (;;) {
if (connector->muted())
return;
// calls like this probably become something like this
// connector->thread->context->backend_context->wait_on(send_event_fd)
if (eventfd_wait(connector->send_event_fd) < 0) {
// semaphore is zero, we can epoll_wait() again
if (errno == EAGAIN)
break;
panic("handle eventfd read error: " + std::to_string(errno));
}
// invariant: if we decremented the semaphore the queue must have a message
// we loop because thread synchronization may be delayed
SendMessage send;
while (!connector->send_queue.try_dequeue(send))
; // wait
connector->send(send);
}
}
-
Things in this comment should be addressed after the last comment:
It would be nice to bind them together. This way we reduce the noise in the main class. It's also worth wrapping it, since we have at least 4 different
-
The documentation can be in this issue, and we can tag this issue as
-
epoll_wait() // fd is readable
buffer = read(fd)
do_stuff(buffer)
iouring_wait() // fd _has been read_
// read() <-- missing
do_stuff(cqe.buffer)
---
epoll_wait() // send_event_fd is readable
switch event type;
eventfd_wait(send_event_fd) // this is a read()
handle_send_event() // dequeue from concurrentqueue, send the message, etc.
iouring_wait() // send_event_fd _has been read_
switch event type;
handle_send_event()
// epoll
void network_connector_send_event_epoll(Connector conn) {
eventfd_wait(conn->send_event_fd); // epoll: need to do the read ourselves
network_connector_send_event_inner(conn);
}
// io uring
void network_connector_send_event_iouring(Connector conn) {
iouring_prep_read(conn->send_event_fd); // io uring: need to re-up the request
network_connector_send_event_inner(conn);
}
-
struct BackendContext {
struct EpollContext {
int epoll_fd;
std::vector<EpollData*> io_cache;
void add_epoll(int fd, uint32_t flags, EpollType type, void* data);
};
auto wait();
// Dispatch it to whatever context we are using
void add_connector(NetworkConnector* connector);
void remove_connector(NetworkConnector* connector);
void add_peer(RawPeer* peer);
void remove_peer(RawPeer* peer);
// When there is other context, make it a union
EpollContext context;
};
-
make Connector expose a method that returns a structure containing the epoll fd and events, in
-
@sharpener6 the return type of
-
I would prefer a less abstract approach here: instead of having a common event struct and sharing the code paths for the event waiting, we can have separate functions for the epoll and io uring waits, and then share the common logic without dealing with the particulars of each io backend.
-
Unsorted notes from this morning's meeting.
class IoContext { vector<Thread> threads; }
class Thread { std::thread t; EventLoop loop; void run(EventLoop *loop); }
Thread::run(*loop) {
for (;;) {
auto stop = loop->process_events();
if (stop)
break;
}
}
// called by python thread
void io_context_destroy(IoContext *ctx) {
// eventfd_signal(ctx->shutdown);
// ctx->threads[0].shutdown();
}
class EventLoop { bool process_events(); void shutdown(); }
class EpollEventLoop: EventLoop { int epoll_fd; }
class IoUringEventLoop: EventLoop { ... }
class Thread {
std::thread t;
Executor exec;
}
class Executor {
// member variables
void process_events();
}
class EpollExecutor {
// epoll-specific things
int epoll_fd;
void process_events() {
epoll_wait();
...
}
}
class IoUringExecutor {
void process_events() {
io_uring_wait();
...
}
}
void main() {
if type == epoll {
ctx = epoll_ctx();
} else if type == io_uring {
ctx = io_uring_ctx();
}
io_thread_main(ctx);
}
-
Proposal: Abstracting the IO Facility in the Transport Layer
The current implementation of the new transport layer uses epoll to manage io. Each thread has its own instance of epoll which it monitors for events on its assigned connectors, peers, and various control mechanisms (control requests, shutdown). However, there are many io facilities available: io uring, aio, libuv, and more. If we are to enable the use of different io facilities in the transport layer we need to define an abstraction.
What we have now
A rough outline of the structure of the code. Note: Pseudocode
struct IoContext {
std::vector<ThreadContext> threads;
// additional attributes elided for brevity
};
struct ThreadContext {
std::thread t; // the actual thread
int epoll_fd; // the fd for this thread's epoll instance
ConcurrentQueue<ControlRequest> control_queue; // the queue for control requests
int control_efd; // event fd for control requests
int shutdown_fd; // the event fd for shutting down
// additional attributes elided for brevity
};
And we have the following important interactions:
void io_context_init(IoContext *ctx, ...) {
for (int i = 0; i < num_threads; i++) {
ctx->threads.push_back({ .thread = std::thread(io_thread_main), ... });
}
...
}
void io_context_destroy(IoContext *ctx) {
for (auto thread : ctx->threads) {
eventfd_signal(thread->shutdown_efd);
thread->t.join();
}
...
}
void io_thread_main(ThreadContext *ctx) {
for (;;) {
auto event = epoll_wait(ctx->epoll_fd);
switch (event.type) {
// dispatch
}
}
}
The most important point is: io threads run a function that loops forever, checking the io facility. For epoll this means calling epoll_wait().
Sharing Code
A surprising amount of code can be shared between implementations. Consider the following:
// not important what this actually is
void handle_event_inner(...) {
...
}
// epoll dispatches to this
void handle_event_epoll(...) {
auto x = read(...); // with epoll we must do the io ourselves
handle_event_inner(...); // this is common logic
}
// io uring impl dispatches to this
void handle_event_io_uring(...) {
io_uring_prep_read(...); // for io uring we must renew the request
handle_event_inner(...);
}
This demonstrates how epoll and io uring can share code, in particular the business logic contained in handle_event_inner(). Option A:
-
Not completely finished. For an initial peek.
UPD: Rename
-
Diagrams of the Current Design
This diagram is meant to document the existing design of the transport layer, so that we know what we are starting from. It is currently incomplete and will be updated over time.
-
@rafa-be @magniloquency Please review
-
class FileDescriptor {
int fd;
public:
int accept() {
return ::accept(this->fd, ...);
}
...
};
This way the raw fd never needs to be exposed, and we can avoid copying. Or, if we want to copy it we could put it in a
-
@magniloquency My bad for the late reply.
And YELL AT USER to not close this
gxu |
-
-
I agree with the discussions above, but here are a few remarks:
What I absolutely want is to remove the event dispatch (i.e. the
Callbacks/std::function seems to me like an easy way to do it. But if you prefer a custom templated function to avoid the possible additional heap allocation of std::function, e.g.:
class SocketEventHandler {
int fd;
IOSocket *socket;
public:
void onEvent(EventType) {
...
}
};
struct EpollContext { // Shall we rename this? e.g. EpollEventLoop
template <typename EventHandler>
void registerCallbackBeforeLoop(EventHandler *);
...
};
That's also fine. But please do not intertwine the upper socket layer with the lower event loop layer. It makes it harder to understand, test, and re-use.
-
@rafa-be I tried your suggestion during the weekend. There is another (more important) reason to use this
-
@gxuu, can we just update the post above instead of updating incrementally and putting all updates in separate posts? It will be very hard for me to read through.
-
I have started working on implementing the redesign. The code is here: magniloquency#1 Note: still in early stages.
-
@sharpener6 Sure. The third reply to this Issue will always contain the newest design interface, and Update details will be sent out as new replies. You can now get all information by simply checking the third comment. |
-
layers:
// lowest layer
void io_thread_main() {
epoll_wait();
// execute callbacks
}
// middle layer
struct awaitable {
awaitable(Message &msg) {...}
void await_suspend(handle) {
submit_to_epoll_thread([] { ...; handle.resume(); }); // <-- executed on epoll thread
}
}
awaitable send_message(msg) {
return {msg};
}
// top layer
// this is c api ?
task<> send_message2(iosocket, msg) {
co_await send_message(iosocket, msg);
}
std::map<timerfd, function> callbacks;
epoll_wait();
auto em = static_cast<EventManager*>(event);
em->callback();
void add_timed_event(when, fn) {
auto fd = FileDescriptor::timerfd();
fd->set(when);
EventManager em = new EM(fd, fn);
eloop->register(em);
}
where is add_timed_event(); defined?
template<class Impl>
class Interface {
Impl impl;
T foo() { return impl.foo(); }
}
class Impl {
T foo() {
...;
}
}
-
void IoSocket::send(msg) {
// co_await socket_send(ctx, socket, msg); <- not ok
msg -> socket.queue;
}
task io_socket_session() {
auto event = co_await events(socket);
switch (event) {
case Send:
msg <- socket.queue;
co_await socket_send(ctx, socket, msg, [](Message& msg) { write(socket.fd, msg); });
}
}
awaitable socket_send(ctx, socket, msg, callback) {
awaitable aw;
auto em = EventManager([](){ callback(); aw.resume(); });
// send em to event loop *SOMEHOW*
}
void io_thread_main(ctx) {
auto event = epoll_wait();
eventManager->onEvent(event);
}
-
I am not sure how to embed an image in an issue, but here's a graph I drew for the architecture of YMQ.
-
Research:
We're trying to replace ZMQ
Fact: ZMQ is very complex
Sockets are a very powerful abstraction
Scaler has 4 different classes that use zmq:
what's the difference between these 4+? Actually very little, they all use a single zmq socket and differ only in transport type and router, dealer, etc. type.
Sessions -> thread pool, also a context for intraprocess to communicate (because they're not sockets)
connectors -> very similar to a zmq socket, the underlying concept for the async binder, async connector, etc. -> has peers
what is a peer: zmq sockets are an abstraction, they can be connected to multiple other sockets at a time: we call these peers
peer: represents a single side of a connection between two connectors (sockets)
socket a: pub, bound
socket b: connected to a
socket c: connected to a
a has 3 sockets: the bound tcp socket for listening, and a socket per peer (2 of them)
peer_sock <- accept(bound_sock)
in this situation there are 3 things to epoll, at least 3:
router is special: when you send a message on a router, you must include an address. the router will then send it to a specific peer with that address.
assume a is a router: a.send(b, msg) <-- send the message to peer b
also: when you receive a message, it will tell you which peer it came from <-- for simplicity in our impl all of the sockets do this
msg <- a.recv()
msg.address -> b or c
connector types: how the peer to send to is determined
peers are not directly exposed by the library to the caller/client. they only work with connectors
Python creates future -> send_async() -> pass to io thread -> epoll() returns -> the send is queued for the peer* -> when the peer becomes writable, epoll() returns** -> write the message to the peer's socket*** -> complete -> future_set_result()
*which peer depends on the connector type, e.g. router, pair, etc.
**: we don't know when the peer will become writable, just at SOME point in the future
***: this could take multiple returns from epoll() if e.g. the tcp buffer fills up, or maybe the peer's connection fails and it has to reconnect. we guarantee sends so it's possible that the message is being written to the peer, but the conn fails, so we re-establish, try sending it again, and THEN the future is resolved <-- this could take a "long" time
async: we don't know when the future is going to resolve
problem: how to communicate from python thread to io threads?
solution: use a concurrent queue
python thread can put messages, etc. in queue without needing extra sync with io threads
python thread puts things in the queue
io threads remove things from the queue
^ it's possible for these to happen at the exact same time
python thread:
queue.enqueue(...)
eventfd_signal(queue_signal)
io thread:
epoll() <- wait for queue_signal
queue.dequeue()
Q: is it possible for dequeue and enqueue to race?
A: maybe?
do we need concurrent queue?
various kinds of fds:
Q: what if don't block on epoll -- what if we block on the queue?
A: this won't work because then the io thread would only do work when something arrives in the queue -- what if a peer had a message or disconnected or became writable?