Skip to content

Commit bdadcec

Browse files
authored
feat: beacon (#1664)
Create new process group library called Beacon that broadcast only group counts to other nodes. Actual pids are only available to the local node. It also supports custom adapter so that we can use PubSub for the broadcasting (including regional broadcasting).
1 parent bb1c2b6 commit bdadcec

24 files changed

+1681
-49
lines changed

.github/workflows/beacon_tests.yml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
name: Beacon Tests
2+
defaults:
3+
run:
4+
shell: bash
5+
working-directory: ./beacon
6+
on:
7+
pull_request:
8+
paths:
9+
- "beacon/**"
10+
11+
push:
12+
branches:
13+
- main
14+
15+
concurrency:
16+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
17+
cancel-in-progress: true
18+
19+
env:
20+
MIX_ENV: test
21+
22+
jobs:
23+
tests:
24+
name: Tests & Lint
25+
runs-on: ubuntu-latest
26+
27+
steps:
28+
- uses: actions/checkout@v2
29+
- name: Setup elixir
30+
id: beam
31+
uses: erlef/setup-beam@v1
32+
with:
33+
otp-version: 27.x # Define the OTP version [required]
34+
elixir-version: 1.18.x # Define the elixir version [required]
35+
- name: Install dependencies
36+
run: mix deps.get
37+
- name: Start epmd
38+
run: epmd -daemon
39+
- name: Run tests
40+
run: MIX_ENV=test mix test
41+
env:
42+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
43+
- name: Check for warnings
44+
run: mix compile --force --warnings-as-errors
45+
- name: Run format check
46+
run: mix format --check-formatted

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ RUN mix local.hex --force && \
3434

3535
# install mix dependencies
3636
COPY mix.exs mix.lock ./
37+
COPY beacon beacon
3738
RUN mix deps.get --only $MIX_ENV
3839
RUN mkdir config
3940

beacon/.formatter.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Used by "mix format"
2+
[
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
4+
]

beacon/.gitignore

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# The directory Mix will write compiled artifacts to.
2+
/_build/
3+
4+
# If you run "mix test --cover", coverage assets end up here.
5+
/cover/
6+
7+
# The directory Mix downloads your dependencies sources to.
8+
/deps/
9+
10+
# Where third-party dependencies like ExDoc output generated docs.
11+
/doc/
12+
13+
# If the VM crashes, it generates a dump, let's ignore it too.
14+
erl_crash.dump
15+
16+
# Also ignore archive artifacts (built via "mix archive.build").
17+
*.ez
18+
19+
# Ignore package tarball (built via "mix hex.build").
20+
beacon-*.tar
21+
22+
# Temporary files, for example, from tests.
23+
/tmp/

beacon/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Beacon
2+
3+
Beacon is a scalable process group manager. The main use case for this library is to have membership counts available on the cluster without spamming whenever a process joins or leaves a group. A node can have thousands of processes joining and leaving hundreds of groups while sending just the membership count to other nodes.
4+
5+
The main features are:
6+
7+
* Process pids are available only to the node the where the processes reside;
8+
* Groups are partitioned locally to allow greater concurrency while joining different groups;
9+
* Group counts are periodically broadcasted (defaults to every 5 seconds) to update group membership numbers to all participating nodes;
10+
* Sub-cluster nodes join by using same scope;
11+
12+
## Installation
13+
14+
The package can be installed by adding `beacon` to your list of dependencies in `mix.exs`:
15+
16+
```elixir
17+
def deps do
18+
[
19+
{:beacon, "~> 1.0"}
20+
]
21+
end
22+
```
23+
24+
## Using
25+
26+
Add Beacon to your application's supervision tree specifying a scope name (here it's `:users`)
27+
28+
```elixir
29+
def start(_type, _args) do
30+
children =
31+
[
32+
{Beacon, :users},
33+
# Or passing options:
34+
# {Beacon, [:users, opts]}
35+
# See Beacon.start_link/2 for the options
36+
```
37+
38+
Now process can join groups
39+
40+
```elixir
41+
iex> pid = self()
42+
#PID<0.852.0>
43+
iex> Beacon.join(:users, {:tenant, 123}, pid)
44+
:ok
45+
iex> Beacon.local_member_count(:users, {:tenant, 123})
46+
1
47+
iex> Beacon.local_members(:users, {:tenant, 123})
48+
[#PID<0.852.0>]
49+
iex> Beacon.local_member?(:users, {:tenant, 123}, pid)
50+
true
51+
```
52+
53+
From another node part of the same scope:
54+
55+
```elixir
56+
iex> Beacon.member_counts(:users)
57+
%{{:tenant, 123} => 1}
58+
iex> Beacon.member_count(:users, {:tenant, 123})
59+
1
60+
```

