@@ -9,7 +9,7 @@ function merge_tensor_stats_union(op, all_stats::Vararg{TensorStats})
99 throw (error (" merge_tensor_stats_union not implemented for: " , typeof (all_stats[1 ])))
1010end
1111
"""
    reduce_tensor_stats(op, init, reduce_indices::Set{IndexExpr}, stats::TensorStats)

Fallback method for statistics types that have no dedicated reduction rule.
Always throws an `ErrorException` whose message names the concrete type of `stats`.
"""
function reduce_tensor_stats(op, init, reduce_indices::Set{IndexExpr}, stats::TensorStats)
    message = string(" reduce_tensor_stats not implemented for: ", typeof(stats))
    throw(ErrorException(message))
end
1515
"""
    merge_tensor_def(op, all_defs::TensorDef...)

Build the `TensorDef` obtained by combining `all_defs` with `op`.

The new default value is `op` applied to the inputs' default values, the new index set is
the union of the inputs' index sets, and every index takes its dimension size from the
last definition in `all_defs` whose index set contains it. The three trailing `TensorDef`
fields are passed as `nothing`.
"""
function merge_tensor_def(op, all_defs::Vararg{TensorDef})
    merged_default = op(map(d -> d.default_value, all_defs)...)
    merged_indices = union(map(d -> d.index_set, all_defs)...)
    merged_sizes = Dict{IndexExpr, UInt128}()
    for idx in merged_indices, d in all_defs
        # A later definition overwrites an earlier one when both carry `idx`.
        idx in d.index_set || continue
        merged_sizes[idx] = d.dim_sizes[idx]
    end
    return TensorDef(merged_indices, merged_sizes, merged_default, nothing, nothing, nothing)
