Skip to content

Commit 36ee81f

Browse files
authored
Merge pull request #33 from CodaFi/queuediepie
Implement Transactional Queues
2 parents 899eac0 + f08ccd9 commit 36ee81f

File tree

2 files changed

+179
-3
lines changed

2 files changed

+179
-3
lines changed

Sources/TBQueue.swift

+106-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
// Copyright © 2015 TypeLift. All rights reserved.
77
//
88

9+
/// `TBQueue` is a bounded version of `TQueue`. The queue has a maximum capacity
10+
/// set when it is created. If the queue already contains the maximum number of
11+
/// elements, then `write()` blocks until an element is removed from the queue.
912
public struct TBQueue<A> {
1013
let readNum : TVar<Int>
1114
let readHead : TVar<[A]>
@@ -19,22 +22,27 @@ public struct TBQueue<A> {
1922
self.writeHead = writeHead
2023
}
2124

25+
/// Creates and initializes a new `TBQueue`.
2226
public init(n : Int) {
2327
let read = TVar([A]())
2428
let write = TVar([A]())
2529
let rsize = TVar(0)
2630
let wsize = TVar(n)
2731
self.init(rsize, read, wsize, write)
2832
}
29-
33+
34+
/// Uses an atomic transaction to create and initialize a new `TBQueue`.
3035
public static func create(n : Int) -> STM<TBQueue<A>> {
3136
let read = TVar([] as [A])
3237
let write = TVar([] as [A])
3338
let rsize = TVar(0)
3439
let wsize = TVar(n)
3540
return STM<TBQueue<A>>.pure(TBQueue(rsize, read, wsize, write))
3641
}
37-
42+
43+
/// Uses an atomic transaction to write the given value to the receiver.
44+
///
45+
/// Blocks if the queue is full.
3846
public func write(x : A) -> STM<()> {
3947
return self.writeNum.read().flatMap { w in
4048
let act : STM<()>
@@ -55,4 +63,100 @@ public struct TBQueue<A> {
5563
})
5664
}
5765
}
66+
67+
/// Uses an atomic transaction to read the next value from the receiver.
68+
public func read() -> STM<A> {
69+
return self.readHead.read().flatMap { xs in
70+
return self.readNum.read().flatMap { r in
71+
return self.readNum.write(r + 1)
72+
.then({
73+
if let x = xs.first {
74+
return self.readHead.write(Array(xs.dropFirst())).then(STM<A>.pure(x))
75+
}
76+
return self.writeHead.read().flatMap { ys in
77+
if ys.isEmpty {
78+
return STM<A>.retry()
79+
}
80+
let zs = ys.reverse()
81+
return self.writeHead.write([])
82+
.then(self.readHead.write(Array(zs.dropFirst())))
83+
.then(STM<A>.pure(ys.first!))
84+
}
85+
}())
86+
}
87+
}
88+
}
89+
90+
/// Uses an atomic transaction to read the next value from the receiver
91+
/// without blocking or retrying on failure.
92+
public func tryRead() -> STM<Optional<A>> {
93+
return self.read().fmap(Optional.Some).orElse(STM<A?>.pure(.None))
94+
}
95+
96+
/// Uses an atomic transaction to get the next value from the receiver
97+
/// without removing it, retrying if the queue is empty.
98+
public func peek() -> STM<A> {
99+
return self.read().flatMap { x in
100+
return self.unGet(x).then(STM<A>.pure(x))
101+
}
102+
}
103+
104+
/// Uses an atomic transaction to get the next value from the receiver
105+
/// without removing it without retrying if the queue is empty.
106+
public func tryPeek() -> STM<Optional<A>> {
107+
return self.tryRead().flatMap { m in
108+
switch m {
109+
case let .Some(x):
110+
return self.unGet(x).then(STM<A?>.pure(m))
111+
case .None:
112+
return STM<A?>.pure(.None)
113+
}
114+
}
115+
}
116+
117+
/// Uses an atomic transaction to put a data item back onto a channel where
118+
/// it will be the next item read.
119+
///
120+
/// Blocks if the queue is full.
121+
public func unGet(x : A) -> STM<()> {
122+
return self.readNum.read().flatMap { r in
123+
return { () -> STM<()> in
124+
if r > 0 {
125+
return self.readNum.write(r.predecessor())
126+
}
127+
return self.writeNum.read().flatMap { w in
128+
if w > 0 {
129+
return self.writeNum.write(w.predecessor())
130+
}
131+
return STM<()>.retry()
132+
}
133+
}().then(self.readHead.read().flatMap { xs in
134+
return self.readHead.write([x] + xs)
135+
})
136+
}
137+
}
138+
139+
/// Uses an STM transaction to return whether the channel is empty.
140+
public var isEmpty : STM<Bool> {
141+
return self.readHead.read().flatMap { xs in
142+
if xs.isEmpty {
143+
return self.writeHead.read().flatMap { ys in
144+
return STM<Bool>.pure(ys.isEmpty)
145+
}
146+
}
147+
return STM<Bool>.pure(false)
148+
}
149+
}
150+
151+
/// Uses an STM transaction to return whether the channel is full.
152+
public var isFull : STM<Bool> {
153+
return self.writeNum.read().flatMap { w in
154+
if w > 0 {
155+
return STM<Bool>.pure(false)
156+
}
157+
return self.readNum.read().flatMap { r in
158+
return STM<Bool>.pure(r <= 0)
159+
}
160+
}
161+
}
58162
}

