Skip to content

Commit 037e602

Browse files
committed
[FIX] Added support for the FFAT_Aggregator_GPU operator
1 parent 89c1c0e commit 037e602

30 files changed

+2254
-106
lines changed

API

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ The corresponding builder needs two parameters (for the lift and combine logics)
102102
void(const result_t &, const result_t &, result_t &);
103103
void(const result_t &, const result_t &, result_t &, RuntimeContext &);
104104

105+
FFAT_ACCUMULATOR_GPU
106+
--------------------
107+
The corresponding builder needs two parameters (for the lift and combine logics) with the following accepted signatures:
108+
109+
* Lift
110+
void(const tuple_t &, result_t &);
111+
112+
* Combine
113+
__host__ __device__ void(const result_t &, const result_t &, result_t &);
114+
105115
SINK
106116
----
107117
void(std::optional<tuple_t> &);

README.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,16 @@ WindFlow and FastFlow are released with the <strong>LGPL-3</strong> license and
6060
# Cite our Work
6161
In order to cite our work, we kindly ask interested people to use the following reference:
6262
```
63-
@article{WF_Paper,
64-
author={Mencagli, Gabriele and Torquati, Massimo and Cardaci, Andrea and Fais, Alessandra and Rinaldi, Luca and Danelutto, Marco},
65-
journal={IEEE Transactions on Parallel and Distributed Systems},
66-
title={WindFlow: High-Speed Continuous Stream Processing With Parallel Building Blocks},
67-
year={2021},
68-
volume={32},
69-
number={11},
70-
pages={2748-2763},
71-
doi={10.1109/TPDS.2021.3073970}
72-
}
63+
@article{9408386,
64+
author={Mencagli, Gabriele and Torquati, Massimo and Cardaci, Andrea and Fais, Alessandra and Rinaldi, Luca and Danelutto, Marco},
65+
journal={IEEE Transactions on Parallel and Distributed Systems},
66+
title={WindFlow: High-Speed Continuous Stream Processing with Parallel Building Blocks},
67+
year={2021},
68+
volume={},
69+
number={},
70+
pages={1-1},
71+
doi={10.1109/TPDS.2021.3073970}
72+
}
7373
```
7474

7575
# Contributors

tests/win_tests_gpu/test_win_fat_gpu_tb.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
* Test of the FFAT_Aggregator operator with time-based windows. This test
1919
* also includes some basic GPU operators.
2020
*
21-
* +--------------------------------------------------------------------------+
22-
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
23-
* | | S | | F | | M | | M | | FAT_TB | | S | |
24-
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25-
* | | (*) | | (*) | | (*) | | (*) | | (*) | | (*) | |
26-
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
27-
* +--------------------------------------------------------------------------+
21+
* +--------------------------------------------------------------------------+
22+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
23+
* | | S | | F | | M | | M | | FAT_TB | | S | |
24+
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25+
* | | (*) | | (*) | | (*) | | (*) | | (*) | | (*) | |
26+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
27+
* +--------------------------------------------------------------------------+
2828
*/
2929