beacon/config/config.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import Config
2+
3+
# Print nothing during tests unless captured or a test failure happens
4+
config :logger, backends: [], level: :debug

beacon/lib/beacon.ex

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
defmodule Beacon do
2+
@moduledoc """
3+
Distributed process group membership tracking.
4+
"""
5+
6+
alias Beacon.Partition
7+
alias Beacon.Scope
8+
9+
@type group :: any
10+
@type start_option ::
11+
{:partitions, pos_integer()} | {:broadcast_interval_in_ms, non_neg_integer()}
12+
13+
@doc "Returns a supervisor child specification for a Beacon scope"
14+
def child_spec([scope]) when is_atom(scope), do: child_spec([scope, []])
15+
def child_spec(scope) when is_atom(scope), do: child_spec([scope, []])
16+
17+
def child_spec([scope, opts]) when is_atom(scope) and is_list(opts) do
18+
%{
19+
id: Beacon,
20+
start: {__MODULE__, :start_link, [scope, opts]},
21+
type: :supervisor
22+
}
23+
end
24+
25+
@doc """
26+
Starts the Beacon supervision tree for `scope`.
27+
28+
Options:
29+
30+
* `:partitions` - number of partitions to use (default: number of schedulers online)
31+
* `:broadcast_interval_in_ms`: - interval in milliseconds to broadcast membership counts to other nodes (default: 5000 ms)
32+
* `:message_module` - module implementing `Beacon.Adapter` behaviour (default: `Beacon.Adapter.ErlDist`)
33+
"""
34+
@spec start_link(atom, [start_option]) :: Supervisor.on_start()
35+
def start_link(scope, opts \\ []) when is_atom(scope) do
36+
{partitions, opts} = Keyword.pop(opts, :partitions, System.schedulers_online())
37+
broadcast_interval_in_ms = Keyword.get(opts, :broadcast_interval_in_ms)
38+
39+
if not (is_integer(partitions) and partitions >= 1) do
40+
raise ArgumentError,
41+
"expected :partitions to be a positive integer, got: #{inspect(partitions)}"
42+
end
43+
44+
if broadcast_interval_in_ms != nil and
45+
not (is_integer(broadcast_interval_in_ms) and broadcast_interval_in_ms > 0) do
46+
raise ArgumentError,
47+
"expected :broadcast_interval_in_ms to be a positive integer, got: #{inspect(broadcast_interval_in_ms)}"
48+
end
49+
50+
Beacon.Supervisor.start_link(scope, partitions, opts)
51+
end
52+
53+
@doc "Join pid to group in scope"
54+
@spec join(atom, any, pid) :: :ok | {:error, :not_local}
55+
def join(_scope, _group, pid) when is_pid(pid) and node(pid) != node(), do: {:error, :not_local}
56+
57+
def join(scope, group, pid) when is_atom(scope) and is_pid(pid) do
58+
Partition.join(Beacon.Supervisor.partition(scope, group), group, pid)
59+
end
60+
61+
@doc "Leave pid from group in scope"
62+
@spec leave(atom, group, pid) :: :ok
63+
def leave(scope, group, pid) when is_atom(scope) and is_pid(pid) do
64+
Partition.leave(Beacon.Supervisor.partition(scope, group), group, pid)
65+
end
66+
67+
@doc "Get total members count per group in scope"
68+
@spec member_counts(atom) :: %{group => non_neg_integer}
69+
def member_counts(scope) when is_atom(scope) do
70+
remote_counts = Scope.member_counts(scope)
71+
72+
scope
73+
|> local_member_counts()
74+
|> Map.merge(remote_counts, fn _k, v1, v2 -> v1 + v2 end)
75+
end
76+
77+
@doc "Get total member count of group in scope"
78+
@spec member_count(atom, group) :: non_neg_integer
79+
def member_count(scope, group) do
80+
local_member_count(scope, group) + Scope.member_count(scope, group)
81+
end
82+
83+
@doc "Get total member count of group in scope on specific node"
84+
@spec member_count(atom, group, node) :: non_neg_integer
85+
def member_count(scope, group, node) when node == node(), do: local_member_count(scope, group)
86+
def member_count(scope, group, node), do: Scope.member_count(scope, group, node)
87+
88+
@doc "Get local members of group in scope"
89+
@spec local_members(atom, group) :: [pid]
90+
def local_members(scope, group) when is_atom(scope) do
91+
Partition.members(Beacon.Supervisor.partition(scope, group), group)
92+
end
93+
94+
@doc "Get local member count of group in scope"
95+
@spec local_member_count(atom, group) :: non_neg_integer
96+
def local_member_count(scope, group) when is_atom(scope) do
97+
Partition.member_count(Beacon.Supervisor.partition(scope, group), group)
98+
end
99+
100+
@doc "Get local members count per group in scope"
101+
@spec local_member_counts(atom) :: %{group => non_neg_integer}
102+
def local_member_counts(scope) when is_atom(scope) do
103+
Enum.reduce(Beacon.Supervisor.partitions(scope), %{}, fn partition_name, acc ->
104+
Map.merge(acc, Partition.member_counts(partition_name))
105+
end)
106+
end
107+
108+
@doc "Check if pid is a local member of group in scope"
109+
@spec local_member?(atom, group, pid) :: boolean
110+
def local_member?(scope, group, pid) when is_atom(scope) and is_pid(pid) do
111+
Partition.member?(Beacon.Supervisor.partition(scope, group), group, pid)
112+
end
113+
114+
@doc "Get all local groups in scope"
115+
@spec local_groups(atom) :: [group]
116+
def local_groups(scope) when is_atom(scope) do
117+
Enum.flat_map(Beacon.Supervisor.partitions(scope), fn partition_name ->
118+
Partition.groups(partition_name)
119+
end)
120+
end
121+
122+
@doc "Get local group count in scope"
123+
@spec local_group_count(atom) :: non_neg_integer
124+
def local_group_count(scope) when is_atom(scope) do
125+
Enum.sum_by(Beacon.Supervisor.partitions(scope), fn partition_name ->
126+
Partition.group_count(partition_name)
127+
end)
128+
end
129+
130+
@doc "Get groups in scope"
131+
@spec groups(atom) :: [group]
132+
def groups(scope) when is_atom(scope) do
133+
remote_groups = Scope.groups(scope)
134+
135+
scope
136+
|> local_groups()
137+
|> MapSet.new()
138+
|> MapSet.union(remote_groups)
139+
|> MapSet.to_list()
140+
end
141+
142+
@doc "Get group count in scope"
143+
@spec group_count(atom) :: non_neg_integer
144+
def group_count(scope) when is_atom(scope) do
145+
remote_groups = Scope.groups(scope)
146+
147+
scope
148+
|> local_groups()
149+
|> MapSet.new()
150+
|> MapSet.union(remote_groups)
151+
|> MapSet.size()
152+
end
153+
end