Sources/TQueue.swift

+73-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,82 @@ public struct TQueue<A> {
2020
let write = TVar([] as [A])
2121
return STM<TQueue<A>>.pure(TQueue(read, write))
2222
}
23-
23+
24+
/// Uses an atomic transaction to write the given value to the receiver.
25+
///
26+
/// Blocks if the queue is full.
2427
public func write(val : A) -> STM<()> {
2528
return self.writeEnd.read().flatMap { list in
2629
return self.writeEnd.write([val] + list)
2730
}
2831
}
32+
33+
/// Uses an atomic transaction to read the next value from the receiver.
34+
public func read() -> STM<A> {
35+
return self.readEnd.read().flatMap { xs in
36+
if let x = xs.first {
37+
return self.readEnd.write(Array(xs.dropFirst()))
38+
.then(STM<A>.pure(x))
39+
}
40+
return self.writeEnd.read().flatMap { ys in
41+
if ys.isEmpty {
42+
return STM<A>.retry()
43+
}
44+
let zs = ys.reverse()
45+
if let z = zs.first {
46+
return self.writeEnd.write([]).then(self.readEnd.write(Array(zs.dropFirst()))).then(STM<A>.pure(z))
47+
}
48+
fatalError()
49+
}
50+
}
51+
}
52+
53+
/// Uses an atomic transaction to read the next value from the receiver
54+
/// without blocking or retrying on failure.
55+
public func tryRead() -> STM<Optional<A>> {
56+
return self.read().fmap(Optional.Some).orElse(STM<A?>.pure(.None))
57+
}
58+
59+
/// Uses an atomic transaction to get the next value from the receiver
60+
/// without removing it, retrying if the queue is empty.
61+
public func peek() -> STM<A> {
62+
return self.read().flatMap { x in
63+
return self.unGet(x).then(STM<A>.pure(x))
64+
}
65+
}
66+
67+
/// Uses an atomic transaction to get the next value from the receiver
68+
/// without removing it without retrying if the queue is empty.
69+
public func tryPeek() -> STM<Optional<A>> {
70+
return self.tryRead().flatMap { m in
71+
switch m {
72+
case let .Some(x):
73+
return self.unGet(x).then(STM<A?>.pure(m))
74+
case .None:
75+
return STM<A?>.pure(.None)
76+
}
77+
}
78+
}
79+
80+
/// Uses an atomic transaction to put a data item back onto a channel where
81+
/// it will be the next item read.
82+
///
83+
/// Blocks if the queue is full.
84+
public func unGet(x : A) -> STM<()> {
85+
return self.readEnd.read().flatMap { xs in
86+
return self.readEnd.write([x] + xs)
87+
}
88+
}
89+
90+
/// Uses an STM transaction to return whether the channel is empty.
91+
public var isEmpty : STM<Bool> {
92+
return self.readEnd.read().flatMap { xs in
93+
if xs.isEmpty {
94+
return self.writeEnd.read().flatMap { ys in
95+
return STM<Bool>.pure(ys.isEmpty)
96+
}
97+
}
98+
return STM<Bool>.pure(false)
99+
}
100+
}
29101
}

0 commit comments

Comments
 (0)