Skip to content

Commit 4e6f097

Browse files
committed
Refactor Parallel
1 parent ed73b05 commit 4e6f097

File tree

1 file changed

+26
-23
lines changed

1 file changed

+26
-23
lines changed

src/transforms/parallel.jl

+26-23
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ isrevertible(p::Parallel) = any(isrevertible, p.transforms)
1616
function apply(p::Parallel, table)
1717
# apply transforms in parallel
1818
f(transform) = apply(transform, table)
19-
result = foldxt(vcat, Map(f), p.transforms)
19+
vals = foldxt(vcat, Map(f), p.transforms)
2020

2121
# retrieve tables and caches
22-
tables = first.(result)
23-
caches = last.(result)
22+
tables = first.(vals)
23+
caches = last.(vals)
2424

2525
# concatenate columns
2626
allvars, allvals = [], []
@@ -43,49 +43,52 @@ function apply(p::Parallel, table)
4343
𝒯 = (; zip(allvars, allvals)...)
4444
newtable = 𝒯 |> Tables.materializer(table)
4545

46+
# save original column names
47+
onames = Tables.columnnames(table)
48+
4649
# find first revertible transform
4750
ind = findfirst(isrevertible, p.transforms)
4851

49-
# save cache if any transform is revertible
50-
if isnothing(ind)
51-
cache = nothing
52+
# save info to revert transform
53+
rinfo = if isnothing(ind)
54+
nothing
5255
else
53-
onames = Tables.columnnames(table)
5456
tnames = Tables.columnnames.(tables)
5557
ncols = length.(tnames)
56-
rcache = caches[ind]
5758
nrcols = ncols[ind]
5859
start = sum(ncols[1:ind-1]) + 1
5960
finish = start + nrcols - 1
60-
range = (start, finish)
61-
cache = (ind, range, rcache, onames)
61+
range = start:finish
62+
(ind, range)
6263
end
6364

64-
newtable, cache
65+
newtable, (onames, caches, rinfo)
6566
end
6667

6768
function revert(p::Parallel, newtable, cache)
68-
# sanity checks
69-
@assert !isnothing(cache) "transform is not revertible"
69+
# retrieve cache
70+
onames = cache[1]
71+
caches = cache[2]
72+
rinfo = cache[3]
73+
74+
@assert !isnothing(rinfo) "transform is not revertible"
7075

71-
# retrieve subtable range and cache
72-
ind = cache[1]
73-
range = cache[2]
74-
rcache = cache[3]
75-
onames = cache[4]
76-
start, finish = range
76+
# retrieve info to revert transform
77+
ind = rinfo[1]
78+
range = rinfo[2]
79+
rtrans = p.transforms[ind]
80+
rcache = caches[ind]
7781

7882
# columns of transformed table
7983
cols = Tables.columns(newtable)
8084

81-
# retrieve subtable for revert transform
82-
rcols = [Tables.getcolumn(cols, j) for j in start:finish]
85+
# retrieve subtable to revert
86+
rcols = [Tables.getcolumn(cols, j) for j in range]
8387
𝒯 = (; zip(onames, rcols)...)
8488
rtable = 𝒯 |> Tables.materializer(newtable)
8589

8690
# revert transform on subtable
87-
rtransform = p.transforms[ind]
88-
revert(rtransform, rtable, rcache)
91+
revert(rtrans, rtable, rcache)
8992
end
9093

9194
"""

0 commit comments

Comments
 (0)