* DistErl

Erlang nodes need to agree on a way to talk to each other over the
network. That agreed-upon way is called a distribution protocol.

By default, Erlang distribution discovers nodes through EPMD (the
Erlang Port Mapper Daemon). When a node starts up, it registers itself
with the EPMD daemon running on the same machine, and other nodes ask
EPMD "hey, how do I reach the node named foo@mymachine?" EPMD answers
with a port number, and the connection proceeds from there. It's a bit
like a phone book: you look up a name, get a number, dial it.
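
The phone-book lookup can be reproduced from an Erlang shell with
=erl_epmd:port_please/2= (a real =kernel= function; the node name
=foo= is an assumption):

#+begin_src erlang
%% Sketch: ask the local EPMD how to reach a node named foo on this
%% host. Assumes a node named foo is already running; otherwise the
%% lookup returns noport.
{ok, Host} = inet:gethostname(),
case erl_epmd:port_please("foo", Host) of
    {port, Port, Version} ->
        io:format("foo listens on ~p (dist protocol ~p)~n", [Port, Version]);
    noport ->
        io:format("no node named foo is registered~n")
end.
#+end_src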

** Simplified example
Malls, business parks, and the like have directories of which stores
or businesses are present and where to find them. The default for most
(the EPMD of this analogy) is a physical board listing names and
locations.

Alternatives could be QR codes that can be scanned for the same
information, a mall app folks can load on their phone to navigate, or
TV screens that the owners can update in real time.

* Why alternatives?

** Security
The default handshake authenticates nodes with a shared secret (the
cookie) over a plain TCP connection, which might not be acceptable in
a locked-down environment. You might want TLS from the very first
byte, before any handshake.

** Different environments
In a Kubernetes cluster, for example, pods discover each other through
DNS or a service registry, not through EPMD. A custom module can speak
that language natively.

** Avoiding an extra process
EPMD is a separate OS-level daemon. Some deployment setups prefer not
to have it running at all, especially in containers where you want one
process per container.

** Custom transports
The default distribution uses TCP. If you wanted to run node-to-node
communication over something else — a Unix domain socket, a shared
memory channel, or a message bus — you could do that by implementing
the right callbacks.

* Distribution contract
** Registration
"Hi, I'm a node named foo@bar.com and I accept connections on port
50123" (each node registers its own, usually dynamic, distribution
port; EPMD itself listens on the well-known port 4369)
** Node Lookup
"Do you know how to reach bob@loblaw.com?"
** Connections
Managing sockets (listening for and accepting connections)

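The discovery half of this contract can also be swapped out on its own
via the =-epmd_module= flag. A minimal sketch of such a module (the
module name and the fixed port =12345= are assumptions, not part of
any real system):

#+begin_src erlang
-module(static_epmd).
-export([start_link/0, register_node/3, port_please/2, names/1]).

%% No daemon process is needed when ports are static.
start_link() -> ignore.

%% Registration: hand back a creation number instead of talking to EPMD.
register_node(_Name, _Port, _Family) -> {ok, 1}.

%% Node lookup: assume every node listens on the same fixed port.
%% 6 is the distribution protocol version.
port_please(_Name, _Host) -> {port, 12345, 6}.

%% Listing registered names is not supported in this sketch.
names(_Host) -> {error, address}.
#+end_src
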
* DistErl over AMQP
** Why use AMQP
Traditional EPMD-based distribution establishes direct connections
between nodes, resulting in either a fully-connected mesh or (using
hidden nodes) a hub-and-spoke arrangement.

With AMQP, each node instead connects to the AMQP broker (we recommend
RabbitMQ, obviously), and inter-node traffic flows through the broker
via exchanges, queues, and their bindings.

While this introduces the AMQP broker as a potential SPOF (single
point of failure), the broker is presumably already part of your
application's infrastructure and thus introduces no new risk.

The benefits include interesting routing options and observability of
all inter-node communication in a single place (the broker).

** Node Startup
When the node =foo@bar.com= starts up, it publishes its presence to a
pre-determined exchange.
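
A sketch of that presence announcement, assuming the =amqp_client=
library (with its =amqp_client.hrl= records included), an already-open
channel, and the exchange and header names used later in this
document:

#+begin_src erlang
%% Publish a presence/heartbeat message to the shared headers exchange.
%% QueueName is this node's own reply queue; Start is its start time.
announce(Channel, QueueName, Start) ->
    Props = #'P_basic'{reply_to = QueueName,
                       headers = [{<<"distribution.ping">>, bool, true},
                                  {<<"node.start">>, timestamp, Start}]},
    Publish = #'basic.publish'{exchange = <<"amq.headers">>},
    amqp_channel:cast(Channel, Publish,
                      #amqp_msg{props = Props, payload = <<>>}).
#+end_src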

** Inter-node communication
Instead of TCP-based "connect to IP =A= on port =B=", nodes will
"publish to exchange =A= with routing key =B=", where =A= is a direct
exchange (where routing key = binding key) and =B= is the routing key
of the destination node.
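
In code, the substitution amounts to something like the following
sketch (the exchange name and function shape are assumptions):

#+begin_src erlang
%% Instead of gen_tcp:connect(IpA, PortB, Opts) + gen_tcp:send/2,
%% publish to a direct exchange with the destination's routing key.
send(Channel, DestRoutingKey, Payload) ->
    Publish = #'basic.publish'{exchange = <<"amq.direct">>,
                               routing_key = DestRoutingKey},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}).
#+end_src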
| 58 | + |
* amqp_dist
** BEAM startup requirements
At distribution startup, only code from the application itself,
=kernel=, and =stdlib= may be used.

The module controlling distribution interactions should be suffixed
=_dist=: we called ours =amqp_dist= accordingly.

** Infrastructure to setup

- A listener entity (a process or port)
- An acceptor process to accept incoming connections via the listener

*** Per Connection
Once a connection is accepted, the module needs to create:
- a connection supervisor process (handles the handshake for setting
  up the connection)
- a distribution controller (process or port) for putting data onto
  the connection

Both should be linked so they're cleaned up when the connection goes
down.

*** Example dist module
From https://www.erlang.org/doc/apps/erts/alt_dist.html

#+begin_example
An example implementation of a distribution module can be found [[https://www.erlang.org/doc/apps/erts/assets/gen_tcp_dist.erl][here]]

It implements the distribution over TCP/IP using the gen_tcp API with
distribution controllers implemented by processes. This instead of
using port distribution controllers as the ordinary TCP/IP
distribution uses.
#+end_example

** Required Callbacks

#+begin_src erlang
-export([listen/1
        ,accept/1
        ,accept_connection/5
        ,setup/5
        ,close/1
        ,select/1
        ,is_node_name/1
        ,address/0
        ]).
#+end_src

*** listen(Name)
Called once, when Erlang distribution is brought up, to listen for
incoming connection requests.

=Name= is the name part of a =Name@Host= full node name (can be an
=atom()= or =string()=).

Returns ={ok, {Socket, Address, Creation}}=:

- =Socket= is a handle that will be passed to the =accept/1= callback
  later; it could be a real socket, but in our case it is a
  =#fake_socket{}= record
- =Address= is a =#net_address{}= record (defined in
  =kernel/include/net_address.hrl=) describing the node
- =Creation= is an integer between =1..3=; we chose 3

**** =#fake_socket{}=
#+begin_src erlang
-record(fake_socket, {read = 0,
                      write = 0,
                      pending = 0,
                      pid = self() :: pid(),
                      name :: term(),
                      mypid :: pid()
                     }).


{ok, Pid} = amqp_dist_acceptor:start(self(), Name) % start a gen_server

#fake_socket{name=Name, mypid=Pid} % Name from listen/1 arg, Pid is the amqp_dist_acceptor gen_server
#+end_src

**** =#net_address{}=
#+begin_src erlang
#net_address{address = []
            ,host = inet:gethostname()
            ,protocol = amqp
            ,family = amqp
            }
#+end_src

*** accept(Listen)

Accepts new connection attempts from other Erlang nodes.

#+begin_src erlang
accept(Listen) ->
    spawn_opt(?MODULE, start_accept, [self(), Listen], [link, {priority, max}]).
#+end_src

=accept_loop= receives connection tuples ={connection, Tag, Node,
Connection, Queue}= from =amqp_dist_acceptor=.

The loop messages the kernel process (=Kernel ! {accept, self(),
{Tag, Node, Connection, Queue, Listen}, amqp, amqp}=) to accept the
connection, then waits for the kernel to respond with the supervising
process via a ={Kernel, controller, SupervisorPid}= message. The
=accept= tuple has the shape ={accept, AcceptPid, Socket, Family,
Proto}=, where =Family= and =Proto= map back to the =#net_address{}=
returned by =listen/1=.

=net_kernel= will then call =amqp_dist:accept_connection/5=, which
spawns a process into the =do_accept/6= function to perform the needed
handshake; a =#hs_data{}= record tracks the handshake information.

** =amqp_dist_acceptor=
This =gen_server= accepts AMQP payloads from other nodes wanting to
connect.

After initializing, it starts AMQP connection(s) to the configured
brokers via =start_connections/0=. These settings are fetched with
=application:get_env/3=; the app's =env= might look like:
#+begin_src erlang
{env, [{heartbeat_period_ms, 30000}
      ,{heartbeat_timeout_ms, 45000}
      ,{connection_timeout_ms, 10000}
      ,{pause_before_reconnect_ms, 3500}
      ,{server_call_timeout_ms, 750}
      ,{connections, ["amqp://guest:guest@broker.add.re.ss:5672"]}
      ]}
#+end_src

Once the broker connection is established:
1. an AMQP channel is started
2. the exchange =amq.headers= is configured
   - headers exchanges (=amq.match=, with =amq.headers= as a RabbitMQ
     alias) route on message attributes instead of routing keys; see
     [[https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-headers][the RabbitMQ docs]]
3. an exclusive queue is declared, named
   =list_to_binary(["amqp_dist_acceptor-", atom_to_list(node()), "-", pid_to_list(self())])=
4. the queue is bound to the exchange with a header argument
   ={<<"distribution.ping">>, bool, true}=
5. consuming from the queue begins
| 175 | + |
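The steps above can be sketched with the =amqp_client= API (assuming
its =amqp_client.hrl= records are included and an =amqp_connection= is
already open; error handling omitted):

#+begin_src erlang
%% Set up the channel, queue, binding, and consumer for the acceptor.
setup(Connection) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),      %% step 1
    Queue = list_to_binary(["amqp_dist_acceptor-", atom_to_list(node()),
                            "-", pid_to_list(self())]),
    #'queue.declare_ok'{} =                                        %% step 3
        amqp_channel:call(Channel,
                          #'queue.declare'{queue = Queue, exclusive = true}),
    #'queue.bind_ok'{} =                                           %% step 4
        amqp_channel:call(Channel,
                          #'queue.bind'{queue = Queue,
                                        exchange = <<"amq.headers">>, %% step 2
                                        arguments = [{<<"distribution.ping">>,
                                                      bool, true}]}),
    #'basic.consume_ok'{} =                                        %% step 5
        amqp_channel:subscribe(Channel,
                               #'basic.consume'{queue = Queue}, self()),
    {ok, Channel, Queue}.
#+end_src
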
Once a broker is up and the channel and queue are configured,
=amqp_dist_acceptor= starts a heartbeat timer (=heartbeat_period_ms=,
30s in the example =env= above) which publishes a message with
properties:
#+begin_src erlang
,reply_to = QueueName
,headers = [{<<"distribution.ping">>, bool, true}
           ,{<<"node.start">>, timestamp, Start}
           ]
#+end_src

This should match the bindings of every other existing node's queue on
the broker.

*** AMQP Message handling
**** New node present
When a remote node publishes its heartbeat and the local node sees it
for the first time, the =gen_server= checks the =auto_connect_nodes=
env param to decide whether to auto-connect to the node; if so,
=net_kernel:connect_node(RemoteNode)= is spawned to establish a
connection to the remote node. Ultimately this calls
=amqp_dist:select(RemoteNode)=, which calls
=amqp_dist_acceptor:is_up(RemoteNode)= to report whether the remote
node is known, and the "connection" is established in =net_kernel=.

**** Remote node wants to connect
Besides heartbeats, the =amqp_dist_acceptor= can receive a connection
request off AMQP from another node.

That payload is a =term_to_binary=-encoded two-tuple ={amqp_dist,
connect}=. When received, a 5-tuple ={connection, Tag, Node,
Connection, RemoteQueue}= is sent to the =amqp_dist= acceptor process.
** =amqp_dist_node=
A =gen_server= that handles sending and receiving data between the
local node and a connected remote node.

Once the handshake is completed, messages between nodes can begin.
While not strictly necessary, =amqp_dist= spawns an input handler
process via =amqp_dist:dist_cntrlr_input_setup/3= that registers
itself with =amqp_dist_node= as the receiver process for data from the
remote node.

For data going from the local node to the remote node,
=erlang:dist_ctrl_get_data(DHandle)= is called and, if data is
returned, =amqp_dist_node:send/2= takes care of publishing the data to
the remote node's AMQP queue (using the queue name as the routing
key).

Arbitrary Erlang terms are encoded with
=base64:encode(term_to_binary(Term))= for sending and decoded in
reverse.
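
The wire-encoding round trip looks like:

#+begin_src erlang
%% Encode an arbitrary term for publishing, then decode it on receipt.
Payload = base64:encode(term_to_binary({hello, node()})),
{hello, _Node} = binary_to_term(base64:decode(Payload)).
#+end_src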