end
3636
"""
    reduce_tensor_def(op, init, reduce_indices::Set{IndexExpr}, def::TensorDef)

Build the `TensorDef` that results from reducing `def` over `reduce_indices` with `op`.

`op` and `init` are unwrapped (via `.val`) when they are `PlanNode`s. When `init` is
`nothing`, the default value of the result is derived from `def.default_value`:

- `op` is `nothing`: the default value is kept as is;
- the default is an identity of `op`, or `op` is idempotent: `op(default, default)`;
- `op == +`: `default * n`, and `op == *`: `default ^ n`, where `n` is the product of the
  sizes of the reduced dimensions;
- otherwise: `op` is folded over `n` copies of the default (slow; a warning is printed).

The returned definition keeps only the indices not in `reduce_indices`, together with
their dimension sizes. The three trailing `TensorDef` fields are passed as `nothing`.
"""
function reduce_tensor_def(op, init, reduce_indices::Set{IndexExpr}, def::TensorDef)
    op = op isa PlanNode ? op.val : op
    init = init isa PlanNode ? init.val : init
    if isnothing(init)
        if isnothing(op)
            init = def.default_value
        elseif isidentity(op, def.default_value) || isidempotent(op)
            init = op(def.default_value, def.default_value)
        else
            # Number of entries collapsed into each output entry.
            num_reduced = prod([def.dim_sizes[x] for x in reduce_indices])
            if op == +
                init = def.default_value * num_reduced
            elseif op == *
                init = def.default_value ^ num_reduced
            else
                # This is going to be VERY SLOW. Should raise a warning about reductions over non-identity default values.
                # Depending on the semantics of reductions, we might be able to do this faster.
                println(" Warning: A reduction can take place over a tensor whose default value is not the reduction operator's identity. \\
 This can result in a large slowdown as the new default is calculated.")
                # Iterating a bare number yields it exactly once, so the count must be
                # turned into `num_reduced` repetitions explicitly. Folding lazily also
                # avoids materialising and splatting a potentially enormous array.
                init = foldl(op, Iterators.repeated(def.default_value, num_reduced))
            end
        end
    end
    @assert !isnothing(init)
    new_index_set = setdiff(def.index_set, reduce_indices)
    new_dim_sizes = Dict{IndexExpr, UInt128}()
    for index in new_index_set
        new_dim_sizes[index] = def.dim_sizes[index]
    end
    return TensorDef(new_index_set, new_dim_sizes, init, nothing, nothing, nothing)
end
6065
6166# This function determines whether a binary operation is union-like or join-like and creates
6267# new statistics objects accordingly.
6368function merge_tensor_stats (op, all_stats:: Vararg{ST} ) where ST <: TensorStats
64- new_def = merge_tensor_def (op, [get_def (stats) for stats in all_stats]. .. )
65- join_like_args = []
66- union_like_args = []
69+ new_def:: TensorDef = merge_tensor_def (op, [get_def (stats) for stats in all_stats]. .. )
70+ join_like_args = ST []
71+ union_like_args = ST []
6772 for stats in all_stats
6873 if length (get_index_set (stats)) == 0
6974 continue
@@ -90,8 +95,8 @@ function merge_tensor_stats(op::PlanNode, all_stats::Vararg{ST}) where ST <:Tens
9095 return merge_tensor_stats (op. val, all_stats... )
9196end
9297
"""
    reduce_tensor_stats(op, init, reduce_indices::Union{Vector{PlanNode}, Set{PlanNode}}, stats)

Convenience method that accepts the reduction indices as `PlanNode`s. The `name` field of
each node is collected into a `Set{IndexExpr}` and the call is forwarded to the method
taking a `Set{IndexExpr}`.
"""
function reduce_tensor_stats(op, init, reduce_indices::Union{Vector{PlanNode}, Set{PlanNode}}, stats::ST) where ST <: TensorStats
    index_names = Set{IndexExpr}()
    for node in reduce_indices
        push!(index_names, node.name)
    end
    return reduce_tensor_stats(op, init, index_names, stats)
end
96101
97102function transpose_tensor_def (index_order:: Vector{IndexExpr} , def:: TensorDef )
@@ -101,25 +106,25 @@ end
101106
102107# ################ NaiveStats Propagation ##################################################
103108 # We do everything in log for numerical stability
"""
    merge_tensor_stats_join(op, new_def::TensorDef, all_stats::NaiveStats...)

Estimate the cardinality of a join-like merge of `all_stats` and wrap it with `new_def`
in a `NaiveStats`.

Working in log2 space, each input contributes its cardinality divided by the size of its
own dimension space; these ratios are multiplied together and scaled by the size of the
dimension space of `new_def`.
"""
function merge_tensor_stats_join(op, new_def::TensorDef, all_stats::Vararg{NaiveStats})
    # log2 of the number of points in the output's dimension space.
    log_output_space = sum([log2(get_dim_size(new_def, idx)) for idx in new_def.index_set])
    # log2 of (cardinality / dimension space size) for a single input.
    log_density(s) = log2(s.cardinality) - sum([log2(get_dim_size(s, idx)) for idx in get_index_set(s)])
    log_joint_density = sum([log_density(s) for s in all_stats])
    estimated_cardinality = 2^(log_joint_density + log_output_space)
    return NaiveStats(new_def, estimated_cardinality)
end
110115
"""
    merge_tensor_stats_union(op, new_def::TensorDef, all_stats::NaiveStats...)

Estimate the cardinality of a union-like merge of `all_stats` and wrap it with `new_def`
in a `NaiveStats`.

Working in log2 space, each input contributes one minus its cardinality divided by the
size of its own dimension space; these terms are multiplied together, the product is
subtracted from one, and the result is scaled by the size of the dimension space of
`new_def`.
"""
function merge_tensor_stats_union(op, new_def::TensorDef, all_stats::Vararg{NaiveStats})
    # log2 of the number of points in the output's dimension space.
    log_output_space = sum([log2(get_dim_size(new_def, idx)) for idx in new_def.index_set])
    # log2 of (1 - cardinality / dimension space size) for a single input.
    function log_prob_default(s)
        log_input_space = sum([log2(get_dim_size(s, idx)) for idx in get_index_set(s)])
        return log2(1 - 2^(log2(s.cardinality) - log_input_space))
    end
    log_all_default = sum([log_prob_default(s) for s in all_stats])
    estimated_cardinality = 2^(log2(1 - 2^log_all_default) + log_output_space)
    return NaiveStats(new_def, estimated_cardinality)
end
117122
118- function reduce_tensor_stats (op, reduce_indices:: Set{IndexExpr} , stats:: NaiveStats )
123+ function reduce_tensor_stats (op, init, reduce_indices:: Set{IndexExpr} , stats:: NaiveStats )
119124 if length (reduce_indices) == 0
120125 return copy_stats (stats)
121126 end
122- new_def = reduce_tensor_def (op, reduce_indices, get_def (stats))
127+ new_def = reduce_tensor_def (op, init, reduce_indices, get_def (stats))
123128 new_dim_space_size = sum ([log2 (get_dim_size (new_def, idx)) for idx in new_def. index_set])
124129 old_dim_space_size = sum ([log2 (get_dim_size (stats, idx)) for idx in get_index_set (stats)])
125130 prob_default_value = 1 - 2 ^ (log2 (stats. cardinality)- old_dim_space_size)
@@ -135,33 +140,66 @@ function transpose_tensor_stats(index_order::Vector{IndexExpr}, stats::NaiveStat
135140end
136141
137142# ################ DCStats Propagation ##################################################
138- function merge_tensor_stats_join (op, new_def, all_stats:: Vararg{DCStats} )
139- new_dc_dict = Dict ()
140- for dc in ∪ ([stats. dcs for stats in all_stats]. .. )
141- dc_key = get_dc_key (dc)
142- current_dc = get (new_dc_dict, dc_key, Inf )
143- if dc. d < current_dc
144- new_dc_dict[dc_key] = dc. d
143+
"""
    unify_dc_ints(all_stats, new_def)

Build one shared numbering for every index used by `all_stats` or `new_def`.

Indices found in the `idx_2_int` keys of the inputs are numbered first, followed by any
index of `new_def` that has not been seen yet. Numbers start at 1 and are consecutive.
Returns the pair `(idx_2_int, int_2_idx)` of mutually inverse dictionaries.
"""
function unify_dc_ints(all_stats, new_def)
    idx_to_int = Dict{IndexExpr, Int}()
    int_to_idx = Dict{Int, IndexExpr}()
    stat_indices = union([keys(stat.idx_2_int) for stat in all_stats]...)
    for idx in Iterators.flatten((stat_indices, get_index_set(new_def)))
        haskey(idx_to_int, idx) && continue
        # Ids are handed out consecutively, so the next free id is the current count + 1.
        slot = length(idx_to_int) + 1
        idx_to_int[idx] = slot
        int_to_idx[slot] = idx
    end
    return idx_to_int, int_to_idx
end
162+
"""
    convert_bitset(int_to_int, b)

Return a `SmallBitSet` holding `int_to_int[x]` for every element `x` of `b`.
"""
function convert_bitset(int_to_int, b)
    translated = [int_to_int[member] for member in b]
    return SmallBitSet(translated)
end
164+
"""
    merge_tensor_stats_join(op, new_def::TensorDef, all_stats::DCStats...)

Combine the degree constraints of `all_stats` for a join-like merge.

With a single input, its index maps and constraints are copied unchanged. Otherwise all
inputs are renumbered onto a shared index numbering (see `unify_dc_ints`), and for every
resulting `(X, Y)` key only the smallest `d` seen across the inputs is kept.
"""
function merge_tensor_stats_join(op, new_def::TensorDef, all_stats::Vararg{DCStats})
    if length(all_stats) == 1
        only_stats = all_stats[1]
        return DCStats(new_def, copy(only_stats.idx_2_int), copy(only_stats.int_2_idx), copy(only_stats.dcs))
    end
    final_idx_2_int, final_int_2_idx = unify_dc_ints(all_stats, new_def)
    # Translate a bitset from one input's private numbering into the shared numbering.
    remap(stats, bits) = SmallBitSet(Int[final_idx_2_int[stats.int_2_idx[b]] for b in bits])
    tightest = Dict{DCKey, Float64}()
    for stats in all_stats, dc in stats.dcs
        key = (X=remap(stats, dc.X), Y=remap(stats, dc.Y))
        # Only a strictly smaller bound replaces the one recorded so far.
        dc.d < get(tightest, key, Inf) || continue
        tightest[key] = dc.d
    end
    merged_dcs = Set{DC}(DC(key.X, key.Y, d) for (key, d) in tightest)
    return DCStats(new_def, final_idx_2_int, final_int_2_idx, merged_dcs)
end
151184
152- function merge_tensor_stats_union (op, new_def, all_stats:: Vararg{DCStats} )
153- dc_keys = counter (Any)
185+ function merge_tensor_stats_union (op, new_def:: TensorDef , all_stats:: Vararg{DCStats} )
186+ if length (all_stats) == 1
187+ return DCStats (new_def, copy (all_stats[1 ]. idx_2_int), copy (all_stats[1 ]. int_2_idx), copy (all_stats[1 ]. dcs))
188+ end
189+ final_idx_2_int, final_int_2_idx = unify_dc_ints (all_stats, new_def)
190+ dc_keys = counter (DCKey)
154191 stats_dcs = []
155192 # We start by extending all arguments' dcs to the new dimensions and infer dcs as needed
156193 for stats in all_stats
157- # condense_stats!(stats, timeout=1000)
158- dcs = Dict ()
194+ dcs = Dict {DCKey, Float64} ()
159195 Z = setdiff (get_index_set (new_def), get_index_set (stats))
160196 Z_dimension_space_size = get_dim_space_size (new_def, Z)
161197 for dc in stats. dcs
162- dcs[(X= dc. X, Y= dc. Y)] = dc. d
163- inc! (dc_keys, (X= dc. X, Y= dc. Y))
164- ext_dc_key = (X= dc. X, Y= ∪ (dc. Y, Z))
198+ new_key:: DCKey = (X= SmallBitSet (Int[final_idx_2_int[stats. int_2_idx[x]] for x in dc. X]),
199+ Y= SmallBitSet (Int[final_idx_2_int[stats. int_2_idx[y]] for y in dc. Y]))
200+ dcs[new_key] = dc. d
201+ inc! (dc_keys, new_key)
202+ ext_dc_key = (X= new_key. X, Y= ∪ (new_key. Y, idxs_to_bitset (final_idx_2_int, Z)))
165203 if ! haskey (dcs, ext_dc_key)
166204 inc! (dc_keys, ext_dc_key)
167205 end
@@ -172,32 +210,34 @@ function merge_tensor_stats_union(op, new_def, all_stats::Vararg{DCStats})
172210
173211 # We only keep DCs which can be inferred from all inputs. Otherwise, we might miss
174212 # important information which simply wasn't inferred
175- new_dcs = Dict {Any , UInt128} ()
213+ new_dcs = Dict {DCKey , UInt128} ()
176214 for (key, count) in dc_keys
177215 if count == length (all_stats)
178216 new_dcs[key] = min (typemax (UInt64), sum ([get (dcs, key, UInt128 (0 )) for dcs in stats_dcs]))
179- if key. Y ⊆ get_index_set (new_def)
180- new_dcs[key] = min (new_dcs[key], get_dim_space_size (new_def, key. Y))
217+ if key. Y ⊆ idxs_to_bitset (final_idx_2_int, get_index_set (new_def) )
218+ new_dcs[key] = min (new_dcs[key], get_dim_space_size (new_def, bitset_to_idxs (final_int_2_idx, key. Y) ))
181219 end
182220 end
183221 end
184222
223+ #=
185224 for Y in subsets(collect(get_index_set(new_def)))
186- Y = Set {IndexExpr} (Y)
187- proj_dc_key = (X= Set {IndexExpr} (), Y= Y)
188- new_dcs[proj_dc_key] = min (get (new_dcs, proj_dc_key, typemax (UInt64)/ 2 ), get_dim_space_size (new_def, Y))
225+ proj_dc_key = (X=SmallBitSet(), Y=idxs_to_bitset(final_idx_2_int, Y))
226+ new_dcs[proj_dc_key] = min(get(new_dcs, proj_dc_key, typemax(UInt64)/2), get_dim_space_size(new_def, Set(Y)))
189227 end
190-
191- return DCStats (new_def, Set {DC} (DC (key. X, key. Y, d) for (key, d) in new_dcs))
228+ =#
229+ return DCStats (new_def, final_idx_2_int, final_int_2_idx, Set {DC} (DC (key. X, key. Y, d) for (key, d) in new_dcs))
192230end
193231
"""
    reduce_tensor_stats(op, init, reduce_indices::Set{IndexExpr}, stats::DCStats)

Return the `DCStats` for `stats` reduced over `reduce_indices`.

With no reduction indices this is `copy_stats(stats)`. Otherwise the definition is
replaced by `reduce_tensor_def(op, init, reduce_indices, get_def(stats))`, while the index
maps and the set of degree constraints are copied over unchanged.
"""
function reduce_tensor_stats(op, init, reduce_indices::Set{IndexExpr}, stats::DCStats)
    isempty(reduce_indices) && return copy_stats(stats)
    reduced_def = reduce_tensor_def(op, init, reduce_indices, get_def(stats))
    return DCStats(reduced_def, copy(stats.idx_2_int), copy(stats.int_2_idx), copy(stats.dcs))
end
203243
0 commit comments