Skip to content

Commit 3f52363

Browse files
committed
feat: first commit
0 parents  commit 3f52363

7 files changed

Lines changed: 279 additions & 0 deletions

File tree

.github/workflows/test.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
name: test
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
- main
8+
pull_request:
9+
10+
jobs:
11+
test:
12+
runs-on: ubuntu-latest
13+
steps:
14+
- uses: actions/checkout@v4
15+
- uses: erlef/setup-beam@v1
16+
with:
17+
otp-version: "28"
18+
gleam-version: "1.14.0"
19+
rebar3-version: "3"
20+
# elixir-version: "1"
21+
- run: gleam deps download
22+
- run: gleam test
23+
- run: gleam format --check src test

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
*.beam
2+
*.ez
3+
/build
4+
erl_crash.dump

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Usage
2+
```gleam
3+
import gleam/io
4+
import gleam/erlang/process
5+
import gleam/otp/actor
6+
import singleflight
7+
8+
pub fn main() -> Nil {
9+
let name = process.new_name("singleflight")
10+
let config = singleflight.config(1_000, 1_000)
11+
12+
let assert Ok(actor.Started(data: server, ..)) =
13+
singleflight.start(config, name)
14+
15+
let value = singleflight.fetch(server, "key", fn(key) { key <> "-value" })
16+
17+
io.debug(value)
18+
}

gleam.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
name = "singleflight"
2+
version = "1.0.0"
3+
4+
# Fill out these fields if you intend to generate HTML documentation or publish
5+
# your project to the Hex package manager.
6+
#
7+
# description = ""
8+
# licences = ["Apache-2.0"]
9+
# repository = { type = "github", user = "", repo = "" }
10+
# links = [{ title = "Website", href = "" }]
11+
#
12+
# For a full reference of all the available options, you can have a look at
13+
# https://gleam.run/writing-gleam/gleam-toml/.
14+
15+
[dependencies]
16+
gleam_stdlib = ">= 0.44.0 and < 2.0.0"
17+
gleam_otp = ">= 1.2.0 and < 2.0.0"
18+
gleam_erlang = ">= 1.3.0 and < 2.0.0"
19+
20+
[dev-dependencies]
21+
gleeunit = ">= 1.0.0 and < 2.0.0"

manifest.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# This file was generated by Gleam
2+
# You typically do not need to edit this file
3+
4+
packages = [
5+
{ name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" },
6+
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
7+
{ name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" },
8+
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
9+
]
10+
11+
[requirements]
12+
gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" }
13+
gleam_otp = { version = ">= 1.2.0 and < 2.0.0" }
14+
gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" }
15+
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }

src/singleflight.gleam

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import gleam/dict
2+
import gleam/erlang/process
3+
import gleam/list
4+
import gleam/otp/actor
5+
6+
pub type Config {
7+
Config(initialisation_timeout_ms: Int, fetch_timeout_ms: Int)
8+
}
9+
10+
pub type Message(k, v) {
11+
Request(key: k, work: fn(k) -> v, caller: process.Subject(v))
12+
Done(key: k, result: v)
13+
}
14+
15+
pub opaque type Singleflight(k, v) {
16+
Singleflight(subject: process.Subject(Message(k, v)), fetch_timeout_ms: Int)
17+
}
18+
19+
type State(k, v) {
20+
State(
21+
in_flight: dict.Dict(k, List(process.Subject(v))),
22+
self: process.Subject(Message(k, v)),
23+
)
24+
}
25+
26+
pub fn config(initialisation_timeout_ms: Int, fetch_timeout_ms: Int) -> Config {
27+
Config(
28+
initialisation_timeout_ms: initialisation_timeout_ms,
29+
fetch_timeout_ms: fetch_timeout_ms,
30+
)
31+
}
32+
33+
pub fn start(
34+
config: Config,
35+
name: process.Name(Message(k, v)),
36+
) -> actor.StartResult(Singleflight(k, v)) {
37+
let Config(initialisation_timeout_ms:, fetch_timeout_ms:) = config
38+
39+
actor.new_with_initialiser(initialisation_timeout_ms, fn(self) {
40+
actor.initialised(State(in_flight: dict.new(), self: self))
41+
|> actor.returning(Singleflight(
42+
subject: self,
43+
fetch_timeout_ms: fetch_timeout_ms,
44+
))
45+
|> Ok
46+
})
47+
|> actor.on_message(handle_message)
48+
|> actor.named(name)
49+
|> actor.start
50+
}
51+
52+
pub fn fetch(singleflight: Singleflight(k, v), key: k, work: fn(k) -> v) -> v {
53+
let Singleflight(subject:, fetch_timeout_ms:) = singleflight
54+
55+
actor.call(subject, fetch_timeout_ms, fn(caller) {
56+
Request(key: key, work: work, caller: caller)
57+
})
58+
}
59+
60+
fn handle_message(
61+
state: State(k, v),
62+
message: Message(k, v),
63+
) -> actor.Next(State(k, v), Message(k, v)) {
64+
case message {
65+
Request(key, work, caller) ->
66+
case dict.get(state.in_flight, key) {
67+
Ok(waiters) ->
68+
actor.continue(
69+
State(
70+
..state,
71+
in_flight: dict.insert(state.in_flight, key, [caller, ..waiters]),
72+
),
73+
)
74+
75+
Error(Nil) -> {
76+
let self = state.self
77+
process.spawn(fn() {
78+
let result = work(key)
79+
actor.send(self, Done(key: key, result: result))
80+
})
81+
actor.continue(
82+
State(
83+
..state,
84+
in_flight: dict.insert(state.in_flight, key, [caller]),
85+
),
86+
)
87+
}
88+
}
89+
90+
Done(key, result) -> {
91+
case dict.get(state.in_flight, key) {
92+
Ok(waiters) ->
93+
list.each(waiters, fn(waiter) { process.send(waiter, result) })
94+
Error(Nil) -> Nil
95+
}
96+
actor.continue(
97+
State(..state, in_flight: dict.delete(state.in_flight, key)),
98+
)
99+
}
100+
}
101+
}

test/singleflight_test.gleam

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import gleam/erlang/process
2+
import gleam/otp/actor
3+
import gleeunit
4+
import singleflight
5+
6+
pub fn main() -> Nil {
7+
gleeunit.main()
8+
}
9+
10+
pub fn same_key_deduplicates_to_one_request_test() {
11+
let server = start_test_server("singleflight_test_same_key")
12+
let started = process.new_subject()
13+
14+
let first_result = process.new_subject()
15+
let second_result = process.new_subject()
16+
17+
let work = fn(key) {
18+
let release = process.new_subject()
19+
process.send(started, release)
20+
21+
let assert Ok("release") = process.receive(release, within: 1000)
22+
23+
key
24+
}
25+
26+
process.spawn(fn() {
27+
let result = singleflight.fetch(server, "foo", work)
28+
process.send(first_result, result)
29+
})
30+
31+
let assert Ok(first_release) = process.receive(started, within: 1000)
32+
33+
process.spawn(fn() {
34+
let result = singleflight.fetch(server, "foo", work)
35+
process.send(second_result, result)
36+
})
37+
38+
let assert Error(Nil) = process.receive(started, within: 0)
39+
40+
process.send(first_release, "release")
41+
42+
let assert Ok("foo") = process.receive(first_result, within: 1000)
43+
let assert Ok("foo") = process.receive(second_result, within: 1000)
44+
45+
let assert Error(Nil) = process.receive(started, within: 0)
46+
}
47+
48+
pub fn different_keys_run_two_requests_test() {
49+
let server = start_test_server("singleflight_test_different_keys")
50+
let started = process.new_subject()
51+
52+
let first_result = process.new_subject()
53+
let second_result = process.new_subject()
54+
55+
let work = fn(key) {
56+
let release = process.new_subject()
57+
process.send(started, #(key, release))
58+
59+
let assert Ok("release") = process.receive(release, within: 1000)
60+
61+
key
62+
}
63+
64+
process.spawn(fn() {
65+
let result = singleflight.fetch(server, "foo", work)
66+
process.send(first_result, result)
67+
})
68+
69+
process.spawn(fn() {
70+
let result = singleflight.fetch(server, "bar", work)
71+
process.send(second_result, result)
72+
})
73+
74+
let assert Ok(#("foo", foo_release)) = process.receive(started, within: 1000)
75+
let assert Ok(#("bar", bar_release)) = process.receive(started, within: 1000)
76+
let assert Error(Nil) = process.receive(started, within: 0)
77+
78+
process.send(foo_release, "release")
79+
process.send(bar_release, "release")
80+
81+
let assert Ok("foo") = process.receive(first_result, within: 1000)
82+
let assert Ok("bar") = process.receive(second_result, within: 1000)
83+
84+
let assert Error(Nil) = process.receive(started, within: 0)
85+
}
86+
87+
fn start_test_server(
88+
prefix: String,
89+
) -> singleflight.Singleflight(String, String) {
90+
let name = process.new_name(prefix)
91+
let config = singleflight.config(1000, 1000)
92+
93+
let assert Ok(actor.Started(data: server, ..)) =
94+
singleflight.start(config, name)
95+
96+
server
97+
}

0 commit comments

Comments
 (0)