3030
// includes
@@ -122,13 +122,13 @@ int main(int argc, char *argv[])
122122
.withOutputBatchSize(dist_b(rng))
123123
.build();
124124
mp.chain(filter);
125-
Map_Functor map_functor1;
125+
Map_Functor_GPU map_functor1;
126126
Map_GPU mapgpu1 = MapGPU_Builder(map_functor1)
127127
.withName("mapgpu1")
128128
.withParallelism(map1_degree)
129129
.build();
130130
mp.chain(mapgpu1);
131-
Map_Functor_KB map_functor_gpu2;
131+
Map_Functor_GPU_KB map_functor_gpu2;
132132
Map_GPU mapgpu2 = MapGPU_Builder(map_functor_gpu2)
133133
.withName("mapgpu2")
134134
.withParallelism(map2_degree)
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/******************************************************************************
2+
* This program is free software; you can redistribute it and/or modify it
3+
* under the terms of the GNU Lesser General Public License version 3 as
4+
* published by the Free Software Foundation.
5+
*
6+
* This program is distributed in the hope that it will be useful, but WITHOUT
7+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
8+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
9+
* License for more details.
10+
*
11+
* You should have received a copy of the GNU Lesser General Public License
12+
* along with this program; if not, write to the Free Software Foundation,
13+
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
14+
******************************************************************************
15+
*/
16+
17+
/*
18+
* Test of the FFAT_Aggregator_GPU operator with time-based windows.
19+
*
20+
* +--------------------------------------------------------------------------+
21+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
22+
* | | S | | F | | M | | M | | FAT_TB | | S | |
23+
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ GPU +---->+ CPU | |
24+
* | | (*) | | (*) | | (*) | | (*) | | (*) | | (*) | |
25+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
26+
* +--------------------------------------------------------------------------+
27+
*/
28+
29+
// includes
30+
#include<string>
31+
#include<random>
32+
#include<iostream>
33+
#include<math.h>
34+
#include<ff/ff.hpp>
35+
#include<windflow.hpp>
36+
#include<windflow_gpu.hpp>
37+
#include"win_common_gpu.hpp"
38+
39+
using namespace std;
40+
using namespace chrono;
41+
using namespace wf;
42+
43+
// global variable for the result
44+
extern atomic<long> global_sum;
45+
46+
// main
47+
int main(int argc, char *argv[])
48+
{
49+
int option = 0;
50+
size_t runs = 1;
51+
size_t stream_len = 0;
52+
size_t win_len = 0;
53+
size_t win_slide = 0;
54+
size_t n_keys = 1;
55+
// initalize global variable
56+
global_sum = 0;
57+
// arguments from command line
58+
if (argc != 11) {
59+
cout << argv[0] << " -r [runs] -l [stream_length] -k [n_keys] -w [win length usec] -s [win slide usec]" << endl;
60+
exit(EXIT_SUCCESS);
61+
}
62+
while ((option = getopt(argc, argv, "r:l:k:w:s:b:")) != -1) {
63+
switch (option) {
64+
case 'r': runs = atoi(optarg);
65+
break;
66+
case 'l': stream_len = atoi(optarg);
67+
break;
68+
case 'k': n_keys = atoi(optarg);
69+
break;
70+
case 'w': win_len = atoi(optarg);
71+
break;
72+
case 's': win_slide = atoi(optarg);
73+
break;
74+
default: {
75+
cout << argv[0] << " -r [runs] -l [stream_length] -k [n_keys] -w [win length usec] -s [win slide usec]" << endl;
76+
exit(EXIT_SUCCESS);
77+
}
78+
}
79+
}
80+
// set random seed
81+
mt19937 rng;
82+
rng.seed(std::random_device()());
83+
size_t min = 1;
84+
size_t max = 9;
85+
std::uniform_int_distribution<std::mt19937::result_type> dist_p(min, max);
86+
std::uniform_int_distribution<std::mt19937::result_type> dist_b(100, 200);
87+
int filter_degree, map1_degree, map2_degree, map3_degree, fat_gpu_degree;
88+
size_t source_degree, sink_degree;
89+
long last_result = 0;
90+
source_degree = 1; // dist_p(rng);
91+
// executes the runs in DEFAULT mode
92+
for (size_t i=0; i<runs; i++) {
93+
filter_degree = dist_p(rng);
94+
map1_degree = dist_p(rng);
95+
map2_degree = dist_p(rng);
96+
map3_degree = dist_p(rng);
97+
fat_gpu_degree = dist_p(rng);
98+
sink_degree = dist_p(rng);
99+
cout << "Run " << i << endl;
100+
std::cout << "+--------------------------------------------------------------------------+" << std::endl;
101+
std::cout << "| +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |" << std::endl;
102+
std::cout << "| | S | | F | | M | | M | | FAT_TB | | S | |" << std::endl;
103+
std::cout << "| | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ GPU +---->+ CPU | |" << std::endl;
104+
std::cout << "| | (" << source_degree << ") | | (" << filter_degree << ") | | (" << map1_degree << ") | | (" << map2_degree << ") | | (" << fat_gpu_degree << ") | | (" << sink_degree << ") | |" << std::endl;
105+
std::cout << "| +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |" << std::endl;
106+
std::cout << "+--------------------------------------------------------------------------+" << std::endl;
107+
// prepare the test
108+
PipeGraph graph("test_win_fatgpu_gpu_tb", Execution_Mode_t::DEFAULT, Time_Policy_t::EVENT_TIME);
109+
Source_Positive_Functor source_functor(stream_len, n_keys, true);
110+
Source source = Source_Builder(source_functor)
111+
.withName("source")
112+
.withParallelism(source_degree)
113+
.withOutputBatchSize(dist_b(rng))
114+
.build();
115+
MultiPipe &mp = graph.add_source(source);
116+
Filter_Functor_KB filter_functor;
117+
Filter filter = Filter_Builder(filter_functor)
118+
.withName("filter")
119+
.withParallelism(filter_degree)
120+
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
121+
.withOutputBatchSize(dist_b(rng))
122+
.build();
123+
mp.chain(filter);
124+
Map_Functor_GPU map_functor1;
125+
Map_GPU mapgpu1 = MapGPU_Builder(map_functor1)
126+
.withName("mapgpu1")
127+
.withParallelism(map1_degree)
128+
.build();
129+
mp.chain(mapgpu1);
130+
Map_Functor_GPU_KB map_functor_gpu2;
131+
Map_GPU mapgpu2 = MapGPU_Builder(map_functor_gpu2)
132+
.withName("mapgpu2")
133+
.withParallelism(map2_degree)
134+
.withKeyBy([] __host__ __device__ (const tuple_t &t) -> size_t { return t.key; })
135+
.build();
136+
mp.chain(mapgpu2);
137+
Lift_Functor lift_functor;
138+
Comb_Functor_GPU comb_functor_gpu;
139+
FFAT_Aggregator_GPU fatagg_gpu = FFAT_AggregatorGPU_Builder(lift_functor, comb_functor_gpu)
140+
.withName("ffat_agg_gpu")
141+
.withParallelism(fat_gpu_degree)
142+
.withKeyBy([] __host__ __device__ (const tuple_t &t) -> size_t { return t.key; })
143+
.withTBWindows(microseconds(win_len), microseconds(win_slide), microseconds(100))
144+
.withOutputBatchSize(dist_b(rng))
145+
.build();
146+
mp.add(fatagg_gpu);
147+
Sink_Functor sink_functor;
148+
Sink sink = Sink_Builder(sink_functor)
149+
.withName("sink")
150+
.withParallelism(sink_degree)
151+
.build();
152+
mp.chain_sink(sink);
153+
// run the application
154+
graph.run();
155+
if (i == 0) {
156+
last_result = global_sum;
157+
cout << "Result is --> " << GREEN << "OK" << DEFAULT_COLOR << " value " << global_sum.load() << endl;
158+
}
159+
else {
160+
if (last_result == global_sum) {
161+
cout << "Result is --> " << GREEN << "OK" << DEFAULT_COLOR << " value " << global_sum.load() << endl;
162+
}
163+
else {
164+
cout << "Result is --> " << RED << "FAILED" << DEFAULT_COLOR << " value " << global_sum.load() << endl;
165+
abort();
166+
}
167+
}
168+
global_sum = 0;
169+
}
170+
return 0;
171+
}

