@@ -158,6 +158,134 @@ impl<'a> Plan<'a> {
158158 Ok(paths)
159159 }
160160
161+ /// Find the groupings of any unions for the given paths. This optimizes upqueries in queries
162+ /// that contain complex unions.
163+ ///
164+ /// A classical comment follows ...
165+ ///
166+ /// all right, story time!
167+ ///
168+ /// imagine you have this graph:
169+ ///
170+ /// ```text
171+ /// a b
172+ /// +--+--+
173+ /// |
174+ /// u_1
175+ /// |
176+ /// +--+--+
177+ /// c d
178+ /// +--+--+
179+ /// |
180+ /// u_2
181+ /// |
182+ /// +--+--+
183+ /// e f
184+ /// +--+--+
185+ /// |
186+ /// u_3
187+ /// |
188+ /// v
189+ /// ```
190+ ///
191+ /// where c-f are all stateless. you will end up with 8 paths for replays to v.
192+ /// a and b will both appear as the root of 4 paths, and will be upqueried that many times.
193+ /// while inefficient (TODO), that is not in and of itself a problem. the issue arises at
194+ /// the unions, which need to do union buffering (that is, they need to forward _one_
195+ /// upquery response for each set of upquery responses they get). specifically, u_1 should
196+ /// forward 4 responses, even though it receives 8. u_2 should forward 2 responses, even
197+ /// though it gets 4, etc. we may later optimize that (in theory u_1 should be able to only
198+ /// forward _one_ response to multiple children, and a and b should only be upqueried
199+ /// _once_), but for now we need to deal with the correctness issue that arises if the
200+ /// unions do not buffer correctly.
201+ ///
202+ /// the issue, ultimately, is what the unions "group" upquery responses by. they can't group
203+ /// by tag (like shard mergers do), since there are 8 tags here, so there'd be 8 groups each
204+ /// with one response. here are the replay paths for u_1:
205+ ///
206+ /// 1. a -> c -> e
207+ /// 2. a -> c -> f
208+ /// 3. a -> d -> e
209+ /// 4. a -> d -> f
210+ /// 5. b -> c -> e
211+ /// 6. b -> c -> f
212+ /// 7. b -> d -> e
213+ /// 8. b -> d -> f
214+ ///
215+ /// we want to merge 1 with 5 since they're "going the same way". similarly, we want to
216+ /// merge 2 and 6, 3 and 7, and 4 and 8. the "grouping" here then is really the suffix of
217+ /// the replay's path beyond the union we're looking at. for u_2:
218+ ///
219+ /// 1/5. a/b -> c -> e
220+ /// 2/6. a/b -> c -> f
221+ /// 3/7. a/b -> d -> e
222+ /// 4/8. a/b -> d -> f
223+ ///
224+ /// we want to merge 1/5 and 3/7, again since they are going the same way _from here_.
225+ /// and similarly, we want to merge 2/6 and 4/8.
226+ ///
227+ /// so, how do we communicate this grouping to each of the unions?
228+ /// well, most of the infrastructure is actually already there in the domains.
229+ /// for each tag, each domain keeps some per-node state (`ReplayPathSegment`).
230+ /// we can inject the information there!
231+ ///
232+ /// we're actually going to play an additional trick here, as it allows us to simplify the
233+ /// implementation a fair amount. since we know that tags 1 and 5 are identical beyond u_1
234+ /// (that's what we're grouping by after all!), why don't we just rewrite all 1 tags to 5s?
235+ /// and all 2s to 6s, and so on. that way, at u_2, there will be no replays with tag 1 or 3,
236+ /// only 5 and 7. then we can pull the same trick there -- rewrite all 5s to 7s, so that at
237+ /// u_3 we only need to deal with 7s (and 8s). this simplifies the implementation since
238+ /// unions can now _always_ just group by tags, and it'll just magically work.
239+ ///
240+ /// this approach also gives us the property that we have a deterministic subset of the tags
241+ /// (and of strictly decreasing cardinality!) downstream of unions. this may (?)
242+ /// improve cache locality, but could perhaps also allow further optimizations later (?).
243+ fn find_union_groupings(
244+ &self,
245+ paths: &[RawReplayPath],
246+ assigned_tags: &[Tag],
247+ ) -> HashMap<(NodeIndex, usize), Tag> {
248+ let union_suffixes = paths
249+ .iter()
250+ .enumerate()
251+ .flat_map(|(pi, path)| {
252+ let graph = &self.graph;
253+ path.segments().iter().enumerate().filter_map(
254+ move |(at, &IndexRef { node, .. })| {
255+ let n = &graph[node];
256+ if n.is_union() && !n.is_shard_merger() {
257+ let suffix = match path.segments().get((at + 1)..) {
258+ Some(x) => x,
259+ None => {
260+ // FIXME(eta): would like to return a proper internal!() here
261+ return None;
262+ }
263+ };
264+ Some(((node, suffix), pi))
265+ } else {
266+ None
267+ }
268+ },
269+ )
270+ })
271+ .fold(BTreeMap::new(), |mut map, (key, pi)| {
272+ #[allow(clippy::unwrap_or_default)]
273+ map.entry(key).or_insert_with(Vec::new).push(pi);
274+ map
275+ });
276+
277+ // map each suffix-sharing group of paths at each union to one tag at that union
278+ union_suffixes
279+ .into_iter()
280+ .flat_map(|((union, _suffix), paths)| {
281+ // at this union, all the given paths share a suffix
282+ // make all of the paths use a single identifier from that point on
283+ let tag_all_as = assigned_tags[paths[0]];
284+ paths.into_iter().map(move |pi| ((union, pi), tag_all_as))
285+ })
286+ .collect()
287+ }
288+
161289 /// Finds the appropriate replay paths for the given index, and informs all domains on those
162290 /// paths about them. It also notes if any data backfills will need to be run, which is
163291 /// eventually reported back by `finalize`.
@@ -223,126 +351,12 @@ impl<'a> Plan<'a> {
223351 index_on.clone()
224352 };
225353
226- // all right, story time!
227- //
228- // image you have this graph:
229- //
230- // a b
231- // +--+--+
232- // |
233- // u_1
234- // |
235- // +--+--+
236- // c d
237- // +--+--+
238- // |
239- // u_2
240- // |
241- // +--+--+
242- // e f
243- // +--+--+
244- // |
245- // u_3
246- // |
247- // v
248- //
249- // where c-f are all stateless. you will end up with 8 paths for replays to v.
250- // a and b will both appear as the root of 4 paths, and will be upqueried that many times.
251- // while inefficient (TODO), that is not in and of itself a problem. the issue arises at
252- // the unions, which need to do union buffering (that is, they need to forward _one_
253- // upquery response for each set of upquery responses they get). specifically, u_1 should
254- // forward 4 responses, even though it receives 8. u_2 should forward 2 responses, even
255- // though it gets 4, etc. we may later optimize that (in theory u_1 should be able to only
256- // forward _one_ response to multiple children, and a and b should only be upqueried
257- // _once_), but for now we need to deal with the correctness issue that arises if the
258- // unions do not buffer correctly.
259- //
260- // the issue, ultimately, is what the unions "group" upquery responses by. they can't group
261- // by tag (like shard mergers do), since there are 8 tags here, so there'd be 8 groups each
262- // with one response. here are the replay paths for u_1:
263- //
264- // 1. a -> c -> e
265- // 2. a -> c -> f
266- // 3. a -> d -> e
267- // 4. a -> d -> f
268- // 5. b -> c -> e
269- // 6. b -> c -> f
270- // 7. b -> d -> e
271- // 8. b -> d -> f
272- //
273- // we want to merge 1 with 5 since they're "going the same way". similarly, we want to
274- // merge 2 and 6, 3 and 7, and 4 and 8. the "grouping" here then is really the suffix of
275- // the replay's path beyond the union we're looking at. for u_2:
276- //
277- // 1/5. a/b -> c -> e
278- // 2/6. a/b -> c -> f
279- // 3/7. a/b -> d -> e
280- // 4/8. a/b -> d -> f
281- //
282- // we want to merge 1/5 and 3/7, again since they are going the same way _from here_.
283- // and similarly, we want to merge 2/6 and 4/8.
284- //
285- // so, how do we communicate this grouping to each of the unions?
286- // well, most of the infrastructure is actually already there in the domains.
287- // for each tag, each domain keeps some per-node state (`ReplayPathSegment`).
288- // we can inject the information there!
289- //
290- // we're actually going to play an additional trick here, as it allows us to simplify the
291- // implementation a fair amount. since we know that tags 1 and 5 are identical beyond u_1
292- // (that's what we're grouping by after all!), why don't we just rewrite all 1 tags to 5s?
293- // and all 2s to 6s, and so on. that way, at u_2, there will be no replays with tag 1 or 3,
294- // only 5 and 7. then we can pull the same trick there -- rewrite all 5s to 7s, so that at
295- // u_3 we only need to deal with 7s (and 8s). this simplifies the implementation since
296- // unions can now _always_ just group by tags, and it'll just magically work.
297- //
298- // this approach also gives us the property that we have a deterministic subset of the tags
299- // (and of strictly decreasing cardinality!) tags downstream of unions. this may (?)
300- // improve cache locality, but could perhaps also allow further optimizations later (?).
301-
302- // find all paths through each union with the same suffix
303354 let assigned_tags: Vec<_> = paths
304355 .iter()
305356 .map(|path| self.m.tag_for_path(&index_on, path))
306357 .collect();
307- let union_suffixes = paths
308- .iter()
309- .enumerate()
310- .flat_map(|(pi, path)| {
311- let graph = &self.graph;
312- path.segments().iter().enumerate().filter_map(
313- move |(at, &IndexRef { node, .. })| {
314- let n = &graph[node];
315- if n.is_union() && !n.is_shard_merger() {
316- let suffix = match path.segments().get((at + 1)..) {
317- Some(x) => x,
318- None => {
319- // FIXME(eta): would like to return a proper internal!() here
320- return None;
321- }
322- };
323- Some(((node, suffix), pi))
324- } else {
325- None
326- }
327- },
328- )
329- })
330- .fold(BTreeMap::new(), |mut map, (key, pi)| {
331- #[allow(clippy::unwrap_or_default)]
332- map.entry(key).or_insert_with(Vec::new).push(pi);
333- map
334- });
335-
336- // map each suffix-sharing group of paths at each union to one tag at that union
337- let path_grouping: HashMap<_, _> = union_suffixes
338- .into_iter()
339- .flat_map(|((union, _suffix), paths)| {
340- // at this union, all the given paths share a suffix
341- // make all of the paths use a single identifier from that point on
342- let tag_all_as = assigned_tags[paths[0]];
343- paths.into_iter().map(move |pi| ((union, pi), tag_all_as))
344- })
345- .collect();
358+ // find all paths through each union with the same suffix
359+ let path_grouping = self.find_union_groupings(&paths, &assigned_tags);
346360
347361 // inform domains about replay paths
348362 for (pi, path) in paths.into_iter().enumerate() {
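As a further hedged sketch of why the tag rewrite described above pays off (this is an illustration, not the actual union operator in this codebase): once every replay "going the same way" arrives carrying one canonical tag, a union can buffer and forward upquery responses keyed by tag alone, with only a per-tag count of how many ancestors must answer.

```rust
use std::collections::HashMap;

// Hypothetical sketch (not the real union implementation): buffer upquery
// responses by their (rewritten) tag and forward once every expected
// ancestor has answered for that tag.
struct UnionBuffer {
    // how many upquery responses must arrive per tag before forwarding
    expected_per_tag: HashMap<u32, usize>,
    // responses buffered so far, keyed by (rewritten) tag
    buffered: HashMap<u32, Vec<String>>,
}

impl UnionBuffer {
    fn on_response(&mut self, tag: u32, response: String) -> Option<Vec<String>> {
        let buf = self.buffered.entry(tag).or_default();
        buf.push(response);
        if buf.len() == self.expected_per_tag[&tag] {
            // all ancestors answered for this tag: forward one merged response
            self.buffered.remove(&tag)
        } else {
            None
        }
    }
}

fn main() {
    // u_1 from the example: tags 1 and 5 were merged into one canonical tag,
    // so the union expects two responses (one via a, one via b) before
    // forwarding exactly once.
    let mut u1 = UnionBuffer {
        expected_per_tag: HashMap::from([(5, 2)]),
        buffered: HashMap::new(),
    };
    assert!(u1.on_response(5, "rows via a".into()).is_none());
    assert!(u1.on_response(5, "rows via b".into()).is_some());
}
```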