use super::{beekem::PathChange, error::CgkaError};
use crate::{
crypto::{digest::Digest, share_key::ShareKey},
principal::{document::id::DocumentId, individual::id::IndividualId},
util::content_addressed_map::CaMap,
};
use derivative::Derivative;
use nonempty::NonEmpty;
use serde::{Deserialize, Serialize};
use std::{
borrow::Borrow,
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
hash::{Hash, Hasher},
mem,
ops::Deref,
rc::Rc,
};
use topological_sort::TopologicalSort;
/// An ordered [`NonEmpty`] of concurrent [`CgkaOperation`]s.
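///
/// A sketch of how an epoch might be consumed (illustrative only; assumes some
/// `ops: NonEmpty<Rc<CgkaOperation>>` is already in hand):
///
/// ```ignore
/// let epoch = CgkaEpoch::from(ops);
/// // `Deref` exposes the underlying `NonEmpty`, so its API is available directly.
/// assert!(epoch.len() >= 1);
/// for op in epoch.iter() {
///     // Operations within one epoch were concurrent with each other.
///     let _digest = Digest::hash(op.as_ref());
/// }
/// ```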
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CgkaEpoch(NonEmpty<Rc<CgkaOperation>>);
impl From<NonEmpty<Rc<CgkaOperation>>> for CgkaEpoch {
fn from(item: NonEmpty<Rc<CgkaOperation>>) -> Self {
CgkaEpoch(item)
}
}
impl Deref for CgkaEpoch {
type Target = NonEmpty<Rc<CgkaOperation>>;
fn deref(&self) -> &NonEmpty<Rc<CgkaOperation>> {
&self.0
}
}
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub enum CgkaOperation {
Add {
added_id: IndividualId,
pk: ShareKey,
leaf_index: u32,
predecessors: Vec<Digest<CgkaOperation>>,
add_predecessors: Vec<Digest<CgkaOperation>>,
doc_id: DocumentId,
},
Remove {
id: IndividualId,
leaf_idx: u32,
removed_keys: Vec<ShareKey>,
predecessors: Vec<Digest<CgkaOperation>>,
doc_id: DocumentId,
},
Update {
id: IndividualId,
new_path: Box<PathChange>,
predecessors: Vec<Digest<CgkaOperation>>,
doc_id: DocumentId,
},
}
impl CgkaOperation {
// FIXME: Remove
pub(crate) fn name(&self) -> String {
match self {
CgkaOperation::Add { .. } => String::from("Add"),
CgkaOperation::Remove { .. } => String::from("Remove"),
CgkaOperation::Update { .. } => String::from("Update"),
}
}
/// The zero or more immediate causal predecessors of this operation.
pub(crate) fn predecessors(&self) -> HashSet<Digest<CgkaOperation>> {
match self {
CgkaOperation::Add { predecessors, .. } => {
HashSet::from_iter(predecessors.iter().cloned())
}
CgkaOperation::Remove { predecessors, .. } => {
HashSet::from_iter(predecessors.iter().cloned())
}
CgkaOperation::Update { predecessors, .. } => {
HashSet::from_iter(predecessors.iter().cloned())
}
}
}
    /// The id of the document this operation applies to.
pub(crate) fn doc_id(&self) -> &DocumentId {
match self {
CgkaOperation::Add { doc_id, .. } => doc_id,
CgkaOperation::Remove { doc_id, .. } => doc_id,
CgkaOperation::Update { doc_id, .. } => doc_id,
}
}
}
/// Causal graph of [`CgkaOperation`]s.
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, Derivative)]
#[derivative(Hash)]
pub(crate) struct CgkaOperationGraph {
pub(crate) cgka_ops: CaMap<CgkaOperation>,
#[derivative(Hash(hash_with = "hash_cgka_ops_preds"))]
pub(crate) cgka_ops_predecessors:
HashMap<Digest<CgkaOperation>, HashSet<Digest<CgkaOperation>>>,
#[derivative(Hash(hash_with = "crate::util::hasher::hash_set"))]
pub(crate) cgka_op_heads: HashSet<Digest<CgkaOperation>>,
#[derivative(Hash(hash_with = "crate::util::hasher::hash_set"))]
pub(crate) add_heads: HashSet<Digest<CgkaOperation>>,
}
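/// Hash the predecessor map deterministically by first converting the unordered
/// `HashMap`/`HashSet` into ordered `BTreeMap`/`BTreeSet` collections, so the
/// resulting hash does not depend on `HashMap`'s internal iteration order.
///
/// A minimal, self-contained sketch of the same idea (illustrative only, with
/// `String` keys standing in for digests):
///
/// ```
/// use std::collections::{BTreeMap, HashMap};
/// use std::hash::{Hash, Hasher};
///
/// fn stable_hash<H: Hasher>(map: &HashMap<String, u32>, state: &mut H) {
///     // Collecting into a `BTreeMap` fixes the iteration order by key.
///     map.iter().collect::<BTreeMap<_, _>>().hash(state);
/// }
/// ```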
fn hash_cgka_ops_preds<H: Hasher>(
hmap: &HashMap<Digest<CgkaOperation>, HashSet<Digest<CgkaOperation>>>,
state: &mut H,
) {
hmap.iter()
.map(|(k, v)| (k, v.iter().collect::<BTreeSet<_>>()))
.collect::<BTreeMap<_, _>>()
.hash(state)
}
impl CgkaOperationGraph {
pub(crate) fn new() -> Self {
Self {
cgka_ops: CaMap::new(),
cgka_ops_predecessors: HashMap::new(),
cgka_op_heads: HashSet::new(),
add_heads: HashSet::new(),
}
}
pub(crate) fn contains_op_hash(&self, op_hash: &Digest<CgkaOperation>) -> bool {
self.cgka_ops.contains_key(op_hash)
}
    /// Whether the causal graph has a single head. More than one head indicates
    /// unresolved merges of concurrent operations.
pub(crate) fn has_single_head(&self) -> bool {
self.cgka_op_heads.len() == 1
}
/// Add an operation that was created locally to the graph.
pub(crate) fn add_local_op(&mut self, op: &CgkaOperation) {
println!("!@ OpsGraph: adding local {}", Digest::hash(op));
self.add_op_and_update_heads(op, None);
}
    /// Add an operation from elsewhere to the graph, along with the heads it
    /// causally depends on.
    pub(crate) fn add_op(&mut self, op: &CgkaOperation, heads: &HashSet<Digest<CgkaOperation>>) {
self.add_op_and_update_heads(op, Some(heads));
}
    /// Add an operation to the graph, record it as a new head, and remove any
    /// heads it supersedes as a causal successor.
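    ///
    /// When `external_heads` is provided (an op that arrived from elsewhere), those
    /// heads become the op's predecessors and are removed from the current head set.
    /// When it is `None` (a locally created op), all current heads become the op's
    /// predecessors and the op replaces them as the sole head.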
fn add_op_and_update_heads(
&mut self,
op: &CgkaOperation,
external_heads: Option<&HashSet<Digest<CgkaOperation>>>,
) {
let op_hash = Digest::hash(op);
let mut op_predecessors = HashSet::new();
self.cgka_ops.insert(op.clone().into());
let is_add = self.is_add_op(&op_hash);
if let Some(heads) = external_heads {
for h in heads {
op_predecessors.insert(*h);
self.cgka_op_heads.remove(h);
}
if let CgkaOperation::Add {
add_predecessors, ..
} = op
{
for h in add_predecessors {
self.add_heads.remove(h);
}
}
} else {
for h in &self.cgka_op_heads {
op_predecessors.insert(*h);
}
self.cgka_op_heads.clear();
if is_add {
self.add_heads.clear();
}
};
self.cgka_op_heads.insert(op_hash);
if self.is_add_op(&op_hash) {
self.add_heads.insert(op_hash);
}
self.cgka_ops_predecessors.insert(op_hash, op_predecessors);
}
pub(crate) fn heads_contained_in(&self, heads: &HashSet<Digest<CgkaOperation>>) -> bool {
self.cgka_op_heads.is_subset(heads)
}
fn is_add_op(&self, hash: &Digest<CgkaOperation>) -> bool {
let op = self.cgka_ops.get(hash).expect("op to be in history");
matches!(op.borrow(), &CgkaOperation::Add { .. })
}
pub(crate) fn predecessors_for(
&self,
op_hash: &Digest<CgkaOperation>,
) -> Option<&HashSet<Digest<CgkaOperation>>> {
self.cgka_ops_predecessors.get(op_hash)
}
    /// Topologically sort all operations in the graph.
pub(crate) fn topsort_graph(&self) -> Result<NonEmpty<CgkaEpoch>, CgkaError> {
self.topsort_for_heads(&self.cgka_op_heads)
}
    /// Topologically sort all ancestor operations of the provided heads. The result
    /// is grouped into "epochs": sets of operations that were concurrent with one
    /// another. Each epoch is then ordered and placed into a distinct [`CgkaEpoch`].
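    ///
    /// For example, if op `a` is followed by concurrent ops `b` and `c`, and both of
    /// those are followed by a merge op `d`, the result is three epochs: `[a]`,
    /// `[b, c]` (ordered by digest), and `[d]`.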
pub(crate) fn topsort_for_heads(
&self,
heads: &HashSet<Digest<CgkaOperation>>,
) -> Result<NonEmpty<CgkaEpoch>, CgkaError> {
println!("!@ topsort_for_heads 0");
debug_assert!(heads.iter().all(|head| self.cgka_ops.contains_key(head)));
let mut op_hashes = Vec::new();
let mut dependencies = TopologicalSort::<Digest<CgkaOperation>>::new();
let mut successors: HashMap<Digest<CgkaOperation>, HashSet<Digest<CgkaOperation>>> =
HashMap::new();
let mut frontier = VecDeque::new();
let mut seen = HashSet::new();
for head in heads {
frontier.push_back(*head);
seen.insert(head);
successors.insert(*head, HashSet::new());
}
// Populate dependencies and successors with all ancestors of the initial heads.
while let Some(op_hash) = frontier.pop_front() {
println!("!@ topsort_for_heads loopy op_hash: {}", op_hash);
let preds = self
.predecessors_for(&op_hash)
.ok_or(CgkaError::OperationNotFound)?;
println!("!@ -- preds: {:?}", preds);
for update_pred in preds {
dependencies.add_dependency(*update_pred, op_hash);
successors.entry(*update_pred).or_default().insert(op_hash);
if seen.contains(update_pred) {
continue;
}
seen.insert(update_pred);
frontier.push_back(*update_pred);
}
}
if dependencies.is_empty() {
let single_epoch = heads
.iter()
.map(|hash| {
self.cgka_ops
.get(hash)
.ok_or(CgkaError::OperationNotFound)
.expect("head to be present")
.clone()
})
.collect::<Vec<_>>();
op_hashes.push(
NonEmpty::from_vec(single_epoch)
.expect("to have at least one op hash")
.into(),
);
return Ok(NonEmpty::from_vec(op_hashes).expect("to have at least one op hash"));
}
        // Partition the operations into ordered epochs, each representing an ordered
        // set of concurrent operations.
let mut epoch_heads = HashSet::new();
let mut next_epoch: Vec<Rc<CgkaOperation>> = Vec::new();
while !dependencies.is_empty() {
let mut next_set = dependencies.pop_all();
next_set.sort();
for hash in &next_set {
epoch_heads.insert(*hash);
if successors.get(hash).expect("hash to be present").is_empty() {
// For terminal hashes, we insert the hash itself as its successor.
// Terminal hashes will all be included in the final epoch.
successors
.get_mut(hash)
.expect("hash to be present")
.insert(*hash);
}
}
for hash in &next_set {
for h in &epoch_heads {
if hash == h {
continue;
}
successors.get_mut(h).expect("head to exist").remove(hash);
}
}
            // If all of the successors of a head H have been added as heads, then
            // H can be removed.
epoch_heads = epoch_heads
.iter()
.filter(|h| !successors.get_mut(h).expect("head to exist").is_empty())
.copied()
.collect::<HashSet<_>>();
let should_end_epoch = epoch_heads.len() <= 1;
if should_end_epoch {
let mut next = Vec::new();
mem::swap(&mut next_epoch, &mut next);
if !next.is_empty() {
op_hashes.push(
NonEmpty::from_vec(next)
.expect("there to be at least one hash")
.into(),
);
}
}
for hash in next_set {
println!("!@ topsort_for_heads pushy");
next_epoch.push(
self.cgka_ops
.get(&hash)
.ok_or(CgkaError::OperationNotFound)?
.clone(),
);
}
if should_end_epoch {
let mut next = Vec::new();
mem::swap(&mut next_epoch, &mut next);
if !next.is_empty() {
op_hashes.push(
NonEmpty::from_vec(next)
.expect("there to be at least one hash")
.into(),
);
}
}
}
if !next_epoch.is_empty() {
// The final epoch consists of all terminal hashes. If there is only one member,
// it will be added as the last epoch above. If there is more than one member,
// it will be added here.
op_hashes.push(
NonEmpty::from_vec(next_epoch.clone())
.expect("there to be at least one hash")
.into(),
);
}
Ok(NonEmpty::from_vec(op_hashes).expect("to have at least one op hash"))
}
}
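
// An illustrative, self-contained sketch of the layering step used by
// `topsort_for_heads`: each call to `TopologicalSort::pop_all` yields the set of
// nodes whose predecessors have all been emitted, and nodes that come out in the
// same batch were concurrent. The `&str` node names below are arbitrary stand-ins
// for operation digests.
#[cfg(test)]
mod epoch_layering_sketch {
    use topological_sort::TopologicalSort;

    #[test]
    fn diamond_dag_layers_into_epochs() {
        // a -> b, a -> c, b -> d, c -> d (a diamond: b and c are concurrent).
        let mut deps = TopologicalSort::<&str>::new();
        deps.add_dependency("a", "b");
        deps.add_dependency("a", "c");
        deps.add_dependency("b", "d");
        deps.add_dependency("c", "d");

        let mut layers = Vec::new();
        while !deps.is_empty() {
            // `pop_all` removes and returns every node with no remaining
            // predecessors; order within a layer is arbitrary, so sort it.
            let mut layer = deps.pop_all();
            layer.sort();
            layers.push(layer);
        }

        assert_eq!(layers, vec![vec!["a"], vec!["b", "c"], vec!["d"]]);
    }
}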