Skip to content

Commit 5255bb3

Browse files
support gather
1 parent 3b92016 commit 5255bb3

2 files changed

Lines changed: 61 additions & 2 deletions

File tree

app/Main.hs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ main = do
1212
scheduler = BFSScheduler
1313

1414
broadcast = buildSchedule scheduler tiling members shape
15-
reduce = reverseSchedule broadcast
15+
converge = reverseSchedule broadcast
1616

1717
full = rootTile shape
1818
occE = Occlusion (== "E")
@@ -109,7 +109,7 @@ main = do
109109

110110
putStrLn "\nrunning full-mesh reduce:"
111111
runReduce
112-
reduce
112+
converge
113113
[ ("A", 1),
114114
("B", 2),
115115
("C", 3),
@@ -122,6 +122,20 @@ main = do
122122
(+)
123123
"A"
124124

125+
putStrLn "\nrunning full-mesh gather:"
126+
runGather
127+
converge
128+
[ ("A", "value-a"),
129+
("B", "value-b"),
130+
("C", "value-c"),
131+
("D", "value-d"),
132+
("E", "value-e"),
133+
("F", "value-f"),
134+
("G", "value-g"),
135+
("H", "value-h")
136+
]
137+
"A"
138+
125139
expectTile :: String -> Maybe Tile -> Tile
126140
expectTile _ (Just tile) = tile
127141
expectTile label Nothing = error ("expected " ++ label)

src/Tile/Execution.hs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module Tile.Execution
22
( adjacencyList,
33
runBroadcast,
4+
runGather,
45
runReduce,
56
)
67
where
@@ -92,3 +93,47 @@ runReduce schedule initialValues combine root = do
9293
writeChan (chanMap Map.! m) (valueMap Map.! m)
9394

9495
threadDelay 1000000
96+
97+
runGather ::
98+
(Show a) =>
99+
Schedule String ->
100+
[(String, a)] ->
101+
String ->
102+
IO ()
103+
runGather schedule initialValues root = do
104+
let graph = adjacencyList schedule
105+
incoming = incomingCounts schedule
106+
members =
107+
Set.toList $
108+
Set.fromList $
109+
Map.keys graph ++ concat (Map.elems graph) ++ map fst initialValues
110+
111+
chanPairs <- forM members $ \m -> do
112+
ch <- newChan
113+
pure (m, ch)
114+
115+
let chanMap = Map.fromList chanPairs
116+
valueMap = Map.fromList initialValues
117+
118+
forM_ members $ \m -> do
119+
let inbox = chanMap Map.! m
120+
children = Map.findWithDefault [] m graph
121+
childChans = [(c, chanMap Map.! c) | c <- children]
122+
expected = Map.findWithDefault 0 m incoming
123+
localValue = [(m, valueMap Map.! m)]
124+
125+
_ <- forkIO $ do
126+
received <- concat <$> replicateM expected (readChan inbox)
127+
let gathered = localValue ++ received
128+
if m == root
129+
then putStrLn $ m ++ " gathered result: " ++ show gathered
130+
else forM_ childChans $ \(childName, childInbox) -> do
131+
putStrLn $ m ++ " sending gathered values " ++ show gathered ++ " to " ++ childName
132+
writeChan childInbox gathered
133+
pure ()
134+
135+
forM_ members $ \m ->
136+
when (Map.findWithDefault 0 m incoming == 0) $
137+
writeChan (chanMap Map.! m) [(m, valueMap Map.! m)]
138+
139+
threadDelay 1000000

0 commit comments

Comments
 (0)