tests/win_tests_gpu/test_win_kw_gpu_tb.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
* Test of the Keyed_Windows operator with time-based windows. This test
1919
* also includes some basic GPU operators.
2020
*
21-
* +-------------------------------------------------------------------------+
22-
* | +-----+ +-----+ +-----+ +-----+ +-------+ +-----+ |
23-
* | | S | | F | | M | | M | | KW_TB | | S | |
24-
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25-
* | | (*) | | (*) | | (*) | | (*) | | (*) | | (*) | |
26-
* | +-----+ +-----+ +-----+ +-----+ +-------+ +-----+ |
27-
* +-------------------------------------------------------------------------+
21+
* +-------------------------------------------------------------------------+
22+
* | +-----+ +-----+ +-----+ +-----+ +-------+ +-----+ |
23+
* | | S | | F | | M | | M | | KW_TB | | S | |
24+
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25+
* | | (*) | | (*) | | (*) | | (*) | | (*) | | (*) | |
26+
* | +-----+ +-----+ +-----+ +-----+ +-------+ +-----+ |
27+
* +-------------------------------------------------------------------------+
2828
*/
2929

3030
// includes
@@ -122,13 +122,13 @@ int main(int argc, char *argv[])
122122
.withOutputBatchSize(dist_b(rng))
123123
.build();
124124
mp.chain(filter);
125-
Map_Functor map_functor1;
125+
Map_Functor_GPU map_functor1;
126126
Map_GPU mapgpu1 = MapGPU_Builder(map_functor1)
127127
.withName("mapgpu1")
128128
.withParallelism(map1_degree)
129129
.build();
130130
mp.chain(mapgpu1);
131-
Map_Functor_KB map_functor_gpu2;
131+
Map_Functor_GPU_KB map_functor_gpu2;
132132
Map_GPU mapgpu2 = MapGPU_Builder(map_functor_gpu2)
133133
.withName("mapgpu2")
134134
.withParallelism(map2_degree)