beacon/lib/beacon/adapter.ex

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule Beacon.Adapter do
2+
@moduledoc """
3+
Behaviour module for Beacon messaging adapters.
4+
"""
5+
6+
@doc "Register the current process to receive messages for the given scope"
7+
@callback register(scope :: atom) :: :ok
8+
9+
@doc "Broadcast a message to all nodes in the given scope"
10+
@callback broadcast(scope :: atom, message :: term) :: any
11+
12+
@doc "Broadcast a message to specific nodes in the given scope"
13+
@callback broadcast(scope :: atom, [node], message :: term) :: any
14+
15+
@doc "Send a message to a specific node in the given scope"
16+
@callback send(scope :: atom, node, message :: term) :: any
17+
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
defmodule Beacon.Adapter.ErlDist do
2+
@moduledoc false
3+
4+
import Kernel, except: [send: 2]
5+
6+
@behaviour Beacon.Adapter
7+
8+
@impl true
9+
def register(scope) do
10+
Process.register(self(), Beacon.Supervisor.name(scope))
11+
:ok
12+
end
13+
14+
@impl true
15+
def broadcast(scope, message) do
16+
name = Beacon.Supervisor.name(scope)
17+
Enum.each(Node.list(), fn node -> :erlang.send({name, node}, message, [:noconnect]) end)
18+
end
19+
20+
@impl true
21+
def broadcast(scope, nodes, message) do
22+
name = Beacon.Supervisor.name(scope)
23+
Enum.each(nodes, fn node -> :erlang.send({name, node}, message, [:noconnect]) end)
24+
end
25+
26+
@impl true
27+
def send(scope, node, message) do
28+
:erlang.send({Beacon.Supervisor.name(scope), node}, message, [:noconnect])
29+
end
30+
end

0 commit comments

Comments
 (0)