Skip to content

Commit a0fb78d

Browse files
committed
Merge branch 'remove-application'
2 parents eca9c92 + d52589b commit a0fb78d

11 files changed

Lines changed: 186 additions & 132 deletions

File tree

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
elixir 1.5.1
1+
elixir 1.9.1

lib/hlclock.ex

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ defmodule HLClock do
33
Hybrid Logical Clock
44
55
Provides globally-unique, monotonic timestamps. Timestamps are bounded by the
6-
clock synchronization constraint, `max_drift`.
6+
clock synchronization constraint, max_drift. By default the max_drift is set
7+
to 300 seconds.
78
89
In order to account for physical time drift within the system, timestamps
910
should regularly be exchanged between nodes. Generate a timestamp at one node
@@ -12,39 +13,45 @@ defmodule HLClock do
1213
1314
Inspired by https://www.cse.buffalo.edu/tech-reports/2014-04.pdf
1415
"""
15-
1616
alias HLClock.Timestamp
1717

18+
def child_spec(opts) do
19+
%{
20+
id: __MODULE__,
21+
type: :worker,
22+
start: {__MODULE__, :start_link, [opts]}
23+
}
24+
end
25+
26+
def start_link(opts \\ []) do
27+
HLClock.Server.start_link(opts)
28+
end
29+
1830
@doc """
1931
Generate a single HLC Timestamp for sending to other nodes or
2032
local causality tracking
2133
"""
22-
def send_timestamp do
23-
GenServer.call(HLClock.Server, :send_timestamp)
34+
def send_timestamp(server) do
35+
GenServer.call(server, :send_timestamp)
2436
end
2537

2638
@doc """
2739
Given the current timestamp for this node and a provided remote timestamp,
2840
perform the merge of both logical time and logical counters. Returns the new
29-
current timestamp for the local node
41+
current timestamp for the local node.
3042
"""
31-
def recv_timestamp(remote) do
32-
GenServer.call(HLClock.Server, {:recv_timestamp, remote})
43+
def recv_timestamp(server, remote) do
44+
GenServer.call(server, {:recv_timestamp, remote})
3345
end
3446

3547
@doc """
3648
Functionally equivalent to using `send_timestamp/0`. This generates a timestamp
3749
for local causality tracking.
3850
"""
39-
def now do
40-
GenServer.call(HLClock.Server, :send_timestamp)
51+
def now(server) do
52+
GenServer.call(server, :send_timestamp)
4153
end
4254

43-
@doc """
44-
Configurable clock synchronization parameter, ε. Defaults to 300 seconds
45-
"""
46-
def max_drift(), do: Application.get_env(:hlclock, :max_drift_millis, 300_000)
47-
4855
@doc """
4956
Determines if the clock's timestamp "happened before" a different timestamp
5057
"""

lib/hlclock/server.ex

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,84 @@
11
defmodule HLClock.Server do
2+
@moduledoc false
23
use GenServer
34

45
alias HLClock.{NodeId, Timestamp}
56

7+
@gen_server_opts [
8+
:debug,
9+
:name,
10+
:timeout,
11+
:spawn_opt,
12+
:hibernate_after
13+
]
14+
615
def start_link(opts \\ []) do
7-
opts = build_opts(opts)
8-
GenServer.start_link(__MODULE__, opts, name: opts[:name])
16+
{gen_server_opts, other_opts} = Keyword.split(opts, @gen_server_opts)
17+
18+
hlc_opts = build_opts(other_opts)
19+
GenServer.start_link(__MODULE__, hlc_opts, gen_server_opts)
920
end
1021

1122
def init(opts) do
12-
Process.send_after(self(), :periodic_send, interval())
13-
Timestamp.new(physical_time(), default_counter(), node_id(opts))
23+
node_id = node_id(opts)
24+
initial_counter = 0
25+
26+
data = %{
27+
node_id: node_id,
28+
timestamp: Timestamp.new(physical_time(), initial_counter, node_id),
29+
max_drift: opts[:max_drift]
30+
}
31+
32+
Process.send_after(self(), :periodic_send, interval(data))
33+
{:ok, data}
1434
end
1535

16-
def handle_call(:send_timestamp, _from, timestamp) do
17-
case Timestamp.send(timestamp, physical_time()) do
36+
def handle_call(:send_timestamp, _from, data) do
37+
case Timestamp.send(data.timestamp, physical_time(), data.max_drift) do
1838
{:ok, timestamp} ->
19-
{:reply, {:ok, timestamp}, timestamp}
39+
{:reply, {:ok, timestamp}, %{data | timestamp: timestamp}}
2040

2141
{:error, error} ->
22-
{:reply, {:error, error}, timestamp}
42+
{:reply, {:error, error}, data}
2343
end
2444
end
2545

26-
def handle_call({:recv_timestamp, new_time}, _from, old_time) do
27-
case Timestamp.recv(old_time, new_time, physical_time()) do
46+
def handle_call(
47+
{:recv_timestamp, new_time},
48+
_from,
49+
%{timestamp: old_time, max_drift: max_drift} = data
50+
) do
51+
case Timestamp.recv(old_time, new_time, physical_time(), max_drift) do
2852
{:ok, timestamp} ->
29-
{:reply, {:ok, timestamp}, timestamp}
53+
{:reply, {:ok, timestamp}, %{data | timestamp: timestamp}}
3054

3155
{:error, error} ->
32-
{:reply, {:error, error}, old_time}
56+
{:reply, {:error, error}, data}
3357
end
3458
end
3559

36-
def handle_info(:periodic_send, timestamp) do
37-
Process.send_after(self(), :periodic_send, interval())
60+
def handle_info(:periodic_send, data) do
61+
Process.send_after(self(), :periodic_send, interval(data))
3862

39-
case Timestamp.send(timestamp, physical_time()) do
40-
{:ok, timestamp} ->
41-
{:noreply, timestamp}
63+
case Timestamp.send(data.timestamp, physical_time(), data.max_drift) do
64+
{:ok, ts} ->
65+
{:noreply, %{data | timestamp: ts}}
4266

43-
{:error, _err} ->
44-
{:noreply, timestamp}
67+
{:error, _} ->
68+
{:noreply, data}
4569
end
4670
end
4771

4872
defp physical_time, do: System.os_time(:millisecond)
4973

50-
defp default_counter, do: 0
51-
52-
defp node_id([{:node_id, fun} | _]) when is_function(fun), do: fun.()
74+
defp interval(%{max_drift: max_drift}), do: round(max_drift / 2)
5375

54-
defp interval, do: round(HLClock.max_drift() / 2)
76+
defp node_id(opts) do
77+
case opts[:node_id] do
78+
f when is_function(f) -> f.()
79+
other -> other
80+
end
81+
end
5582

5683
defp build_opts(opts) do
5784
base_opts()
@@ -60,7 +87,8 @@ defmodule HLClock.Server do
6087

6188
defp base_opts,
6289
do: [
63-
node_id: Application.get_env(:hlclock, :node_id, fn -> NodeId.hash() end),
64-
name: __MODULE__
90+
name: __MODULE__,
91+
node_id: NodeId.hash(),
92+
max_drift: 300_000
6593
]
6694
end

lib/hlclock/timestamp.ex

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,28 @@ defmodule HLClock.Timestamp do
2525
node's physical time), logical counter (initally zero), and the node id
2626
"""
2727
def new(time, counter, node_id \\ 0) do
28-
cond do
29-
byte_size(:binary.encode_unsigned(counter)) > 2 ->
30-
{:error, :counter_too_large}
31-
32-
byte_size(:binary.encode_unsigned(node_id)) > 8 ->
33-
{:error, :node_id_too_large}
28+
assert_byte_size(node_id, 8)
29+
assert_byte_size(counter, 2)
30+
assert_byte_size(time, 6)
3431