tests/win_tests_gpu/test_win_mrw_gpu_tb.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
* Test of the MapReduce_Windows operator with time-based windows. This test
1919
* also includes some basic GPU operators.
2020
*
21-
* +--------------------------------------------------------------------------+
22-
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
23-
* | | S | | F | | M | | M | | MRW_TB | | S | |
24-
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25-
* | | (*) | | (*) | | (*) | | (*) | | (*,*) | | (*) | |
26-
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
27-
* +--------------------------------------------------------------------------+
21+
* +--------------------------------------------------------------------------+
22+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
23+
* | | S | | F | | M | | M | | MRW_TB | | S | |
24+
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25+
* | | (*) | | (*) | | (*) | | (*) | | (*,*) | | (*) | |
26+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
27+
* +--------------------------------------------------------------------------+
2828
*/
2929

3030
// includes
@@ -123,13 +123,13 @@ int main(int argc, char *argv[])
123123
.withOutputBatchSize(dist_b(rng))
124124
.build();
125125
mp.chain(filter);
126-
Map_Functor map_functor1;
126+
Map_Functor_GPU map_functor1;
127127
Map_GPU mapgpu1 = MapGPU_Builder(map_functor1)
128128
.withName("mapgpu1")
129129
.withParallelism(map1_degree)
130130
.build();
131131
mp.chain(mapgpu1);
132-
Map_Functor_KB map_functor_gpu2;
132+
Map_Functor_GPU_KB map_functor_gpu2;
133133
Map_GPU mapgpu2 = MapGPU_Builder(map_functor_gpu2)
134134
.withName("mapgpu2")
135135
.withParallelism(map2_degree)

tests/win_tests_gpu/test_win_paw_gpu_tb.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
* Test of the Paned_Windows operator with time-based windows. This test
1919
* also includes some basic GPU operators.
2020
*
21-
* +--------------------------------------------------------------------------+
22-
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
23-
* | | S | | F | | M | | M | | PaW_TB | | S | |
24-
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25-
* | | (*) | | (*) | | (*) | | (*) | | (*,*) | | (*) | |
26-
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
27-
* +--------------------------------------------------------------------------+
21+
* +--------------------------------------------------------------------------+
22+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
23+
* | | S | | F | | M | | M | | PaW_TB | | S | |
24+
* | | CPU +---->+ CPU +---->+ GPU +---->+ GPU +---->+ CPU +---->+ CPU | |
25+
* | | (*) | | (*) | | (*) | | (*) | | (*,*) | | (*) | |
26+
* | +-----+ +-----+ +-----+ +-----+ +--------+ +-----+ |
27+
* +--------------------------------------------------------------------------+
2828
*/
2929

3030
// includes
@@ -123,13 +123,13 @@ int main(int argc, char *argv[])
123123
.withOutputBatchSize(dist_b(rng))
124124
.build();
125125
mp.chain(filter);
126-
Map_Functor map_functor1;
126+
Map_Functor_GPU map_functor1;
127127
Map_GPU mapgpu1 = MapGPU_Builder(map_functor1)
128128
.withName("mapgpu1")
129129
.withParallelism(map1_degree)
130130
.build();
131131
mp.chain(mapgpu1);
132-
Map_Functor_KB map_functor_gpu2;
132+
Map_Functor_GPU_KB map_functor_gpu2;
133133
Map_GPU mapgpu2 = MapGPU_Builder(map_functor_gpu2)
134134
.withName("mapgpu2")
135135
.withParallelism(map2_degree)

0 commit comments

Comments
 (0)