35-
byte_size(:binary.encode_unsigned(time)) > 6 ->
36-
{:error, :time_too_large}
32+
%T{time: time, counter: counter, node_id: node_id}
33+
end
3734

38-
true ->
39-
{:ok, %T{time: time, counter: counter, node_id: node_id}}
40-
end
35+
defp assert_byte_size(value, size) do
36+
byte_size(:binary.encode_unsigned(value)) <= size ||
37+
raise ArgumentError, "#{value} exceeds max byte size of #{size}"
4138
end
4239

4340
@doc """
4441
Generate a single HLC Timestamp for sending to other nodes or
4542
local causality tracking
4643
"""
47-
def send(%{time: old_time, counter: counter, node_id: node_id}, pt) do
44+
def send(%{time: old_time, counter: counter, node_id: node_id}, pt, max_drift) do
4845
new_time = max(old_time, pt)
4946
new_counter = advance_counter(old_time, counter, new_time)
5047

51-
with :ok <- handle_drift(old_time, new_time) do
52-
new(new_time, new_counter, node_id)
48+
with :ok <- handle_drift(old_time, new_time, max_drift) do
49+
{:ok, new(new_time, new_counter, node_id)}
5350
end
5451
end
5552

@@ -58,15 +55,20 @@ defmodule HLClock.Timestamp do
5855
perform the merge of both logical time and logical counters. Returns the new
5956
current timestamp for the local node
6057
"""
61-
def recv(local, remote, physical_time) do
58+
def recv(local, remote, physical_time, max_drift) do
6259
new_time = Enum.max([physical_time, local.time, remote.time])
6360

6461
with {:ok, node_id} <- compare_node_ids(local.node_id, remote.node_id),
6562
:ok <-
66-
handle_drift(remote.time, physical_time, :remote_drift_violation),
67-
:ok <- handle_drift(new_time, physical_time),
63+
handle_drift(
64+
remote.time,
65+
physical_time,
66+
max_drift,
67+
:remote_drift_violation
68+
),
69+
:ok <- handle_drift(new_time, physical_time, max_drift),
6870
new_counter <- merge_logical(new_time, local, remote) do
69-
new(new_time, new_counter, node_id)
71+
{:ok, new(new_time, new_counter, node_id)}
7072
end
7173
end
7274

@@ -99,11 +101,8 @@ defmodule HLClock.Timestamp do
99101
100102
## Example
101103
102-
iex> {:ok, _t0} = HLClock.Timestamp.new(1410652800000, 0, 0)
103-
{:ok, %HLClock.Timestamp{counter: 0, node_id: 0, time: 1410652800000}}
104-
105-
...> encoded = HLClock.Timestamp.encode(t0)
106-
<<1, 72, 113, 117, 132, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>
104+
iex> _t0 = HLClock.Timestamp.new(1410652800000, 0, 0)
105+
%HLClock.Timestamp{counter: 0, node_id: 0, time: 1410652800000}
107106
108107
...> << time_and_counter :: size(64), _ :: size(64) >> = encoded
109108
<<1, 72, 113, 117, 132, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>
@@ -176,18 +175,18 @@ defmodule HLClock.Timestamp do
176175
end
177176
end
178177

179-
defp handle_drift(l, pt, err \\ :clock_drift_violation) do
178+
defp handle_drift(l, pt, max_drift, err \\ :clock_drift_violation) do
180179
cond do
181-
drift?(l, pt) ->
180+
drift?(l, pt, max_drift) ->
182181
{:error, err}
183182

184183
true ->
185184
:ok
186185
end
187186
end
188187

189-
defp drift?(l, pt) do
190-
abs(l - pt) > HLClock.max_drift()
188+
defp drift?(l, pt, max_drift) do
189+
abs(l - pt) > max_drift
191190
end
192191

193192
defp advance_counter(old_time, counter, new_time) do

mix.exs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ defmodule HLClock.Mixfile do
2121

2222
def application do
2323
[
24-
mod: {HLClock.Application, []},
2524
extra_applications: [:logger]
2625
]
2726
end

mix.lock

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
%{
2-
"earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm"},
3-
"ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
4-
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
5-
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
6-
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
7-
"stream_data": {:hex, :stream_data, "0.2.0", "887b7701cd4ea235e0d704ce60f86096ff5754dae55c3ead4e1d43f152a9239e", [:mix], [], "hexpm"},
8-
}
1+
%{"earmark": {:hex, :earmark, "1.3.5", "0db71c8290b5bc81cb0101a2a507a76dca659513984d683119ee722828b424f6", [:mix], [], "hexpm"},
2+
"ex_doc": {:hex, :ex_doc, "0.21.1", "5ac36660846967cd869255f4426467a11672fec3d8db602c429425ce5b613b90", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
3+
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
4+
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
5+
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
6+
"stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm"}}

test/hlclock/node_id_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
defmodule HLClock.NodeIdTest do
22
use ExUnit.Case, async: true
3-
import PropertyTest
3+
use ExUnitProperties
44
import StreamData
55

66
alias HLClock.{NodeId, Generators}
77

88
property "Hashed node ids are within the correct bounds" do
9-
check all node_name <- unquoted_atom() do
9+
check all(node_name <- atom(:alphanumeric)) do
1010
hash = NodeId.hash(node_name)
1111
assert 0 <= hash && hash <= Generators.max_node()
1212
end

test/hlclock/server_test.exs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,6 @@ defmodule HLClock.ServerTest do
1212
assert %{node_id: 12345} = clock
1313
end
1414

15-
test "can be added in a config" do
16-
Application.put_env(:hlclock, :node_id, fn -> 1337 end)
17-
{:ok, hlc} = HLClock.Server.start_link(name: :config_node_id)
18-
{:ok, clock} = GenServer.call(hlc, :send_timestamp)
19-
Application.delete_env(:hlclock, :node_id)
20-
assert %{node_id: 1337} = clock
21-
end
22-
2315
test "if no node id is given then we use a hash of the node name" do
2416
{:ok, hlc} = HLClock.Server.start_link(name: :no_node_id)
2517
{:ok, clock} = GenServer.call(hlc, :send_timestamp)

0 commit comments

Comments
 (0)